Files
stock_chart_site/backend/app/fetch/pykrx_helper.py
tkrmagid 56f73a1f12 feat(phase-1a): external data fetchers + refresh pipeline + scheduler
10종목 시드 + pykrx OHLCV / 외인·기관 거래대금, KIS read-only EOD, OpenDART
공시, 네이버 금융 뉴스 스크레이퍼, 구글 뉴스 RSS, yfinance 거시(KOSPI/KOSDAQ/
USDKRW/US10Y) fetcher 를 추가하고 refresh_one / daily_batch / backfill /
APScheduler(16:00 KST) 파이프라인으로 묶음.

- backend/app/seed: 10종목 시드 (대형/고변동/테마/플랫폼/방어)
- backend/app/fetch: pykrx, kis, dart, news, macro, symbols_seed
- backend/app/pipelines: refresh_one, daily_batch, backfill(CLI), scheduler
- backend/app/api/refresh.py: POST /api/refresh/{code}?lookback_days=N
- backend/app/main.py: lifespan 으로 스케줄러 기동/종료, /health/keys 추가
- README: .env 복사 안내 보강

스모크 테스트 (실제 키 사용) 결과:
  KIS token  : ok (token 346자 발급)
  KIS daily  : 005930 11rows
  DART list  : 005930 30일 10건
  Naver news : 005930 12건
  Google RSS : "삼성전자" 92건

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 15:43:18 +09:00

151 lines
5.4 KiB
Python

"""pykrx 기반 OHLCV / 외인-기관 거래대금 수집.
차단 위험을 줄이려고 호출 사이에 짧은 슬립을 둔다. 5년치는 종목당 약 1~2초.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from sqlalchemy import text
from app.db.connection import get_engine
logger = logging.getLogger(__name__)
# pykrx takes dates in 'YYYYMMDD' form; this is the matching strftime pattern.
_PYKRX_DATE = "%Y%m%d"
@dataclass
class FetchResult:
    """Outcome of one fetch-and-upsert run for a single symbol."""

    # Symbol code the run was executed for.
    code: str
    # Label of the data source, e.g. "pykrx_ohlcv".
    source: str
    # Count of rows that were newly inserted.
    inserted: int
    # Count of rows that already existed and were overwritten.
    updated: int
    # Failure description; None (or an empty string) means no failure.
    error: str | None = None

    def status(self) -> str:
        """Return "failed" when an error message is set, otherwise "ok"."""
        return "failed" if self.error else "ok"
def _to_pykrx(d: date) -> str:
    """Render *d* using the module's ``_PYKRX_DATE`` strftime pattern."""
    formatted = d.strftime(_PYKRX_DATE)
    return formatted
def fetch_ohlcv_daily(code: str, start: date, end: date, *, sleep_s: float = 0.2) -> FetchResult:
    """Fetch daily OHLCV bars from pykrx and upsert them into ``ohlcv_daily``.

    Args:
        code: Symbol code passed to pykrx and written to the ``code`` column.
        start: Start of the requested date range.
        end: End of the requested date range.
        sleep_s: Seconds to sleep once the rows have been written, to
            throttle successive pykrx calls.

    Returns:
        A ``FetchResult`` with source ``"pykrx_ohlcv"``. When the pykrx call
        raises, or returns no rows, both counters are zero and ``error``
        holds the exception text or ``"empty"`` respectively.

    Errors raised while executing the upsert are not caught here and
    propagate to the caller.
    """
    source = "pykrx_ohlcv"
    try:
        # Imported inside the try so an import failure is reported as a
        # failed FetchResult like any other pykrx error.
        from pykrx import stock as krx

        df = krx.get_market_ohlcv(_to_pykrx(start), _to_pykrx(end), code)
    except Exception as exc:  # noqa: BLE001
        logger.exception("pykrx ohlcv failed code=%s", code)
        return FetchResult(code, source, 0, 0, error=str(exc))
    if df is None or df.empty:
        return FetchResult(code, source, 0, 0, error="empty")

    # The statement is identical for every row, so build it once.
    # RETURNING (xmax = 0) relies on PostgreSQL's xmax system column to tell
    # a fresh insert apart from a conflict update.
    upsert = text(
        """
        INSERT INTO ohlcv_daily (code, date, open, high, low, close, volume)
        VALUES (:code, :date, :open, :high, :low, :close, :volume)
        ON CONFLICT (code, date) DO UPDATE
        SET open=EXCLUDED.open, high=EXCLUDED.high, low=EXCLUDED.low,
            close=EXCLUDED.close, volume=EXCLUDED.volume
        RETURNING (xmax = 0) AS inserted
        """
    )

    engine = get_engine()
    inserted = updated = 0
    with engine.begin() as conn:
        for idx, row in df.iterrows():
            try:
                day = idx.date() if hasattr(idx, "date") else date.fromisoformat(str(idx)[:10])
            except Exception:  # noqa: BLE001
                # Skip the row, but leave a trace so dropped bars show up in logs.
                logger.warning(
                    "pykrx ohlcv skipped row with unparseable date code=%s index=%r",
                    code,
                    idx,
                )
                continue
            # Missing or falsy cells are stored as 0.
            # NOTE(review): int() raises ValueError on a NaN volume; confirm
            # pykrx never yields NaN here or decide how such rows are handled.
            params = {
                "code": code,
                "date": day,
                "open": float(row.get("시가", 0) or 0),
                "high": float(row.get("고가", 0) or 0),
                "low": float(row.get("저가", 0) or 0),
                "close": float(row.get("종가", 0) or 0),
                "volume": int(row.get("거래량", 0) or 0),
            }
            flag = conn.execute(upsert, params).first()
            if flag and flag[0]:
                inserted += 1
            else:
                updated += 1
    # Pause once per fetch (not per row) before the caller issues the next request.
    time.sleep(sleep_s)
    return FetchResult(code, source, inserted, updated)
def fetch_trading_value(code: str, start: date, end: date, *, sleep_s: float = 0.3) -> FetchResult:
    """Fetch foreign / institutional / individual net trading value and upsert it.

    Rows are written to ``trading_value_daily`` keyed on ``(code, date)``.

    Args:
        code: Symbol code passed to pykrx and written to the ``code`` column.
        start: Start of the requested date range.
        end: End of the requested date range.
        sleep_s: Seconds to sleep once the rows have been written, to
            throttle successive pykrx calls.

    Returns:
        A ``FetchResult`` with source ``"pykrx_trading_value"``. When the
        pykrx call raises, or returns no rows, both counters are zero and
        ``error`` holds the exception text or ``"empty"`` respectively.

    Errors raised while executing the upsert are not caught here and
    propagate to the caller.
    """
    source = "pykrx_trading_value"
    try:
        # Imported inside the try so an import failure is reported as a
        # failed FetchResult like any other pykrx error.
        from pykrx import stock as krx

        df = krx.get_market_trading_value_by_date(
            _to_pykrx(start), _to_pykrx(end), code,
        )
    except Exception as exc:  # noqa: BLE001
        logger.exception("pykrx trading_value failed code=%s", code)
        return FetchResult(code, source, 0, 0, error=str(exc))
    if df is None or df.empty:
        return FetchResult(code, source, 0, 0, error="empty")

    # Column names: 외국인합계 / 기관합계 / 개인 (as of pykrx 1.0.x).
    # Each is located by substring; the first matching column wins.
    col_foreign = next((c for c in df.columns if "외국인" in c), None)
    col_inst = next((c for c in df.columns if "기관" in c), None)
    col_indiv = next((c for c in df.columns if "개인" in c), None)

    # A column that cannot be located is written as 0.0 for every row, which
    # can overwrite existing values through the upsert. Make that visible.
    missing = [
        label
        for label, col in (
            ("foreign", col_foreign),
            ("institution", col_inst),
            ("individual", col_indiv),
        )
        if not col
    ]
    if missing:
        logger.warning(
            "pykrx trading_value missing columns code=%s missing=%s; storing 0.0",
            code,
            missing,
        )

    # The statement is identical for every row, so build it once.
    # RETURNING (xmax = 0) relies on PostgreSQL's xmax system column to tell
    # a fresh insert apart from a conflict update.
    upsert = text(
        """
        INSERT INTO trading_value_daily (code, date, foreign_net, institution_net, individual_net)
        VALUES (:code, :date, :f, :i, :p)
        ON CONFLICT (code, date) DO UPDATE
        SET foreign_net=EXCLUDED.foreign_net,
            institution_net=EXCLUDED.institution_net,
            individual_net=EXCLUDED.individual_net
        RETURNING (xmax = 0) AS inserted
        """
    )

    engine = get_engine()
    inserted = updated = 0
    with engine.begin() as conn:
        for idx, row in df.iterrows():
            try:
                day = idx.date() if hasattr(idx, "date") else date.fromisoformat(str(idx)[:10])
            except Exception:  # noqa: BLE001
                # Skip the row, but leave a trace so dropped rows show up in logs.
                logger.warning(
                    "pykrx trading_value skipped row with unparseable date code=%s index=%r",
                    code,
                    idx,
                )
                continue
            # Missing or falsy cells, and columns that were not found, become 0.0.
            params = {
                "code": code,
                "date": day,
                "f": float(row.get(col_foreign, 0) or 0) if col_foreign else 0.0,
                "i": float(row.get(col_inst, 0) or 0) if col_inst else 0.0,
                "p": float(row.get(col_indiv, 0) or 0) if col_indiv else 0.0,
            }
            flag = conn.execute(upsert, params).first()
            if flag and flag[0]:
                inserted += 1
            else:
                updated += 1
    # Pause once per fetch (not per row) before the caller issues the next request.
    time.sleep(sleep_s)
    return FetchResult(code, source, inserted, updated)
def backfill_code(code: str, *, years: int = 5) -> list[FetchResult]:
    """Run both pykrx fetchers for *code* over a trailing window of ``years``.

    The window ends today and begins ``365 * years`` days earlier. The
    returned list holds the OHLCV result first, then the trading-value result.
    """
    today = date.today()
    window_start = today - timedelta(days=365 * years)
    ohlcv_result = fetch_ohlcv_daily(code, window_start, today)
    trading_result = fetch_trading_value(code, window_start, today)
    return [ohlcv_result, trading_result]