"""Collect daily OHLCV and foreign/institution trading value via pykrx.

A short sleep follows each successful fetch to reduce the risk of being
blocked. Five years of data takes roughly 1-2 seconds per stock code.
"""
from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from datetime import date, datetime, timedelta

from sqlalchemy import text

from app.db.connection import get_engine

logger = logging.getLogger(__name__)

# pykrx expects dates formatted as 'YYYYMMDD'.
_PYKRX_DATE = "%Y%m%d"

# Upsert statements. ``RETURNING (xmax = 0)`` is the PostgreSQL idiom used
# here to tell a fresh insert (true) apart from a conflict-update (false).
_OHLCV_UPSERT = text(
    """
    INSERT INTO ohlcv_daily (code, date, open, high, low, close, volume)
    VALUES (:code, :date, :open, :high, :low, :close, :volume)
    ON CONFLICT (code, date) DO UPDATE SET
        open=EXCLUDED.open,
        high=EXCLUDED.high,
        low=EXCLUDED.low,
        close=EXCLUDED.close,
        volume=EXCLUDED.volume
    RETURNING (xmax = 0) AS inserted
    """
)

_TRADING_VALUE_UPSERT = text(
    """
    INSERT INTO trading_value_daily
        (code, date, foreign_net, institution_net, individual_net)
    VALUES (:code, :date, :f, :i, :p)
    ON CONFLICT (code, date) DO UPDATE SET
        foreign_net=EXCLUDED.foreign_net,
        institution_net=EXCLUDED.institution_net,
        individual_net=EXCLUDED.individual_net
    RETURNING (xmax = 0) AS inserted
    """
)


@dataclass
class FetchResult:
    """Outcome of one fetch-and-upsert run for a single stock code."""

    code: str  # stock code that was requested
    source: str  # which fetcher produced this result, e.g. "pykrx_ohlcv"
    inserted: int  # rows newly inserted
    updated: int  # rows that already existed and were overwritten
    error: str | None = None  # failure description; None on success

    def status(self) -> str:
        """Return ``"failed"`` when an error is recorded, else ``"ok"``."""
        return "failed" if self.error else "ok"


def _to_pykrx(d: date) -> str:
    """Format *d* as the 'YYYYMMDD' string pykrx expects."""
    return d.strftime(_PYKRX_DATE)


def _zero_if_missing(value: object) -> object:
    """Return 0 for None, falsy and NaN cells; otherwise *value* unchanged.

    NaN is truthy, so the plain ``value or 0`` idiom lets it through; a NaN
    would then make ``int()`` raise and abort the whole batch. NaN is
    detected by the fact that it does not compare equal to itself.
    """
    if not value or value != value:
        return 0
    return value


def _index_to_date(idx: object) -> date | None:
    """Convert a DataFrame index label to a ``date``; None if it can't be."""
    try:
        if hasattr(idx, "date"):
            return idx.date()
        return date.fromisoformat(str(idx)[:10])
    except Exception:  # noqa: BLE001 - best effort: skip the row, keep the batch
        logger.warning("skipping row with unparsable date index=%r", idx)
        return None


def _find_column(columns: object, keyword: str) -> object | None:
    """Return the first column label containing *keyword*, or None."""
    # str() keeps non-string labels from raising TypeError on ``in``.
    return next((c for c in columns if keyword in str(c)), None)


def _upsert_rows(statement, rows: list[dict[str, object]]) -> tuple[int, int]:
    """Execute *statement* once per parameter dict inside one transaction.

    Returns ``(inserted, updated)`` counts derived from the statement's
    ``RETURNING`` flag. Database errors propagate to the caller and roll the
    transaction back.
    """
    engine = get_engine()
    inserted = updated = 0
    with engine.begin() as conn:
        for params in rows:
            flag = conn.execute(statement, params).first()
            if flag and flag[0]:
                inserted += 1
            else:
                updated += 1
    return inserted, updated


def fetch_ohlcv_daily(code: str, start: date, end: date, *, sleep_s: float = 0.2) -> FetchResult:
    """Fetch daily OHLCV bars for *code* and upsert them into ``ohlcv_daily``.

    Args:
        code: Stock code passed to pykrx.
        start: First date of the requested range.
        end: Last date of the requested range.
        sleep_s: Seconds to sleep after a successful fetch and upsert.

    Returns:
        A ``FetchResult`` with insert/update counts. If pykrx raises, or
        returns no data, the result carries ``error`` (the exception text or
        ``"empty"``) and zero counts; no sleep happens on those paths.
    """
    try:
        from pykrx import stock as krx

        df = krx.get_market_ohlcv(_to_pykrx(start), _to_pykrx(end), code)
    except Exception as exc:  # noqa: BLE001
        logger.exception("pykrx ohlcv failed code=%s", code)
        return FetchResult(code, "pykrx_ohlcv", 0, 0, error=str(exc))
    if df is None or df.empty:
        return FetchResult(code, "pykrx_ohlcv", 0, 0, error="empty")

    # Build all parameter sets first so the transaction only covers DB work.
    rows: list[dict[str, object]] = []
    for idx, row in df.iterrows():
        day = _index_to_date(idx)
        if day is None:
            continue
        rows.append(
            {
                "code": code,
                "date": day,
                "open": float(_zero_if_missing(row.get("시가"))),
                "high": float(_zero_if_missing(row.get("고가"))),
                "low": float(_zero_if_missing(row.get("저가"))),
                "close": float(_zero_if_missing(row.get("종가"))),
                "volume": int(_zero_if_missing(row.get("거래량"))),
            }
        )

    inserted, updated = _upsert_rows(_OHLCV_UPSERT, rows)
    # Throttle after the transaction has closed, not while it is held open.
    time.sleep(sleep_s)
    return FetchResult(code, "pykrx_ohlcv", inserted, updated)


def fetch_trading_value(code: str, start: date, end: date, *, sleep_s: float = 0.3) -> FetchResult:
    """Fetch foreign/institution/individual net trading value and upsert it.

    Rows go into ``trading_value_daily``. Columns are located by keyword
    (외국인 / 기관 / 개인); a column that cannot be found is written as 0.0
    and a warning is logged.

    Args:
        code: Stock code passed to pykrx.
        start: First date of the requested range.
        end: Last date of the requested range.
        sleep_s: Seconds to sleep after a successful fetch and upsert.

    Returns:
        A ``FetchResult`` with insert/update counts. If pykrx raises, or
        returns no data, the result carries ``error`` (the exception text or
        ``"empty"``) and zero counts; no sleep happens on those paths.
    """
    try:
        from pykrx import stock as krx

        df = krx.get_market_trading_value_by_date(
            _to_pykrx(start), _to_pykrx(end), code,
        )
    except Exception as exc:  # noqa: BLE001
        logger.exception("pykrx trading_value failed code=%s", code)
        return FetchResult(code, "pykrx_trading_value", 0, 0, error=str(exc))
    if df is None or df.empty:
        return FetchResult(code, "pykrx_trading_value", 0, 0, error="empty")

    # Column names: 외국인합계 / 기관합계 / 개인 (as of pykrx 1.0.x).
    col_foreign = _find_column(df.columns, "외국인")
    col_inst = _find_column(df.columns, "기관")
    col_indiv = _find_column(df.columns, "개인")
    for keyword, found in (("외국인", col_foreign), ("기관", col_inst), ("개인", col_indiv)):
        if found is None:
            logger.warning(
                "pykrx trading_value column missing keyword=%s code=%s; storing 0",
                keyword,
                code,
            )

    def cell(row, column) -> float:
        # A column that was not found contributes 0.0 for every row.
        if column is None:
            return 0.0
        return float(_zero_if_missing(row.get(column)))

    rows: list[dict[str, object]] = []
    for idx, row in df.iterrows():
        day = _index_to_date(idx)
        if day is None:
            continue
        rows.append(
            {
                "code": code,
                "date": day,
                "f": cell(row, col_foreign),
                "i": cell(row, col_inst),
                "p": cell(row, col_indiv),
            }
        )

    inserted, updated = _upsert_rows(_TRADING_VALUE_UPSERT, rows)
    # Throttle after the transaction has closed, not while it is held open.
    time.sleep(sleep_s)
    return FetchResult(code, "pykrx_trading_value", inserted, updated)


def backfill_code(code: str, *, years: int = 5) -> list[FetchResult]:
    """Backfill OHLCV and trading value for *code* over the last *years*.

    The range ends today and starts ``365 * years`` days earlier (leap days
    are not accounted for). Returns the OHLCV result followed by the
    trading-value result.
    """
    end = date.today()
    start = end - timedelta(days=365 * years)
    return [
        fetch_ohlcv_daily(code, start, end),
        fetch_trading_value(code, start, end),
    ]