Files
tkrmagid bf4fb01146 feat(phase-4): LGBM 모델 + 앙상블 + 매칭/재학습 잡
- backend/app/models/lgbm.py: 종목 × horizon 별 LightGBM 회귀(y_ret_h)
  + 다중분류(y_dir_h, 3-class). joblib 으로 backend/data/models/{code}_h{H}_*.pkl
  저장. early_stopping(30). predict_one() 으로 최신 영업일 피처에 추론.
- backend/app/models/weights.py: ensemble_weights 테이블 IO,
  default w_chronos=0.6 / w_lgbm=0.4 (DB 행 없으면 fallback).
- backend/app/models/ensemble.py: Chronos sample 분포 + LGBM regression+cls
  결합. point/q10/q90 + prob_up/flat/down + direction 라벨. 한쪽 모델
  실패 시 다른 쪽 단독 fallback (cold start: chronos 단독).
- backend/app/pipelines/predict_one.py: predict_and_store(). 결과를
  predictions 테이블에 UPSERT, user_triggered 누적 OR. base_date = 마지막
  ohlcv 거래일, target_date = base_date + H 영업일(주말 스킵, 공휴일은
  매칭잡에서 자연 보정).
- backend/app/pipelines/match_outcomes.py: target_date == d 인
  user_triggered=TRUE 예측을 d 의 실제 종가와 매칭 → prediction_outcomes
  적재. direction_hit(±0.3% flat band) + abs_error. 실제 종가 없으면
  자연 skip.
- backend/app/pipelines/retrain_weekly.py: 시드 10종목 × H 재학습 +
  최근 30일 model_performance 적재.
- backend/app/db/migrations/003_ensemble_weights.sql: (code, horizon) →
  (w_chronos, w_lgbm, hit_rate_*, sample_count).
- backend/app/pipelines/scheduler.py:
    daily_batch    : 평일 16:00 KST
    match_outcomes : 평일 16:30 KST   ← 사용자가 확정한 매칭 시점
    retrain_weekly : 일요일 02:00 KST

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 16:03:01 +09:00

181 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""LightGBM 회귀 + 분류 모델. 종목 × horizon 별 별도 저장.
- 회귀: target = y_ret_h{H}. 예측 후 base_close*(1+pred) 로 가격 환산.
- 분류: target = y_dir_h{H} ∈ {-1, 0, +1}. 3-class softmax 로 prob_up/flat/down.
저장 경로: backend/data/models/{code}_h{H}_reg.pkl, _cls.pkl (joblib).
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
from pathlib import Path
import joblib
import numpy as np
import pandas as pd
from app.models.features import build_features, feature_columns
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Directory used for the persisted model files. Read from the LGBM_MODEL_DIR
# environment variable, falling back to /app/data/models when it is unset.
# NOTE(review): the module docstring cites backend/data/models — presumably the
# same directory as seen from inside the container; confirm.
MODEL_DIR = Path(os.environ.get("LGBM_MODEL_DIR", "/app/data/models"))
@dataclass
class LgbmForecast:
    """LightGBM forecast for a single horizon.

    Bundles the regression output (predicted return and the close price it
    implies) with the three direction-class probabilities.
    """

    # Horizon the forecast was produced for.
    horizon: int
    # Close of the feature row the forecast is based on.
    base_close: float
    # base_close * (1 + predicted_return).
    predicted_close: float
    # Raw output of the regression model.
    predicted_return: float
    # Direction-class probabilities from the classifier.
    prob_up: float
    prob_flat: float
    prob_down: float
def _model_paths(code: str, horizon: int) -> tuple[Path, Path]:
    """Return the (regression, classification) model file paths for code x horizon.

    MODEL_DIR is created (including parents) if it does not exist yet, so the
    returned paths can be written to immediately.
    """
    MODEL_DIR.mkdir(parents=True, exist_ok=True)
    reg_file = MODEL_DIR / f"{code}_h{horizon}_reg.pkl"
    cls_file = MODEL_DIR / f"{code}_h{horizon}_cls.pkl"
    return reg_file, cls_file
def _prepare_xy(code: str, horizon: int, lookback_days: int) -> tuple[pd.DataFrame, pd.Series, pd.Series, list[str]]:
    """Build the training matrix and both targets for one code x one horizon.

    Returns:
        (X, y_ret, y_dir, feats): feature frame, float return target, int
        direction target, and the feature column names. When no rows or no
        feature columns are available, the current frame is returned together
        with two empty Series and an empty feature list.
    """
    frame = build_features(
        code,
        lookback_days=lookback_days,
        horizons=(horizon,),
        with_targets=True,
    ).df

    def _nothing(current: pd.DataFrame):
        # Shared "nothing to train on" result.
        return current, pd.Series(dtype=float), pd.Series(dtype=int), []

    if frame.empty:
        return _nothing(frame)

    ret_target = f"y_ret_h{horizon}"
    dir_target = f"y_dir_h{horizon}"
    # Drop rows whose targets are NaN (the trailing H rows).
    labelled = frame.dropna(subset=[ret_target, dir_target])

    columns = feature_columns(labelled)
    if not columns:
        return _nothing(labelled)

    # Feature NaNs are left in place; LightGBM handles them natively.
    return (
        labelled[columns],
        labelled[ret_target].astype(float),
        labelled[dir_target].astype(int),
        columns,
    )
def _dump_models_atomically(payloads: dict[Path, dict]) -> None:
    """Persist every payload via temp file + rename so readers never see a partial pickle."""
    staged: list[tuple[Path, Path]] = []
    try:
        for target, payload in payloads.items():
            tmp = target.with_name(f"{target.name}.{os.getpid()}.tmp")
            # Register before dumping so a half-written temp file is cleaned up too.
            staged.append((tmp, target))
            joblib.dump(payload, tmp)
        # Swap in only after every dump succeeded: a failed dump leaves the
        # previous reg/cls pair untouched instead of a mismatched pair.
        for tmp, target in staged:
            os.replace(tmp, target)
    finally:
        for tmp, _ in staged:
            if tmp.exists():
                tmp.unlink()


def train_one(code: str, horizon: int, *, lookback_days: int = 365 * 3) -> dict:
    """Train the regression and 3-class direction models for one code x one horizon.

    Args:
        code: Instrument code; used for feature building and in the file names.
        horizon: Forecast horizon H; selects the y_ret_h{H} / y_dir_h{H} targets.
        lookback_days: History window passed to the feature builder.

    Returns:
        A summary dict. With fewer than 100 usable rows the status is
        "skipped_too_few_rows" and nothing is written. Otherwise the status is
        "ok" and the dict carries the row count, the best iteration of each
        model and the paths of the two saved files.
    """
    import lightgbm as lgb

    X, y_ret, y_dir, feats = _prepare_xy(code, horizon, lookback_days)
    if X.empty or len(X) < 100:
        return {"code": code, "horizon": horizon, "status": "skipped_too_few_rows", "n_rows": int(len(X))}

    # Hyper-parameters common to both models.
    shared_params = dict(
        learning_rate=0.05,
        num_leaves=31,
        min_data_in_leaf=20,
        feature_fraction=0.85,
        bagging_fraction=0.8,
        bagging_freq=5,
        verbose=-1,
    )
    reg_params = dict(objective="regression", **shared_params)
    cls_params = dict(objective="multiclass", num_class=3, **shared_params)

    # Class labels -1/0/1 are shifted to the 0/1/2 indices LightGBM expects.
    y_dir_idx = (y_dir + 1).astype(int)

    # Positional 85/15 split: first rows train, last rows validate.
    # NOTE(review): rows are presumably in chronological order, in which case
    # the labels of the last `horizon` training rows look ahead into the
    # validation window — consider an embargo gap; confirm with the feature builder.
    split = int(len(X) * 0.85)
    X_tr, X_val = X.iloc[:split], X.iloc[split:]

    def _fit(params: dict, labels: pd.Series):
        # Up to 400 rounds, stopping once the validation metric stalls for 30 rounds.
        train_set = lgb.Dataset(X_tr, label=labels.iloc[:split])
        valid_set = lgb.Dataset(X_val, label=labels.iloc[split:], reference=train_set)
        return lgb.train(
            params,
            train_set,
            num_boost_round=400,
            valid_sets=[valid_set],
            callbacks=[lgb.early_stopping(stopping_rounds=30, verbose=False)],
        )

    reg_model = _fit(reg_params, y_ret)
    cls_model = _fit(cls_params, y_dir_idx)

    reg_path, cls_path = _model_paths(code, horizon)
    # The feature list is stored with each model so inference can rebuild the
    # exact column order used during training.
    _dump_models_atomically(
        {
            reg_path: {"model": reg_model, "features": feats},
            cls_path: {"model": cls_model, "features": feats},
        }
    )
    return {
        "code": code,
        "horizon": horizon,
        "status": "ok",
        "n_rows": int(len(X)),
        "reg_best_iter": int(reg_model.best_iteration or 0),
        "cls_best_iter": int(cls_model.best_iteration or 0),
        "reg_path": str(reg_path),
        "cls_path": str(cls_path),
    }
def predict_one(code: str, horizon: int, *, lookback_days: int = 400) -> LgbmForecast | None:
    """Run inference for one code x one horizon on the most recent feature row.

    Args:
        code: Instrument code whose saved models and features are used.
        horizon: Forecast horizon H; selects the model pair to load.
        lookback_days: History window passed to the feature builder.

    Returns:
        An LgbmForecast whose base_close is the close of the last feature row,
        or None when either model file is missing, the feature frame is empty,
        or that close is not a finite number.
    """
    reg_path, cls_path = _model_paths(code, horizon)
    if not (reg_path.exists() and cls_path.exists()):
        return None

    # NOTE: joblib.load unpickles arbitrary objects. These files must only come
    # from this module's own training step, never from untrusted input.
    reg_blob = joblib.load(reg_path)
    cls_blob = joblib.load(cls_path)
    feats_reg = reg_blob["features"]
    feats_cls = cls_blob["features"]
    reg_model = reg_blob["model"]
    cls_model = cls_blob["model"]

    ff = build_features(code, lookback_days=lookback_days, horizons=(horizon,), with_targets=False)
    df = ff.df
    if df.empty:
        return None

    last = df.iloc[[-1]]
    base_close = float(last["close"].iloc[0])
    if not np.isfinite(base_close):
        # A NaN/inf close would turn predicted_close into NaN; report "no forecast".
        logger.warning("predict_one(%s, h=%s): latest close is not finite; skipping", code, horizon)
        return None

    # Columns the models were trained on but the live frame lacks become NaN
    # after reindex. Surface that drift instead of hiding it.
    missing = sorted((set(feats_reg) | set(feats_cls)) - set(last.columns))
    if missing:
        logger.warning(
            "predict_one(%s, h=%s): %d trained feature(s) missing from live frame: %s",
            code,
            horizon,
            len(missing),
            missing,
        )

    # Align columns to the order each model saw during training.
    X_reg = last.reindex(columns=feats_reg)
    X_cls = last.reindex(columns=feats_cls)

    pred_ret = float(reg_model.predict(X_reg)[0])
    probs = cls_model.predict(X_cls)[0]
    # Class index 0 = -1 (down), 1 = 0 (flat), 2 = +1 (up).
    prob_down, prob_flat, prob_up = float(probs[0]), float(probs[1]), float(probs[2])

    return LgbmForecast(
        horizon=horizon,
        base_close=base_close,
        predicted_close=base_close * (1.0 + pred_ret),
        predicted_return=pred_ret,
        prob_up=prob_up,
        prob_flat=prob_flat,
        prob_down=prob_down,
    )