"""차트 데이터 API: OHLCV + 보조 데이터 (감성, 거시). UI: /code 페이지 첫 로드 시 호출 → lightweight-charts 캔들 데이터로 사용. 첫 방문 시 ohlcv_daily 가 비어 있으면 (symbols 만 시드됨, daily_batch 아직 안 돔) 즉시 pykrx 로 자동 갱신 — 사용자 입장에선 한 번의 차트 요청으로 데이터까지 충전. """ from __future__ import annotations import logging from datetime import date, timedelta from fastapi import APIRouter, HTTPException, Query from sqlalchemy import text from app.db.connection import get_engine logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/chart", tags=["chart"]) def _query_ohlcv(conn, code: str, start: date, end: date): return conn.execute( text( """ SELECT date, open, high, low, close, volume FROM ohlcv_daily WHERE code = :c AND date BETWEEN :s AND :e ORDER BY date """ ), {"c": code, "s": start, "e": end}, ).all() @router.get("/{code}") def get_chart( code: str, days: int = Query(default=180, ge=10, le=3650), include_sentiment: bool = Query(default=True), include_trading_value: bool = Query(default=True), ) -> dict: eng = get_engine() end = date.today() start = end - timedelta(days=days) with eng.connect() as conn: symbol = conn.execute( text("SELECT code, name, market FROM symbols WHERE code = :c"), {"c": code}, ).first() if not symbol: raise HTTPException(status_code=404, detail=f"unknown code: {code}") ohlcv_rows = _query_ohlcv(conn, code, start, end) if not ohlcv_rows: # 첫 방문 — ohlcv_daily 가 비어있다. pykrx 로 즉시 채우고 재조회. # refresh_code 는 별도 트랜잭션으로 ohlcv/trading_value/news 모두 commit 하므로 # 이 conn 에서 다시 SELECT 하면 새 행이 보인다. lookback 은 차트 요청 범위 + # 예측 모델 학습용 마진 (Chronos/LightGBM 이 충분한 과거 시계열을 요구) 으로 365 하한. try: from app.pipelines.refresh_one import refresh_code logger.info("chart: ohlcv_daily empty for %s — auto-refresh", code) refresh_code(symbol[0], symbol[1], lookback_days=max(days, 365)) ohlcv_rows = _query_ohlcv(conn, code, start, end) except Exception: # noqa: BLE001 logger.exception("chart: auto-refresh failed for %s", code) ohlcv = [ { "date": str(r[0]), "open": float(r[1]) if r[1] is not None else None, "high": float(r[2]) if r[2] is not None else None, "low": float(r[3]) if r[3] is not None else None, "close": float(r[4]) if r[4] is not None else None, "volume": int(r[5]) if r[5] is not None else None, } for r in ohlcv_rows ] sentiment: list[dict] = [] if include_sentiment: try: s_rows = conn.execute( text( """ SELECT date, n_articles, mean_score, weighted_score FROM v_sentiment_daily WHERE code = :c AND date BETWEEN :s AND :e ORDER BY date """ ), {"c": code, "s": start, "e": end}, ).all() sentiment = [ { "date": str(r[0]), "n_articles": int(r[1]) if r[1] is not None else 0, "mean_score": float(r[2]) if r[2] is not None else None, "weighted_score": float(r[3]) if r[3] is not None else None, } for r in s_rows ] except Exception: # noqa: BLE001 # v_sentiment_daily 뷰 아직 없을 수 있음 (마이그레이션 미실행) sentiment = [] trading: list[dict] = [] if include_trading_value: tv_rows = conn.execute( text( """ SELECT date, foreign_net, institution_net, individual_net FROM trading_value_daily WHERE code = :c AND date BETWEEN :s AND :e ORDER BY date """ ), {"c": code, "s": start, "e": end}, ).all() trading = [ { "date": str(r[0]), "foreign_net": float(r[1]) if r[1] is not None else None, "institution_net": float(r[2]) if r[2] is not None else None, "individual_net": float(r[3]) if r[3] is not None else None, } for r in tv_rows ] return { "code": symbol[0], "name": symbol[1], "market": symbol[2], "range": {"from": str(start), "to": str(end)}, "ohlcv": ohlcv, "sentiment": sentiment, "trading_value": trading, }