Files
claude-owner e0edc8f1e3 feat: 예측 실패 원인 노출 + /health/models 진단 + restart-ci.bat
사금향님이 만난 409 'both chronos & lgbm failed' 에러가 원인을 안 보여줘서
디버깅 어려웠음. 세 군데 보강:

1. ensemble.py: 두 모델 다 실패 시 chronos/lgbm 각각의 실제 에러 원문
   (type:message) 을 RuntimeError 메시지에 포함. predict.py 가 409 detail
   로 그대로 노출하므로 브라우저에서 바로 원인 확인 가능. LGBM 가 None
   반환 (체크포인트 없음) 인 경우도 'model checkpoint not found' 로 명시.

2. /health/models 엔드포인트 추가:
   - chronos.ping() — lazy load 시도 + 디바이스/모델명 반환
   - LGBM_MODEL_DIR 의 *.pkl 개수와 샘플 8개 파일명 반환. cold start
     (체크포인트 0개) 면 'no_checkpoints' 상태로 알림.

3. restart-ci.bat 추가 — restart.bat 에서 pause 빼고 종료 코드로만 알리는
   SSH 비대화형 친화 버전. 일반 사용은 그대로 restart.bat.
2026-05-23 15:42:44 +09:00

194 lines
6.9 KiB
Python

"""Chronos + LGBM 앙상블 추론.
final_price[h] = w_c * chronos.median[h-1] + w_l * lgbm.predicted_close
final_q10[h] = w_c * chronos.q10[h-1] + w_l * lgbm.predicted_close * 0.97
final_q90[h] = w_c * chronos.q90[h-1] + w_l * lgbm.predicted_close * 1.03
LGBM 은 단일 horizon 의 다음 종가(point) 만 주므로, 그 자체로는 신뢰구간이 없음.
근사로 ±3% band 를 LGBM 의 q10/q90 자리에 사용. Chronos 의 sample 분포가
주된 신뢰구간 정보 (Chronos 우세하면 ci 가 좁아짐).
direction 확률:
- LGBM 분류기에서 prob_up/flat/down (3-class) 그대로
- Chronos 는 next-day return 부호 비율: samples.shift1 / base_close - 1 의 부호
- 둘을 같은 가중치로 평균
LGBM 모델이 없으면 Chronos 단독으로 진행 (cold start).
Chronos 도 실패하면 LGBM 단독으로 진행.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
import numpy as np
from app.models.chronos import ChronosForecast
from app.models.chronos import forecast as chronos_forecast
from app.models.lgbm import LgbmForecast
from app.models.lgbm import predict_one as lgbm_predict
from app.models.weights import load_weights
logger = logging.getLogger(__name__)
@dataclass
class EnsembleStep:
horizon: int # 1..H 거래일 후
target_idx: int # chronos median 의 0-based 인덱스 (horizon-1)
point_close: float
ci_low: float
ci_high: float
prob_up: float
prob_flat: float
prob_down: float
direction: str # 'up' / 'flat' / 'down'
expected_return: float
@dataclass
class EnsemblePrediction:
code: str
base_close: float
horizons: list[int]
steps: list[EnsembleStep]
sources_used: list[str]
# shadow 저장용 원본 출력 (predict_one.py 가 ensemble + chronos 단독 + lgbm 단독
# 3 종을 predictions 에 적재해서 retrain_weekly 가 모델별 hit_rate 비교 가능하게 함).
chronos_raw: ChronosForecast | None = None
lgbm_raw: dict[int, LgbmForecast] = field(default_factory=dict)
def _chronos_direction(samples: list[list[float]], base_close: float, horizon: int) -> tuple[float, float, float]:
"""Chronos sample 분포에서 (prob_up, prob_flat, prob_down). ±0.3% flat band."""
if not samples:
return 0.33, 0.34, 0.33
arr = np.array(samples)[:, horizon - 1] # 해당 step 의 sample 값
ret = arr / base_close - 1.0
p_up = float((ret > 0.003).mean())
p_dn = float((ret < -0.003).mean())
p_fl = 1.0 - p_up - p_dn
return p_up, p_fl, p_dn
def predict(code: str, *, horizons: tuple[int, ...] = (1, 3, 5)) -> EnsemblePrediction:
"""한 종목에 대해 horizons 별 앙상블 예측. on-demand 추론용."""
max_h = max(horizons)
# Chronos: 종가 시계열 가져와서 max_h 까지 예측.
from app.models.features import build_features # local import
ff = build_features(code, lookback_days=400, horizons=horizons, with_targets=False)
df = ff.df
if df.empty:
raise RuntimeError(f"no OHLCV data for {code}")
closes = df["close"].astype(float).tolist()
base_close = float(closes[-1])
sources_used: list[str] = []
cf: ChronosForecast | None = None
chronos_err: str | None = None
try:
cf = chronos_forecast(closes, horizon=max_h, num_samples=30)
sources_used.append("chronos")
except Exception as exc: # noqa: BLE001
chronos_err = f"{type(exc).__name__}: {exc}"
logger.warning("chronos forecast failed for %s: %s", code, chronos_err)
steps: list[EnsembleStep] = []
lgbm_raw: dict[int, LgbmForecast] = {}
for h in horizons:
lf: LgbmForecast | None = None
lgbm_err: str | None = None
try:
lf = lgbm_predict(code, h)
if lf is not None:
sources_used.append(f"lgbm_h{h}")
lgbm_raw[h] = lf
else:
# predict_one 이 None 반환 = 체크포인트 파일 없음 (cold start).
lgbm_err = "model checkpoint not found (run retrain_weekly)"
except Exception as exc: # noqa: BLE001
lgbm_err = f"{type(exc).__name__}: {exc}"
logger.warning("lgbm predict failed for %s h=%d: %s", code, h, lgbm_err)
# 가중치 (DB 없으면 default 0.6/0.4).
w = load_weights(code, h)
wc, wl = w.w_chronos, w.w_lgbm
# 한쪽이 없으면 다른 쪽 전부.
if cf is None and lf is None:
# 사용자가 브라우저에서 바로 원인을 보게 두 에러를 그대로 노출.
raise RuntimeError(
f"both chronos & lgbm failed for {code} h={h}; "
f"chronos={chronos_err or 'unknown'}; lgbm={lgbm_err or 'unknown'}"
)
if cf is None:
wc, wl = 0.0, 1.0
if lf is None:
wc, wl = 1.0, 0.0
if cf is not None:
c_med = cf.median[h - 1]
c_q10 = cf.q10[h - 1]
c_q90 = cf.q90[h - 1]
else:
c_med = c_q10 = c_q90 = base_close # not used (wc=0)
if lf is not None:
l_close = lf.predicted_close
l_lo = l_close * 0.97
l_hi = l_close * 1.03
l_pu, l_pf, l_pd = lf.prob_up, lf.prob_flat, lf.prob_down
else:
l_close = l_lo = l_hi = base_close
l_pu = l_pf = l_pd = 0.0
point = wc * c_med + wl * l_close
lo = wc * c_q10 + wl * l_lo
hi = wc * c_q90 + wl * l_hi
if cf is not None:
cp_up, cp_fl, cp_dn = _chronos_direction(cf.samples, base_close, h)
else:
cp_up = cp_fl = cp_dn = 0.0
# direction prob: source 마다 weights 동일하게 가중평균
if lf is not None and cf is not None:
p_up = 0.5 * cp_up + 0.5 * l_pu
p_fl = 0.5 * cp_fl + 0.5 * l_pf
p_dn = 0.5 * cp_dn + 0.5 * l_pd
elif cf is not None:
p_up, p_fl, p_dn = cp_up, cp_fl, cp_dn
else:
p_up, p_fl, p_dn = l_pu, l_pf, l_pd
# 정규화 (혹시 합이 0 가 아닐 때)
s = max(p_up + p_fl + p_dn, 1e-9)
p_up, p_fl, p_dn = p_up / s, p_fl / s, p_dn / s
dir_lbl = "up" if p_up >= max(p_fl, p_dn) else ("down" if p_dn >= p_fl else "flat")
steps.append(
EnsembleStep(
horizon=h,
target_idx=h - 1,
point_close=float(point),
ci_low=float(lo),
ci_high=float(hi),
prob_up=float(p_up),
prob_flat=float(p_fl),
prob_down=float(p_dn),
direction=dir_lbl,
expected_return=float(point / base_close - 1.0),
)
)
return EnsemblePrediction(
code=code,
base_close=base_close,
horizons=list(horizons),
steps=steps,
sources_used=sources_used,
chronos_raw=cf,
lgbm_raw=lgbm_raw,
)