"""prediction_outcomes 매칭 배치. 평일 16:30 KST 에 실행. 다음 거래일 장 종료 후 (KRX 정규장 마감 15:30) 의 확정 종가가 16:00~16:30 사이 pykrx 로 들어온 뒤, 매칭 미해결 예측을 실제 종가와 매칭한다. 이월/공휴일 정책: target_date 가 calendar date 라서 비거래일이면 ohlcv_daily 에 행이 없다. 그래서 `target_date <= today` 인 미해결 행을 전부 후보로 잡고, 각 행마다 `target_date <= ohlcv_daily.date <= today` 범위의 최초 거래일 종가로 매칭한다 (=다음 거래일로 자동 이월). shadow prediction 도 같은 방식으로 매칭한다 (user_triggered 필터 없음). """ from __future__ import annotations import json import logging from dataclasses import dataclass from datetime import date, timedelta from typing import Any from sqlalchemy import text from app.db.connection import get_engine logger = logging.getLogger(__name__) # direction_hit 판정 시 ±0.3% 이내는 flat. (features 의 FLAT_BAND 와 동일) FLAT_BAND = 0.003 @dataclass class MatchSummary: today: str candidates: int matched: int skipped_no_actual: int already_resolved: int def _direction_label(ret: float) -> str: if ret > FLAT_BAND: return "up" if ret < -FLAT_BAND: return "down" return "flat" def match_up_to(today: date) -> MatchSummary: """target_date <= today 인 모든 미해결 예측을 매칭. 각 행마다 ohlcv_daily 에서 target_date 이상, today 이하 범위의 최초 거래일 종가를 actual_close 로 사용 — 공휴일/주말 이월 자연 처리. """ eng = get_engine() with eng.begin() as conn: candidate_rows = conn.execute( text( """ SELECT p.id, p.code, p.base_date, p.target_date, p.horizon, p.point_forecast, p.direction, p.model FROM predictions p LEFT JOIN prediction_outcomes po ON po.prediction_id = p.id WHERE p.target_date <= :today AND po.prediction_id IS NULL """ ), {"today": today}, ).all() candidates = len(candidate_rows) if not candidates: return MatchSummary(str(today), 0, 0, 0, 0) matched = 0 skipped = 0 already = 0 for pid, code, base_date, target_date, horizon, point_forecast, pred_dir, model in candidate_rows: # 첫 거래일 종가 (target_date <= date <= today) actual_row = conn.execute( text( """ SELECT date, close FROM ohlcv_daily WHERE code = :c AND date >= :td AND date <= :today ORDER BY date ASC LIMIT 1 """ ), {"c": code, "td": target_date, "today": today}, ).first() if not actual_row or actual_row[1] is None: skipped += 1 continue actual_date = actual_row[0] actual = float(actual_row[1]) base_close_row = conn.execute( text("SELECT close FROM ohlcv_daily WHERE code = :c AND date = :d"), {"c": code, "d": base_date}, ).first() if not base_close_row or base_close_row[0] is None: skipped += 1 continue base_close = float(base_close_row[0]) actual_ret = actual / base_close - 1.0 actual_dir = _direction_label(actual_ret) dir_hit = (pred_dir == actual_dir) abs_err = abs(float(point_forecast) - actual) if point_forecast is not None else None try: conn.execute( text( """ INSERT INTO prediction_outcomes (prediction_id, code, target_date, horizon, model, predicted_close, actual_close, actual_return, direction_hit, abs_error) VALUES (:pid, :code, :d, :h, :m, :pc, :ac, :ar, :dh, :ae) ON CONFLICT (prediction_id) DO NOTHING """ ), { "pid": pid, "code": code, # 실제 매칭된 거래일 (이월된 경우 target_date 와 다를 수 있음) "d": actual_date, "h": horizon, "m": model, "pc": float(point_forecast) if point_forecast is not None else None, "ac": actual, "ar": float(actual_ret), "dh": bool(dir_hit), "ae": float(abs_err) if abs_err is not None else None, }, ) matched += 1 except Exception as exc: # noqa: BLE001 logger.warning("match insert failed pid=%s: %s", pid, exc) already += 1 return MatchSummary( today=str(today), candidates=candidates, matched=matched, skipped_no_actual=skipped, already_resolved=already, ) # 하위 호환 alias — 이전 시그니처를 쓰던 호출자 (예: 단일 날짜 매칭 테스트) def match_for_date(d: date) -> MatchSummary: """legacy: target_date == d 만 매칭하던 동작 → 이제 target_date <= d 전체 처리.""" return match_up_to(d) def match_today() -> dict[str, Any]: """평일 16:30 KST 호출용. target_date <= today (KST) 인 미해결 행 일괄 매칭.""" from datetime import datetime, timezone, timedelta as td kst = timezone(td(hours=9)) today = datetime.now(kst).date() summary = match_up_to(today) return { "today": str(today), "summary": summary.__dict__, } if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") out = match_today() print(json.dumps(out, ensure_ascii=False, indent=2, default=str))