Files
stock_chart_site/backend/app/pipelines/match_outcomes.py
tkrmagid 0af556396e fix(match): 주말/공휴일 이월 매칭 (target_date <= today + 최초 거래일 종가)
- match_for_date(d) → match_up_to(today) 로 시맨틱 변경: target_date == d
  대신 target_date <= today AND outcomes 미존재 전부 후보로
- 각 후보마다 ohlcv_daily 에서 target_date 이상 today 이하 범위의 최초
  거래일 행을 actual_close 로 매칭 → 주말/공휴일 자동 이월
- user_triggered 필터 제거: chronos/lgbm shadow 행도 함께 매칭됨
- prediction_outcomes.target_date 에는 실제 매칭된 거래일을 기록
- 하위 호환: match_for_date(d) 는 match_up_to(d) 별칭으로 유지

리뷰어 지적 2번 (공휴일/주말이면 target_date 일치 행이 영원히 미매칭) 해결.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 16:27:09 +09:00

174 lines
6.1 KiB
Python

"""prediction_outcomes 매칭 배치.
평일 16:30 KST 에 실행. 다음 거래일 장 종료 후 (KRX 정규장 마감 15:30) 의
확정 종가가 16:00~16:30 사이 pykrx 로 들어온 뒤, 매칭 미해결 예측을 실제
종가와 매칭한다.
이월/공휴일 정책:
target_date 가 calendar date 라서 비거래일이면 ohlcv_daily 에 행이 없다.
그래서 `target_date <= today` 인 미해결 행을 전부 후보로 잡고, 각 행마다
`target_date <= ohlcv_daily.date <= today` 범위의 최초 거래일 종가로
매칭한다 (=다음 거래일로 자동 이월).
shadow prediction 도 같은 방식으로 매칭한다 (user_triggered 필터 없음).
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Any
from sqlalchemy import text
from app.db.connection import get_engine
logger = logging.getLogger(__name__)
# direction_hit 판정 시 ±0.3% 이내는 flat. (features 의 FLAT_BAND 와 동일)
FLAT_BAND = 0.003
@dataclass
class MatchSummary:
today: str
candidates: int
matched: int
skipped_no_actual: int
already_resolved: int
def _direction_label(ret: float) -> str:
if ret > FLAT_BAND:
return "up"
if ret < -FLAT_BAND:
return "down"
return "flat"
def match_up_to(today: date) -> MatchSummary:
"""target_date <= today 인 모든 미해결 예측을 매칭.
각 행마다 ohlcv_daily 에서 target_date 이상, today 이하 범위의 최초
거래일 종가를 actual_close 로 사용 — 공휴일/주말 이월 자연 처리.
"""
eng = get_engine()
with eng.begin() as conn:
candidate_rows = conn.execute(
text(
"""
SELECT p.id, p.code, p.base_date, p.target_date, p.horizon,
p.point_forecast, p.direction, p.model
FROM predictions p
LEFT JOIN prediction_outcomes po ON po.prediction_id = p.id
WHERE p.target_date <= :today
AND po.prediction_id IS NULL
"""
),
{"today": today},
).all()
candidates = len(candidate_rows)
if not candidates:
return MatchSummary(str(today), 0, 0, 0, 0)
matched = 0
skipped = 0
already = 0
for pid, code, base_date, target_date, horizon, point_forecast, pred_dir, model in candidate_rows:
# 첫 거래일 종가 (target_date <= date <= today)
actual_row = conn.execute(
text(
"""
SELECT date, close FROM ohlcv_daily
WHERE code = :c AND date >= :td AND date <= :today
ORDER BY date ASC
LIMIT 1
"""
),
{"c": code, "td": target_date, "today": today},
).first()
if not actual_row or actual_row[1] is None:
skipped += 1
continue
actual_date = actual_row[0]
actual = float(actual_row[1])
base_close_row = conn.execute(
text("SELECT close FROM ohlcv_daily WHERE code = :c AND date = :d"),
{"c": code, "d": base_date},
).first()
if not base_close_row or base_close_row[0] is None:
skipped += 1
continue
base_close = float(base_close_row[0])
actual_ret = actual / base_close - 1.0
actual_dir = _direction_label(actual_ret)
dir_hit = (pred_dir == actual_dir)
abs_err = abs(float(point_forecast) - actual) if point_forecast is not None else None
try:
conn.execute(
text(
"""
INSERT INTO prediction_outcomes
(prediction_id, code, target_date, horizon, model,
predicted_close, actual_close, actual_return, direction_hit, abs_error)
VALUES
(:pid, :code, :d, :h, :m, :pc, :ac, :ar, :dh, :ae)
ON CONFLICT (prediction_id) DO NOTHING
"""
),
{
"pid": pid,
"code": code,
# 실제 매칭된 거래일 (이월된 경우 target_date 와 다를 수 있음)
"d": actual_date,
"h": horizon,
"m": model,
"pc": float(point_forecast) if point_forecast is not None else None,
"ac": actual,
"ar": float(actual_ret),
"dh": bool(dir_hit),
"ae": float(abs_err) if abs_err is not None else None,
},
)
matched += 1
except Exception as exc: # noqa: BLE001
logger.warning("match insert failed pid=%s: %s", pid, exc)
already += 1
return MatchSummary(
today=str(today),
candidates=candidates,
matched=matched,
skipped_no_actual=skipped,
already_resolved=already,
)
# 하위 호환 alias — 이전 시그니처를 쓰던 호출자 (예: 단일 날짜 매칭 테스트)
def match_for_date(d: date) -> MatchSummary:
"""legacy: target_date == d 만 매칭하던 동작 → 이제 target_date <= d 전체 처리."""
return match_up_to(d)
def match_today() -> dict[str, Any]:
"""평일 16:30 KST 호출용. target_date <= today (KST) 인 미해결 행 일괄 매칭."""
from datetime import datetime, timezone, timedelta as td
kst = timezone(td(hours=9))
today = datetime.now(kst).date()
summary = match_up_to(today)
return {
"today": str(today),
"summary": summary.__dict__,
}
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
out = match_today()
print(json.dumps(out, ensure_ascii=False, indent=2, default=str))