feat(phase-1a): external data fetchers + refresh pipeline + scheduler

10종목 시드 + pykrx OHLCV / 외인·기관 거래대금, KIS read-only EOD, OpenDART
공시, 네이버 금융 뉴스 스크레이퍼, 구글 뉴스 RSS, yfinance 거시(KOSPI/KOSDAQ/
USDKRW/US10Y) fetcher 를 추가하고 refresh_one / daily_batch / backfill /
APScheduler(16:00 KST) 파이프라인으로 묶음.

- backend/app/seed: 10종목 시드 (대형/고변동/테마/플랫폼/방어)
- backend/app/fetch: pykrx, kis, dart, news, macro, symbols_seed
- backend/app/pipelines: refresh_one, daily_batch, backfill(CLI), scheduler
- backend/app/api/refresh.py: POST /api/refresh/{code}?lookback_days=N
- backend/app/main.py: lifespan 으로 스케줄러 기동/종료, /health/keys 추가
- README: .env 복사 안내 보강

스모크 테스트 (실제 키 사용) 결과:
  KIS token  : ok (token 346자 발급)
  KIS daily  : 005930 11rows
  DART list  : 005930 30일 10건
  Naver news : 005930 12건
  Google RSS : "삼성전자" 92건

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
tkrmagid
2026-05-20 15:43:18 +09:00
parent cacddf5adf
commit 56f73a1f12
15 changed files with 1203 additions and 7 deletions

View File

@@ -0,0 +1,93 @@
"""5년치 백필 CLI.
사용 예:
# 한 종목, 명시 기간
python -m app.pipelines.backfill --code 005930 --from 2020-01-01 --to 2025-12-31
# 시드 10종목 일괄, 기본 5년
python -m app.pipelines.backfill --all-seed --years 5
# 시드 + 거시 지표
python -m app.pipelines.backfill --all-seed --include-macro
"""
from __future__ import annotations
import argparse
import json
import logging
from datetime import date, timedelta
from app.fetch import macro as macro_mod
from app.fetch import pykrx_helper
from app.fetch import symbols_seed
from app.seed.seed_tickers import SEED_TICKERS
logger = logging.getLogger(__name__)
def _parse_date(s: str) -> date:
    """Convert an ISO-format calendar date string (e.g. ``2020-01-01``) to a ``date``.

    Raises:
        ValueError: If ``s`` is not accepted by ``date.fromisoformat``.
    """
    parsed: date = date.fromisoformat(s)
    return parsed
def cmd_backfill_code(code: str, start: date, end: date) -> dict:
    """Backfill pykrx daily OHLCV and trading-value data for one ticker.

    Args:
        code: Ticker code (e.g. ``005930``).
        start: First date of the range handed to the fetchers.
        end: Last date of the range handed to the fetchers.

    Returns:
        A dict with the ticker ``code`` plus one entry per source
        (``pykrx_ohlcv``, ``pykrx_trading_value``), each carrying the
        fetch result's ``status``, ``inserted``, ``updated`` and ``error``.
    """

    def _as_entry(result) -> dict:
        # Flatten one fetch result into the JSON-friendly summary shape.
        return {
            "status": result.status(),
            "inserted": result.inserted,
            "updated": result.updated,
            "error": result.error,
        }

    # Both fetches run before either result is summarized.
    ohlcv_result = pykrx_helper.fetch_ohlcv_daily(code, start, end)
    trading_result = pykrx_helper.fetch_trading_value(code, start, end)

    summary: dict = {"code": code}
    summary["pykrx_ohlcv"] = _as_entry(ohlcv_result)
    summary["pykrx_trading_value"] = _as_entry(trading_result)
    return summary
def main(argv: list[str] | None = None) -> int:
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
p = argparse.ArgumentParser(description="pykrx 5년치 백필")
p.add_argument("--code", help="단일 종목 코드 (예: 005930)")
p.add_argument("--from", dest="start", help="시작일 YYYY-MM-DD")
p.add_argument("--to", dest="end", help="종료일 YYYY-MM-DD")
p.add_argument("--years", type=int, default=5, help="명시 기간 없을 때 최근 N년 (기본 5)")
p.add_argument("--all-seed", action="store_true", help="시드 10종목 일괄")
p.add_argument("--seed-symbols-table", action="store_true",
help="KRX 전 종목 symbols 테이블에 시드 (검색 UX용)")
p.add_argument("--include-macro", action="store_true", help="거시 지표 (KOSPI/USDKRW/^TNX) 동시 적재")
args = p.parse_args(argv)
end = _parse_date(args.end) if args.end else date.today()
start = _parse_date(args.start) if args.start else (end - timedelta(days=365 * args.years))
summary: dict = {}
if args.seed_symbols_table:
rep = symbols_seed.seed_symbols()
summary["symbols_seed"] = {
"inserted": rep.inserted,
"updated": rep.updated,
"seed_marked": rep.seed_marked,
"markets": rep.markets,
}
if args.code:
summary["backfill"] = [cmd_backfill_code(args.code, start, end)]
elif args.all_seed:
results = []
for t in SEED_TICKERS:
logger.info("backfill %s %s..%s", t.code, start, end)
results.append(cmd_backfill_code(t.code, start, end))
summary["backfill"] = results
if args.include_macro:
macros = macro_mod.fetch_macro_daily(years=args.years)
summary["macro"] = [
{"key": m.key, "status": m.status(), "inserted": m.inserted,
"updated": m.updated, "error": m.error}
for m in macros
]
print(json.dumps(summary, ensure_ascii=False, indent=2, default=str))
return 0
# Script entry point: main()'s return value becomes the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,46 @@
"""일별 배치: 16:00 KST 에 시드 10종목 + 거시 + 뉴스 + DART 갱신.
수동 실행:
python -m app.pipelines.daily_batch
"""
from __future__ import annotations
import json
import logging
import time
from typing import Any
from app.fetch import macro as macro_mod
from app.pipelines.refresh_one import refresh_code
from app.seed.seed_tickers import SEED_TICKERS
logger = logging.getLogger(__name__)
def run_daily_batch() -> dict[str, Any]:
    """Refresh every seed ticker (7-day lookback) and the macro series once.

    Failures are written to the log as well as returned, so they remain
    visible when the caller does not inspect the return value (for example
    when this function runs as a scheduled job).

    Returns:
        A dict with ``duration_seconds`` (elapsed time rounded to 2
        decimals), ``tickers`` (one ``to_dict()`` report per seed ticker)
        and ``macro`` (one summary dict per macro result).
    """
    # Monotonic clock: an elapsed-time measurement must not be affected by
    # wall-clock adjustments during the run.
    started = time.monotonic()

    reports: list[dict[str, Any]] = []
    for t in SEED_TICKERS:
        logger.info("daily_batch refresh %s %s", t.code, t.name)
        report = refresh_code(t.code, t.name, lookback_days=7).to_dict()
        # Each per-source entry is a dict carrying a "status" field.
        for source, detail in report.items():
            if isinstance(detail, dict) and detail.get("status") == "failed":
                logger.warning(
                    "daily_batch %s %s failed: %s", t.code, source, detail.get("error")
                )
        reports.append(report)

    macros = macro_mod.fetch_macro_daily(years=1)
    macro_summary = [
        {"key": m.key, "status": m.status(), "inserted": m.inserted,
         "updated": m.updated, "error": m.error}
        for m in macros
    ]
    for entry in macro_summary:
        if entry["error"]:
            logger.warning(
                "daily_batch macro %s status=%s error=%s",
                entry["key"], entry["status"], entry["error"],
            )

    elapsed = time.monotonic() - started
    logger.info("daily_batch finished in %.2fs (%d tickers)", elapsed, len(reports))
    return {
        "duration_seconds": round(elapsed, 2),
        "tickers": reports,
        "macro": macro_summary,
    }
if __name__ == "__main__":
    # Manual run: set up logging, execute the batch once, dump the report as JSON.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    print(json.dumps(run_daily_batch(), ensure_ascii=False, indent=2, default=str))

View File

@@ -0,0 +1,147 @@
"""한 종목에 대해 모든 소스를 갱신 + 구조화된 status 리턴.
POST /api/refresh/{code} 와 daily_batch 둘 다 이 함수를 호출.
"""
from __future__ import annotations
import logging
from dataclasses import asdict, dataclass, field
from datetime import date, timedelta
from typing import Any
from app.fetch import dart as dart_mod
from app.fetch import kis as kis_mod
from app.fetch import news as news_mod
from app.fetch import pykrx_helper
logger = logging.getLogger(__name__)
@dataclass
class SourceStatus:
    """Outcome of refreshing a single data source for a single ticker."""

    # Outcome label: 'ok' / 'skipped_missing_key' / 'failed'.
    status: str
    # Number of rows reported as newly inserted.
    inserted: int = 0
    # Number of rows reported as updated.
    updated: int = 0
    # Number of items reported as skipped by the news upsert.
    skipped: int = 0
    # Source-specific details, e.g. {"fetched": N} or {"sample_rows": N}.
    extra: dict[str, Any] = field(default_factory=dict)
    # Error message when the refresh did not succeed; None otherwise.
    error: str | None = None
@dataclass
class RefreshReport:
    """Per-ticker bundle holding one ``SourceStatus`` for each refreshed source."""

    code: str
    pykrx_ohlcv: SourceStatus
    pykrx_trading_value: SourceStatus
    kis_daily: SourceStatus
    dart: SourceStatus
    naver_news: SourceStatus
    google_rss: SourceStatus

    def to_dict(self) -> dict[str, Any]:
        """Return ``code`` plus every source status expanded with ``asdict``.

        Keys appear in a fixed order: ``code`` first, then the sources in
        declaration order.
        """
        source_names = (
            "pykrx_ohlcv",
            "pykrx_trading_value",
            "kis_daily",
            "dart",
            "naver_news",
            "google_rss",
        )
        payload: dict[str, Any] = {"code": self.code}
        payload.update(
            (source, asdict(getattr(self, source))) for source in source_names
        )
        return payload
def _pykrx_ohlcv(code: str, start: date, end: date) -> SourceStatus:
    """Refresh pykrx daily OHLCV for ``code`` and summarize the outcome.

    Never raises: any exception is logged with its traceback and reported
    as a ``failed`` status.

    Args:
        code: Ticker code.
        start: First date of the range handed to the fetcher.
        end: Last date of the range handed to the fetcher.

    Returns:
        A ``SourceStatus`` carrying the fetch result's status, inserted and
        updated counts, and error; or ``status='failed'`` on an exception.
    """
    try:
        res = pykrx_helper.fetch_ohlcv_daily(code, start, end)
        return SourceStatus(
            status=res.status(),
            inserted=res.inserted,
            updated=res.updated,
            error=res.error,
        )
    except Exception as exc:  # noqa: BLE001
        # Keep the traceback in the log; the status only carries a message.
        logger.exception("pykrx OHLCV refresh failed for %s (%s..%s)", code, start, end)
        # Some exceptions stringify to ""; fall back to the class name.
        return SourceStatus(status="failed", error=str(exc) or type(exc).__name__)
def _pykrx_trading(code: str, start: date, end: date) -> SourceStatus:
    """Refresh pykrx trading-value data for ``code`` and summarize the outcome.

    Never raises: any exception is logged with its traceback and reported
    as a ``failed`` status.

    Args:
        code: Ticker code.
        start: First date of the range handed to the fetcher.
        end: Last date of the range handed to the fetcher.

    Returns:
        A ``SourceStatus`` carrying the fetch result's status, inserted and
        updated counts, and error; or ``status='failed'`` on an exception.
    """
    try:
        res = pykrx_helper.fetch_trading_value(code, start, end)
        return SourceStatus(
            status=res.status(),
            inserted=res.inserted,
            updated=res.updated,
            error=res.error,
        )
    except Exception as exc:  # noqa: BLE001
        # Keep the traceback in the log; the status only carries a message.
        logger.exception("pykrx trading-value refresh failed for %s (%s..%s)", code, start, end)
        # Some exceptions stringify to ""; fall back to the class name.
        return SourceStatus(status="failed", error=str(exc) or type(exc).__name__)
def _kis(code: str, start: date, end: date) -> SourceStatus:
    """Call the KIS read-only end-of-day endpoint as a sanity check.

    Nothing is written to the database here; only the number of returned
    rows is reported (``extra["sample_rows"]``). The data overlaps with
    pykrx, so KIS serves as a backup / real-time source while pykrx remains
    the primary source for daily loading.

    Never raises: a missing API key is reported as ``skipped_missing_key``;
    any other exception is logged with its traceback and reported as
    ``failed``.

    Args:
        code: Ticker code.
        start: First date of the range handed to the fetcher.
        end: Last date of the range handed to the fetcher.

    Returns:
        A ``SourceStatus`` describing the outcome.
    """
    try:
        rows = kis_mod.fetch_daily_price(code, start, end)
        return SourceStatus(status="ok", extra={"sample_rows": len(rows)})
    except kis_mod.SkippedMissingKey:
        # Expected when no key is configured; not an error, so not logged.
        return SourceStatus(status="skipped_missing_key")
    except Exception as exc:  # noqa: BLE001
        # Keep the traceback in the log; the status only carries a message.
        logger.exception("KIS daily price check failed for %s (%s..%s)", code, start, end)
        # Some exceptions stringify to ""; fall back to the class name.
        return SourceStatus(status="failed", error=str(exc) or type(exc).__name__)
def _dart(code: str, start: date, end: date) -> SourceStatus:
    """Fetch DART disclosures for ``code`` and upsert them as news items.

    Never raises: a missing API key is reported as ``skipped_missing_key``;
    any other exception is logged with its traceback and reported as
    ``failed``.

    Args:
        code: Ticker code.
        start: First date of the range handed to the fetcher.
        end: Last date of the range handed to the fetcher.

    Returns:
        On success, a ``SourceStatus`` with the upsert's inserted/skipped
        counts and ``extra["fetched"]`` set to the number of disclosures.
    """
    try:
        items = dart_mod.fetch_disclosures(code, start, end)
        # Disclosures are upserted into the news table with source "dart".
        news_items = [
            news_mod.NewsItem(
                code=d.code,
                source="dart",
                title=d.title,
                url=d.url,
                published_at=d.published_at,
            )
            for d in items
        ]
        ins, skip = news_mod.upsert_news(news_items)
        return SourceStatus(status="ok", inserted=ins, skipped=skip, extra={"fetched": len(items)})
    except dart_mod.SkippedMissingKey:
        # Expected when no key is configured; not an error, so not logged.
        return SourceStatus(status="skipped_missing_key")
    except Exception as exc:  # noqa: BLE001
        # Keep the traceback in the log; the status only carries a message.
        logger.exception("DART refresh failed for %s (%s..%s)", code, start, end)
        # Some exceptions stringify to ""; fall back to the class name.
        return SourceStatus(status="failed", error=str(exc) or type(exc).__name__)
def _naver_news(code: str) -> SourceStatus:
    """Fetch one page of Naver Finance news for ``code`` and upsert it.

    Never raises: any exception is logged with its traceback and reported
    as a ``failed`` status.

    Args:
        code: Ticker code.

    Returns:
        On success, a ``SourceStatus`` with the upsert's inserted/skipped
        counts and ``extra["fetched"]`` set to the number of fetched items.
    """
    try:
        items = news_mod.fetch_naver_finance_news(code, max_pages=1)
        ins, skip = news_mod.upsert_news(items)
        return SourceStatus(status="ok", inserted=ins, skipped=skip, extra={"fetched": len(items)})
    except Exception as exc:  # noqa: BLE001
        # Keep the traceback in the log; the status only carries a message.
        logger.exception("Naver news refresh failed for %s", code)
        # Some exceptions stringify to ""; fall back to the class name.
        return SourceStatus(status="failed", error=str(exc) or type(exc).__name__)
def _google_rss(code: str, name: str) -> SourceStatus:
    """Fetch Google News RSS items for a ticker and upsert them.

    The search query is the company ``name``, falling back to ``code`` when
    the name is empty.

    Never raises: any exception is logged with its traceback and reported
    as a ``failed`` status.

    Args:
        code: Ticker code, passed to the fetcher alongside the query.
        name: Company name used as the search query; may be empty.

    Returns:
        On success, a ``SourceStatus`` with the upsert's inserted/skipped
        counts and ``extra["fetched"]`` set to the number of fetched items.
    """
    try:
        query = name or code
        items = news_mod.fetch_google_news_rss(query, code=code)
        ins, skip = news_mod.upsert_news(items)
        return SourceStatus(status="ok", inserted=ins, skipped=skip, extra={"fetched": len(items)})
    except Exception as exc:  # noqa: BLE001
        # Keep the traceback in the log; the status only carries a message.
        logger.exception("Google News RSS refresh failed for %s", code)
        # Some exceptions stringify to ""; fall back to the class name.
        return SourceStatus(status="failed", error=str(exc) or type(exc).__name__)
def refresh_code(code: str, name: str, *, lookback_days: int = 7) -> RefreshReport:
    """Refresh every source for one ticker over a short trailing window.

    The window ends today and starts ``lookback_days`` earlier. Sources are
    refreshed one after another in a fixed order; the two news sources do
    not take the date window.

    Args:
        code: Ticker code.
        name: Company name, used as the Google News search query.
        lookback_days: Length of the trailing window in days.

    Returns:
        A ``RefreshReport`` with one status per source.
    """
    # NOTE(review): date.today() follows the process-local timezone; confirm
    # the server clock is the intended one (the scheduler itself uses KST).
    window_end = date.today()
    window_start = window_end - timedelta(days=lookback_days)

    ohlcv_status = _pykrx_ohlcv(code, window_start, window_end)
    trading_status = _pykrx_trading(code, window_start, window_end)
    kis_status = _kis(code, window_start, window_end)
    dart_status = _dart(code, window_start, window_end)
    naver_status = _naver_news(code)
    google_status = _google_rss(code, name)

    return RefreshReport(
        code=code,
        pykrx_ohlcv=ohlcv_status,
        pykrx_trading_value=trading_status,
        kis_daily=kis_status,
        dart=dart_status,
        naver_news=naver_status,
        google_rss=google_status,
    )

View File

@@ -0,0 +1,46 @@
"""APScheduler 기반 백그라운드 잡.
- 16:00 KST : daily_batch
- (Phase 4) 16:30: 모델 추론, 02:00 일요일: 주간 재학습
FastAPI 기동 시점에 lifespan 으로 start, 종료 시 shutdown.
"""
from __future__ import annotations
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from pytz import timezone
from app.pipelines.daily_batch import run_daily_batch
logger = logging.getLogger(__name__)
# Timezone applied to the scheduler and its cron trigger (Asia/Seoul, i.e. KST).
KST = timezone("Asia/Seoul")
# Module-wide scheduler handle: None until start_scheduler() creates one,
# and reset to None by shutdown_scheduler().
_scheduler: BackgroundScheduler | None = None
def start_scheduler() -> BackgroundScheduler:
    """Start the module-wide background scheduler, or return the existing one.

    Registers ``run_daily_batch`` to fire every day at 16:00 KST. Calling
    this again while a scheduler is already published returns that instance
    without creating a new one.

    Returns:
        The running ``BackgroundScheduler``.
    """
    global _scheduler
    if _scheduler is not None:
        return _scheduler

    # Build and start on a local name first: if start() raises, the module
    # global stays None, so later calls do not get back an unstarted
    # scheduler and shutdown_scheduler() does not act on one.
    scheduler = BackgroundScheduler(timezone=KST)
    scheduler.add_job(
        run_daily_batch,
        CronTrigger(hour=16, minute=0, timezone=KST),
        id="daily_batch_16",
        replace_existing=True,
        max_instances=1,
        # The default misfire grace is 1 second, so a late wake-up would drop
        # that day's run entirely. Allow up to an hour of lateness and
        # collapse any piled-up runs into one.
        coalesce=True,
        misfire_grace_time=3600,
    )
    scheduler.start()
    _scheduler = scheduler
    logger.info("scheduler started (daily_batch @ 16:00 KST)")
    return scheduler
def shutdown_scheduler() -> None:
    """Stop the module-wide scheduler, if one exists, and clear the handle.

    Shutdown is requested with ``wait=False``. Does nothing when no
    scheduler has been started.
    """
    global _scheduler
    if not _scheduler:
        return
    _scheduler.shutdown(wait=False)
    _scheduler = None
    logger.info("scheduler stopped")