"""Chart data API: OHLCV plus auxiliary data (sentiment, macro).

UI: the /code page calls this endpoint and uses the result as
lightweight-charts candle data.

The ``interval`` parameter selects the candle unit:
  - "10m" : today's 10-minute candles. ohlcv_1m is aggregated into 10-minute
            buckets with time_bucket. When stale (>10 minutes) the gap is
            filled immediately through KIS inquire-time-itemchartprice.
  - "1d"  : daily candles. ohlcv_daily is read directly; when it is empty a
            pykrx auto-refresh is attempted.
  - "1w"  : weekly candles. ohlcv_daily aggregated with date_trunc('week').
  - "1mo" : monthly candles. ohlcv_daily aggregated with date_trunc('month').

For every interval except 10m the ``date`` field is a 'YYYY-MM-DD' ISO date
string; for 10m it is a 'YYYY-MM-DDTHH:MM:SS' ISO datetime (KST).
"""
from __future__ import annotations

import logging
from datetime import date, datetime, time as dtime, timedelta, timezone

from fastapi import APIRouter, HTTPException, Query
from sqlalchemy import text

from app.db.connection import get_engine

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/chart", tags=["chart"])

ALLOWED_INTERVALS = ("10m", "1d", "1w", "1mo")
KST = timezone(timedelta(hours=9))


def _float_or_none(value):
    """Return ``float(value)``, passing ``None`` (SQL NULL) through."""
    return float(value) if value is not None else None


def _int_or_none(value):
    """Return ``int(value)``, passing ``None`` (SQL NULL) through."""
    return int(value) if value is not None else None


def _as_kst(ts: datetime) -> datetime:
    """Return *ts* as a KST-aware datetime.

    Naive values are labelled as KST (same wall-clock time); aware values are
    converted to KST (same instant).
    NOTE(review): treating naive DB timestamps as KST mirrors what the 10m
    branch of ``get_chart`` already did — confirm against the column type.
    """
    if ts.tzinfo is None:
        return ts.replace(tzinfo=KST)
    return ts.astimezone(KST)


def _ohlcv_row_to_dict(label: str, r) -> dict:
    """Build one candle dict from a (bucket, open, high, low, close, volume) row."""
    return {
        "date": label,
        "open": _float_or_none(r[1]),
        "high": _float_or_none(r[2]),
        "low": _float_or_none(r[3]),
        "close": _float_or_none(r[4]),
        "volume": _int_or_none(r[5]),
    }


def _query_ohlcv_daily(conn, code: str, start: date, end: date):
    """Return ohlcv_daily rows for *code* with date in [start, end], oldest first."""
    return conn.execute(
        text(
            """
            SELECT date, open, high, low, close, volume
            FROM ohlcv_daily
            WHERE code = :c AND date BETWEEN :s AND :e
            ORDER BY date
            """
        ),
        {"c": code, "s": start, "e": end},
    ).all()


def _query_ohlcv_bucketed(conn, code: str, start: date, end: date, trunc: str):
    """Aggregate daily candles into weekly/monthly candles.

    Buckets are formed with ``date_trunc(trunc, date)``. Per bucket:
    open = open of the first trading day, close = close of the last trading
    day, high/low = max/min, volume = sum. First/last rows are picked with
    ROW_NUMBER() window functions.
    """
    # No placeholders are interpolated into the SQL; ``trunc`` is a bind param.
    return conn.execute(
        text(
            """
            WITH base AS (
                SELECT date_trunc(:trunc, date)::date AS bucket,
                       date, open, high, low, close, volume
                FROM ohlcv_daily
                WHERE code = :c AND date BETWEEN :s AND :e
            ),
            ranked AS (
                SELECT bucket, date, open, high, low, close, volume,
                       ROW_NUMBER() OVER (PARTITION BY bucket ORDER BY date ASC) AS rn_first,
                       ROW_NUMBER() OVER (PARTITION BY bucket ORDER BY date DESC) AS rn_last
                FROM base
            )
            SELECT bucket AS date,
                   MAX(open) FILTER (WHERE rn_first = 1) AS open,
                   MAX(high) AS high,
                   MIN(low) AS low,
                   MAX(close) FILTER (WHERE rn_last = 1) AS close,
                   SUM(volume) AS volume
            FROM ranked
            GROUP BY bucket
            ORDER BY bucket
            """
        ),
        {"c": code, "s": start, "e": end, "trunc": trunc},
    ).all()


def _query_ohlcv_10m(conn, code: str, start_ts: datetime, end_ts: datetime):
    """Aggregate ohlcv_1m rows in [start_ts, end_ts) into 10-minute candles.

    Uses TimescaleDB ``time_bucket`` for the 10-minute grouping and the
    TimescaleDB ``first()``/``last()`` aggregates for open/close.
    """
    return conn.execute(
        text(
            """
            SELECT time_bucket(INTERVAL '10 minutes', ts) AS bucket,
                   first(open, ts) AS open,
                   MAX(high) AS high,
                   MIN(low) AS low,
                   last(close, ts) AS close,
                   SUM(volume) AS volume
            FROM ohlcv_1m
            WHERE code = :c AND ts >= :s AND ts < :e
            GROUP BY bucket
            ORDER BY bucket
            """
        ),
        {"c": code, "s": start_ts, "e": end_ts},
    ).all()


def _upsert_ohlcv_1m(conn, code: str, rows: list[dict]) -> int:
    """Upsert KIS minute candles into ohlcv_1m and return the row count.

    Rows with an existing (code, ts) are overwritten (intraday updates).
    Each row dict is expected to carry ts/open/high/low/close/volume keys,
    matching the bind parameters below. The caller is responsible for commit.
    """
    if not rows:
        return 0
    conn.execute(
        text(
            """
            INSERT INTO ohlcv_1m (code, ts, open, high, low, close, volume)
            VALUES (:code, :ts, :open, :high, :low, :close, :volume)
            ON CONFLICT (code, ts) DO UPDATE SET
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close,
                volume = EXCLUDED.volume
            """
        ),
        [{"code": code, **r} for r in rows],
    )
    return len(rows)


def _intraday_window_today() -> tuple[datetime, datetime]:
    """Return today's KST session window (08:50 ~ 16:00).

    On Saturday/Sunday the window of the preceding Friday is returned.
    """
    now = datetime.now(KST)
    d = now.date()
    # Walk back from the weekend to the preceding Friday.
    while d.weekday() >= 5:
        d -= timedelta(days=1)
    start = datetime.combine(d, dtime(8, 50), tzinfo=KST)
    end = datetime.combine(d, dtime(16, 0), tzinfo=KST)
    return start, end


def _ensure_intraday_fresh(conn, code: str) -> str:
    """Top up today's ohlcv_1m window from KIS only as far as needed.

    Branches:
      - Weekend: the KIS minute endpoint only serves "today", so it is not
        called. Returns 'weekend'.
      - Outside the session (weekday before 09:00 or after 15:35) and today's
        data already present: 'cached_closed' (data no longer grows after the
        close, so a KIS call is pointless).
      - In session and last_ts within 10 minutes: 'fresh' (DB only).
      - Otherwise (stale in session / just after the close and the final data
        is still needed once / empty cache): fill the gap between
        last_ts+1m and now through ``fetch_minute_range``.

    Returns:
        'fresh' | 'refreshed' | 'cached_closed' | 'weekend'
        | 'skipped_missing_key' | 'failed' | 'no_data'
    """
    now = datetime.now(KST)
    if now.weekday() >= 5:
        return "weekend"

    win_start, win_end = _intraday_window_today()
    last_ts = conn.execute(
        text(
            "SELECT MAX(ts) FROM ohlcv_1m WHERE code = :c AND ts >= :s AND ts < :e"
        ),
        {"c": code, "s": win_start, "e": win_end},
    ).scalar()
    if last_ts is not None:
        # A naive timestamp cannot be subtracted from / compared with the
        # aware ``now`` and ``win_start`` below (TypeError); normalise first.
        last_ts = _as_kst(last_ts)

    market_open = dtime(9, 0)
    market_close_buffer = dtime(15, 35)
    in_session = market_open <= now.time() <= market_close_buffer

    # Outside the session with data already cached: no extra call needed.
    if not in_session and last_ts is not None:
        return "cached_closed"

    # In session and refreshed within the last 10 minutes: no extra call.
    if in_session and last_ts is not None and (now - last_ts) < timedelta(minutes=10):
        return "fresh"

    # Fetch window = [last_ts+1m or win_start, min(now, win_end)].
    fetch_to = min(now, win_end)
    if last_ts is not None and last_ts >= win_start:
        fetch_from = last_ts + timedelta(minutes=1)
    else:
        fetch_from = win_start
    if fetch_from >= fetch_to:
        return "fresh"

    try:
        from app.fetch.kis import SkippedMissingKey, fetch_minute_range
    except Exception:  # noqa: BLE001
        # Best-effort refresh: report the failure, but leave a trace in the log.
        logger.exception("intraday refresh unavailable: cannot import KIS client")
        return "failed"

    try:
        rows = fetch_minute_range(code, fetch_from, fetch_to)
    except SkippedMissingKey:
        return "skipped_missing_key"
    except Exception:  # noqa: BLE001
        logger.exception("intraday refresh failed for %s", code)
        return "failed"

    if not rows:
        return "no_data"
    _upsert_ohlcv_1m(conn, code, rows)
    conn.commit()
    return "refreshed"


@router.get("/{code}")
def get_chart(
    code: str,
    days: int = Query(default=180, ge=1, le=3650),
    interval: str = Query(default="1d"),
    include_sentiment: bool = Query(default=True),
    include_trading_value: bool = Query(default=True),
) -> dict:
    """Return candle data plus optional sentiment / trading-value series.

    Args:
        code: Symbol code; must exist in the ``symbols`` table.
        days: Look-back length in days for the daily/weekly/monthly range.
            Not used to bound the 10m query, which covers the session window.
        interval: One of ``ALLOWED_INTERVALS``.
        include_sentiment: Include rows from ``v_sentiment_daily`` (skipped
            for the 10m interval).
        include_trading_value: Include rows from ``trading_value_daily``
            (skipped for the 10m interval).

    Returns:
        A dict with symbol info, the requested interval, ``intraday_status``
        (only set for 10m), the date range, and the ``ohlcv``, ``sentiment``
        and ``trading_value`` lists.

    Raises:
        HTTPException: 400 for an unsupported interval, 404 for an unknown code.
    """
    if interval not in ALLOWED_INTERVALS:
        raise HTTPException(
            status_code=400,
            detail=f"interval must be one of {ALLOWED_INTERVALS}",
        )

    eng = get_engine()
    # Use the KST calendar date (not the server-local one) so the daily range
    # and the reported "today" agree with the KST intraday window, and
    # evaluate it once so both values cannot straddle midnight.
    today = datetime.now(KST).date()
    end = today
    start = end - timedelta(days=days)

    with eng.connect() as conn:
        symbol = conn.execute(
            text("SELECT code, name, market FROM symbols WHERE code = :c"),
            {"c": code},
        ).first()
        if not symbol:
            raise HTTPException(status_code=404, detail=f"unknown code: {code}")

        ohlcv: list[dict] = []
        intraday_status: str | None = None

        if interval == "10m":
            intraday_status = _ensure_intraday_fresh(conn, code)
            win_start, win_end = _intraday_window_today()
            rows = _query_ohlcv_10m(conn, code, win_start, win_end)
            # KST datetime -> ISO datetime string; the frontend parses it as a Date.
            ohlcv = [
                _ohlcv_row_to_dict(_as_kst(r[0]).strftime("%Y-%m-%dT%H:%M:%S"), r)
                for r in rows
            ]
        else:
            if interval == "1d":
                rows = _query_ohlcv_daily(conn, code, start, end)
            elif interval == "1w":
                rows = _query_ohlcv_bucketed(conn, code, start, end, "week")
            else:  # "1mo"
                rows = _query_ohlcv_bucketed(conn, code, start, end, "month")

            if not rows and interval == "1d":
                # First visit: try a pykrx auto-refresh, then re-read.
                try:
                    from app.pipelines.refresh_one import refresh_code

                    logger.info("chart: ohlcv_daily empty for %s — auto-refresh", code)
                    refresh_code(symbol[0], symbol[1], lookback_days=max(days, 365))
                    rows = _query_ohlcv_daily(conn, code, start, end)
                except Exception:  # noqa: BLE001
                    logger.exception("chart: auto-refresh failed for %s", code)

            ohlcv = [_ohlcv_row_to_dict(str(r[0]), r) for r in rows]

        sentiment: list[dict] = []
        if include_sentiment and interval != "10m":
            try:
                s_rows = conn.execute(
                    text(
                        """
                        SELECT date, n_articles, mean_score, weighted_score
                        FROM v_sentiment_daily
                        WHERE code = :c AND date BETWEEN :s AND :e
                        ORDER BY date
                        """
                    ),
                    {"c": code, "s": start, "e": end},
                ).all()
                sentiment = [
                    {
                        "date": str(r[0]),
                        "n_articles": int(r[1]) if r[1] is not None else 0,
                        "mean_score": _float_or_none(r[2]),
                        "weighted_score": _float_or_none(r[3]),
                    }
                    for r in s_rows
                ]
            except Exception:  # noqa: BLE001
                # Sentiment is optional, so degrade to an empty list. A failed
                # statement leaves the PostgreSQL transaction aborted; roll it
                # back so the trading-value query below can still run.
                logger.exception("chart: sentiment query failed for %s", code)
                conn.rollback()
                sentiment = []

        trading: list[dict] = []
        if include_trading_value and interval != "10m":
            tv_rows = conn.execute(
                text(
                    """
                    SELECT date, foreign_net, institution_net, individual_net
                    FROM trading_value_daily
                    WHERE code = :c AND date BETWEEN :s AND :e
                    ORDER BY date
                    """
                ),
                {"c": code, "s": start, "e": end},
            ).all()
            trading = [
                {
                    "date": str(r[0]),
                    "foreign_net": _float_or_none(r[1]),
                    "institution_net": _float_or_none(r[2]),
                    "individual_net": _float_or_none(r[3]),
                }
                for r in tv_rows
            ]

    return {
        "code": symbol[0],
        "name": symbol[1],
        "market": symbol[2],
        "interval": interval,
        "intraday_status": intraday_status,
        "range": {"from": str(start), "to": str(end)},
        "today": today.isoformat(),
        "ohlcv": ohlcv,
        "sentiment": sentiment,
        "trading_value": trading,
    }