reviewer 지적사항 반영: 1. KIS 분봉이 한 번에 30개만 와서 10분봉이 최대 3개만 나오던 문제 → fetch_minute_range() 추가. FID_INPUT_HOUR_1 을 30분씩 후퇴시키며 페이지네이션, 중복 ts 자연 dedupe, max_pages=20 으로 무한루프 방지. _ensure_intraday_fresh 는 last_ts+1m ~ now 빈 구간만 채우므로 평소엔 1~2 페이지로 끝남. 2. 장외/주말에 매번 KIS 를 때리던 문제 → - 주말: 'weekend' 반환, KIS 안 부름 (분봉 endpoint 는 당일만 지원) - 평일 장외 + 오늘 데이터 있음: 'cached_closed' 반환 - 장중 + 10분 이내: 'fresh' 반환 토큰/조회 제한 다시 밟지 않음. 3. 프론트 horizon 입력 60 vs 백엔드 30 불일치 → PredictionPanel 의 cap 을 30 으로 맞춤. 백엔드 predict.py 의 학습/검증 범위와 일치. placeholder 도 '1~30' 으로 명시.
341 lines
13 KiB
Python
341 lines
13 KiB
Python
"""차트 데이터 API: OHLCV + 보조 데이터 (감성, 거시).
|
|
|
|
UI: /code 페이지가 호출 → lightweight-charts 캔들 데이터로 사용.
|
|
|
|
interval 파라미터로 캔들 단위 선택:
|
|
- "10m" : 당일 10분봉. ohlcv_1m 을 time_bucket 으로 10분 단위 집계.
|
|
stale (>10분) 이면 KIS inquire-time-itemchartprice 로 즉시 보충.
|
|
- "1d" : 일봉. ohlcv_daily 직접 조회. 비어있으면 pykrx auto-refresh.
|
|
- "1w" : 주봉. ohlcv_daily 를 date_trunc('week') 로 집계.
|
|
- "1mo" : 월봉. ohlcv_daily 를 date_trunc('month') 로 집계.
|
|
|
|
10m 외에는 date 필드가 'YYYY-MM-DD' ISO date 문자열,
|
|
10m 일 때는 'YYYY-MM-DDTHH:MM:SS' ISO datetime (KST) 으로 통일.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import date, datetime, time as dtime, timedelta, timezone
|
|
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from sqlalchemy import text
|
|
|
|
from app.db.connection import get_engine
|
|
|
|
logger = logging.getLogger(__name__)

# Every route declared in this module is mounted under /api/chart.
router = APIRouter(tags=["chart"], prefix="/api/chart")

# Candle units accepted by the ``interval`` query parameter of get_chart.
ALLOWED_INTERVALS = ("10m", "1d", "1w", "1mo")

# Korea Standard Time: fixed UTC+09:00 offset (Korea observes no DST).
KST = timezone(offset=timedelta(hours=9))
|
|
|
|
|
|
def _query_ohlcv_daily(conn, code: str, start: date, end: date):
    """Fetch daily candles for *code* with ``start <= date <= end``, oldest first.

    Returns the raw result rows as (date, open, high, low, close, volume).
    """
    sql = text(
        """
            SELECT date, open, high, low, close, volume
            FROM ohlcv_daily
            WHERE code = :c AND date BETWEEN :s AND :e
            ORDER BY date
            """
    )
    bounds = {"c": code, "s": start, "e": end}
    result = conn.execute(sql, bounds)
    return result.all()
|
|
|
|
|
|
def _bucket_start(d: date, trunc: str) -> date:
    """Return the first day of the PostgreSQL ``date_trunc(trunc, d)`` bucket holding *d*.

    Supports 'week' (Monday, ISO 8601 weeks, as PostgreSQL uses), 'month', 'quarter'
    and 'year'. Any other unit returns *d* unchanged (no alignment).
    """
    if trunc == "week":
        return d - timedelta(days=d.weekday())
    if trunc == "month":
        return d.replace(day=1)
    if trunc == "quarter":
        return d.replace(month=(d.month - 1) // 3 * 3 + 1, day=1)
    if trunc == "year":
        return d.replace(month=1, day=1)
    return d


def _query_ohlcv_bucketed(conn, code: str, start: date, end: date, trunc: str):
    """Aggregate ohlcv_daily rows into coarser candles (1d -> 1w / 1mo).

    Buckets come from ``date_trunc(trunc, date)``. Per bucket:
    open = open of the first trading day, close = close of the last trading day
    (both picked with ROW_NUMBER windows), high = MAX(high), low = MIN(low),
    volume = SUM(volume).

    *start* is widened back to the beginning of its bucket. Otherwise the first candle
    would cover only part of its week/month and report the wrong open, high/low and
    volume. A bucket containing *end* is still cut off at *end*.

    Returns rows of (bucket_date, open, high, low, close, volume), oldest first.
    """
    query_start = _bucket_start(start, trunc)
    return conn.execute(
        text(
            """
            WITH base AS (
                SELECT date_trunc(:trunc, date)::date AS bucket,
                       date, open, high, low, close, volume
                FROM ohlcv_daily
                WHERE code = :c AND date BETWEEN :s AND :e
            ),
            ranked AS (
                SELECT bucket, date, open, high, low, close, volume,
                       ROW_NUMBER() OVER (PARTITION BY bucket ORDER BY date ASC) AS rn_first,
                       ROW_NUMBER() OVER (PARTITION BY bucket ORDER BY date DESC) AS rn_last
                FROM base
            )
            SELECT bucket AS date,
                   MAX(open) FILTER (WHERE rn_first = 1) AS open,
                   MAX(high) AS high,
                   MIN(low) AS low,
                   MAX(close) FILTER (WHERE rn_last = 1) AS close,
                   SUM(volume) AS volume
            FROM ranked
            GROUP BY bucket
            ORDER BY bucket
            """
        ),
        {"c": code, "s": query_start, "e": end, "trunc": trunc},
    ).all()
|
|
|
|
|
|
def _query_ohlcv_10m(conn, code: str, start_ts: datetime, end_ts: datetime):
    """Roll ohlcv_1m rows with ``start_ts <= ts < end_ts`` up into 10-minute candles.

    Bucketing uses TimescaleDB's ``time_bucket``; the bucket open/close come from the
    TimescaleDB aggregates ``first(open, ts)`` / ``last(close, ts)``.

    Returns rows of (bucket_ts, open, high, low, close, volume), oldest first.
    """
    sql = text(
        """
            SELECT time_bucket(INTERVAL '10 minutes', ts) AS bucket,
                   first(open, ts) AS open,
                   MAX(high) AS high,
                   MIN(low) AS low,
                   last(close, ts) AS close,
                   SUM(volume) AS volume
            FROM ohlcv_1m
            WHERE code = :c AND ts >= :s AND ts < :e
            GROUP BY bucket
            ORDER BY bucket
            """
    )
    window = {"c": code, "s": start_ts, "e": end_ts}
    result = conn.execute(sql, window)
    return result.all()
|
|
|
|
|
|
def _upsert_ohlcv_1m(conn, code: str, rows: list[dict]) -> int:
    """Write KIS minute candles into ohlcv_1m, overwriting existing (code, ts) rows.

    Overwriting lets a candle that was captured while still in progress be replaced by
    a later refresh. Each row dict supplies ts/open/high/low/close/volume; ``code`` is
    added from the argument (a ``code`` key inside a row would take precedence).

    Returns the number of rows sent. An empty *rows* returns 0 without touching the
    DB. Does not commit; the caller owns the transaction.
    """
    if not rows:
        return 0
    stmt = text(
        """
            INSERT INTO ohlcv_1m (code, ts, open, high, low, close, volume)
            VALUES (:code, :ts, :open, :high, :low, :close, :volume)
            ON CONFLICT (code, ts) DO UPDATE SET
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close,
                volume = EXCLUDED.volume
            """
    )
    payload = [{"code": code, **row} for row in rows]
    conn.execute(stmt, payload)
    return len(payload)
|
|
|
|
|
|
def _intraday_window_today() -> tuple[datetime, datetime]:
    """Return today's KST intraday window as (08:50, 16:00) aware datetimes.

    On Saturday or Sunday the window is moved back to the preceding Friday.
    """
    session_day = datetime.now(KST).date()
    weekday = session_day.weekday()
    if weekday >= 5:
        # Saturday (5) -> back 1 day, Sunday (6) -> back 2 days; both land on Friday.
        session_day -= timedelta(days=weekday - 4)
    opens_at = datetime.combine(session_day, dtime(8, 50), tzinfo=KST)
    closes_at = datetime.combine(session_day, dtime(16, 0), tzinfo=KST)
    return opens_at, closes_at
|
|
|
|
|
|
def _plan_intraday_fetch(
    now: datetime,
    last_ts: datetime | None,
    win_start: datetime,
    win_end: datetime,
) -> str | tuple[datetime, datetime]:
    """Decide whether today's ohlcv_1m cache needs a KIS top-up (pure, no I/O).

    Args:
        now: current time, timezone-aware (KST).
        last_ts: latest cached 1-minute candle in [win_start, win_end), or None.
            Must be timezone-aware.
        win_start, win_end: today's intraday window (see _intraday_window_today).

    Returns:
        'cached_closed' or 'fresh' when no fetch is needed, otherwise the
        (fetch_from, fetch_to) range to request from KIS.
    """
    market_open = dtime(9, 0)
    market_close_buffer = dtime(15, 35)
    # Timestamp of the last regular-session candle (15:30 closing-auction print).
    # NOTE(review): assumes KIS stamps the closing candle at 15:30. If a stock has no
    # 15:30 candle, every after-close request triggers one small re-fetch. Confirm.
    session_close = datetime.combine(now.date(), dtime(15, 30), tzinfo=now.tzinfo)
    in_session = market_open <= now.time() <= market_close_buffer

    if last_ts is not None:
        if in_session:
            if now - last_ts < timedelta(minutes=10):
                return "fresh"
        elif now.time() < market_open or last_ts >= session_close:
            # Pre-open: keep what is cached until the session starts.
            # After the close: the closing candle is already stored, so nothing new can arrive.
            return "cached_closed"

    # Start at last_ts itself, not last_ts + 1m. The last cached candle may have been
    # captured mid-minute, and the UPSERT overwrites it with its final values.
    fetch_to = min(now, win_end)
    if last_ts is not None and last_ts >= win_start:
        fetch_from = last_ts
    else:
        fetch_from = win_start
    if fetch_from >= fetch_to:
        return "fresh"
    return fetch_from, fetch_to


def _ensure_intraday_fresh(conn, code: str) -> str:
    """Top up today's ohlcv_1m rows from KIS only when needed.

    Branches:
      - Weekend: the KIS minute-chart endpoint only serves the current day, so KIS is
        not called -> 'weekend'.
      - Before 09:00 with data already cached for today -> 'cached_closed'.
      - After 15:35 with the 15:30 closing candle already cached -> 'cached_closed'.
      - In session and the latest cached candle is < 10 minutes old -> 'fresh'.
      - Otherwise (stale in session, closing candle still missing after the close, or
        empty cache): fetch from the last cached candle, or from the window start if
        the cache is empty, up to min(now, window end) via fetch_minute_range. Then
        upsert and commit.

    NOTE(review): weekday market holidays are not detected; KIS is still queried and
    presumably returns nothing ('no_data').

    Returns: 'fresh' | 'refreshed' | 'cached_closed' | 'weekend' |
             'skipped_missing_key' | 'failed' | 'no_data'
    """
    now = datetime.now(KST)
    if now.weekday() >= 5:
        return "weekend"

    win_start, win_end = _intraday_window_today()
    last_ts = conn.execute(
        text("SELECT MAX(ts) FROM ohlcv_1m WHERE code = :c AND ts >= :s AND ts < :e"),
        {"c": code, "s": win_start, "e": win_end},
    ).scalar()
    if last_ts is not None and last_ts.tzinfo is None:
        # Treat naive timestamps as KST (as get_chart does when formatting buckets);
        # comparing a naive value with the aware `now` would raise TypeError.
        last_ts = last_ts.replace(tzinfo=KST)

    plan = _plan_intraday_fetch(now, last_ts, win_start, win_end)
    if isinstance(plan, str):
        return plan
    fetch_from, fetch_to = plan

    try:
        from app.fetch.kis import SkippedMissingKey, fetch_minute_range
    except Exception:  # noqa: BLE001
        logger.exception("intraday refresh: cannot import KIS client")
        return "failed"

    try:
        rows = fetch_minute_range(code, fetch_from, fetch_to)
    except SkippedMissingKey:
        return "skipped_missing_key"
    except Exception:  # noqa: BLE001
        logger.exception("intraday refresh failed for %s", code)
        return "failed"

    if not rows:
        return "no_data"
    _upsert_ohlcv_1m(conn, code, rows)
    conn.commit()
    return "refreshed"
|
|
|
|
|
|
def _ohlcv_row(date_str: str, r) -> dict:
    """Serialize one (date, open, high, low, close, volume) result row for JSON.

    Prices become float and volume int; NULL columns stay None.
    """
    return {
        "date": date_str,
        "open": float(r[1]) if r[1] is not None else None,
        "high": float(r[2]) if r[2] is not None else None,
        "low": float(r[3]) if r[3] is not None else None,
        "close": float(r[4]) if r[4] is not None else None,
        "volume": int(r[5]) if r[5] is not None else None,
    }


def _kst_iso(ts: datetime) -> str:
    """Format an intraday bucket timestamp as 'YYYY-MM-DDTHH:MM:SS' wall-clock KST.

    Aware timestamps are converted to KST; naive ones are taken to already be KST.
    """
    local = ts.astimezone(KST) if ts.tzinfo else ts
    return local.strftime("%Y-%m-%dT%H:%M:%S")


def _query_daily_based(conn, code: str, start: date, end: date, interval: str):
    """Run the ohlcv_daily-backed candle query for '1d', '1w' or '1mo'."""
    if interval == "1d":
        return _query_ohlcv_daily(conn, code, start, end)
    if interval == "1w":
        return _query_ohlcv_bucketed(conn, code, start, end, "week")
    return _query_ohlcv_bucketed(conn, code, start, end, "month")


@router.get("/{code}")
def get_chart(
    code: str,
    days: int = Query(default=180, ge=1, le=3650),
    interval: str = Query(default="1d"),
    include_sentiment: bool = Query(default=True),
    include_trading_value: bool = Query(default=True),
) -> dict:
    """Return OHLCV candles for *code*, plus optional daily sentiment / trading value.

    Args:
        code: symbol code; must exist in ``symbols``.
        days: look-back window in calendar days, ending today (KST).
        interval: one of ALLOWED_INTERVALS. '10m' returns today's intraday candles
            (refreshed from KIS when stale) and skips the sentiment/trading series.
        include_sentiment: include rows from v_sentiment_daily (best-effort).
        include_trading_value: include rows from trading_value_daily.

    Raises:
        HTTPException: 400 for an unsupported interval, 404 for an unknown code.
    """
    if interval not in ALLOWED_INTERVALS:
        raise HTTPException(status_code=400, detail=f"interval must be one of {ALLOWED_INTERVALS}")

    eng = get_engine()
    # Market calendar date in KST, not the server's local date. It is computed once
    # so "range.to" and "today" always agree.
    today = datetime.now(KST).date()
    end = today
    start = end - timedelta(days=days)
    with eng.connect() as conn:
        symbol = conn.execute(
            text("SELECT code, name, market FROM symbols WHERE code = :c"),
            {"c": code},
        ).first()
        if not symbol:
            raise HTTPException(status_code=404, detail=f"unknown code: {code}")

        intraday_status: str | None = None
        if interval == "10m":
            intraday_status = _ensure_intraday_fresh(conn, code)
            win_start, win_end = _intraday_window_today()
            rows = _query_ohlcv_10m(conn, code, win_start, win_end)
            ohlcv = [_ohlcv_row(_kst_iso(r[0]), r) for r in rows]
        else:
            rows = _query_daily_based(conn, code, start, end, interval)
            if not rows:
                # First visit -> pykrx auto-refresh. 1w/1mo are aggregated from the same
                # ohlcv_daily table, so they trigger it too.
                try:
                    from app.pipelines.refresh_one import refresh_code

                    logger.info("chart: ohlcv_daily empty for %s — auto-refresh", code)
                    refresh_code(symbol[0], symbol[1], lookback_days=max(days, 365))
                    rows = _query_daily_based(conn, code, start, end, interval)
                except Exception:  # noqa: BLE001
                    logger.exception("chart: auto-refresh failed for %s", code)
                    # A failed statement aborts the PostgreSQL transaction; roll back so
                    # the queries below can still run on this connection.
                    conn.rollback()
            ohlcv = [_ohlcv_row(str(r[0]), r) for r in rows]

        sentiment: list[dict] = []
        if include_sentiment and interval != "10m":
            try:
                s_rows = conn.execute(
                    text(
                        """
                        SELECT date, n_articles, mean_score, weighted_score
                        FROM v_sentiment_daily
                        WHERE code = :c AND date BETWEEN :s AND :e
                        ORDER BY date
                        """
                    ),
                    {"c": code, "s": start, "e": end},
                ).all()
                sentiment = [
                    {
                        "date": str(r[0]),
                        "n_articles": int(r[1]) if r[1] is not None else 0,
                        "mean_score": float(r[2]) if r[2] is not None else None,
                        "weighted_score": float(r[3]) if r[3] is not None else None,
                    }
                    for r in s_rows
                ]
            except Exception:  # noqa: BLE001
                # Sentiment is best-effort. Log the failure and reset the aborted
                # transaction, otherwise the trading_value query below fails with it.
                logger.warning("chart: sentiment query failed for %s", code, exc_info=True)
                conn.rollback()
                sentiment = []

        trading: list[dict] = []
        if include_trading_value and interval != "10m":
            tv_rows = conn.execute(
                text(
                    """
                    SELECT date, foreign_net, institution_net, individual_net
                    FROM trading_value_daily
                    WHERE code = :c AND date BETWEEN :s AND :e
                    ORDER BY date
                    """
                ),
                {"c": code, "s": start, "e": end},
            ).all()
            trading = [
                {
                    "date": str(r[0]),
                    "foreign_net": float(r[1]) if r[1] is not None else None,
                    "institution_net": float(r[2]) if r[2] is not None else None,
                    "individual_net": float(r[3]) if r[3] is not None else None,
                }
                for r in tv_rows
            ]

    return {
        "code": symbol[0],
        "name": symbol[1],
        "market": symbol[2],
        "interval": interval,
        "intraday_status": intraday_status,
        "range": {"from": str(start), "to": str(end)},
        "today": today.isoformat(),
        "ohlcv": ohlcv,
        "sentiment": sentiment,
        "trading_value": trading,
    }
|