diff --git a/README.md b/README.md index 770d12d..67b449f 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,12 @@ docker compose down ## 빌드 모드 (수동) +`build.bat`을 안 쓰는 경우 `.env`가 먼저 있어야 합니다. + ```bash +# .env 없으면 한 번만 복사 +copy .env.example .env + # GPU 모드 (RTX 3070 Ti 등 NVIDIA GPU 사용) docker compose -f docker-compose.yml -f docker-compose.gpu.yml up -d --build diff --git a/backend/app/api/refresh.py b/backend/app/api/refresh.py new file mode 100644 index 0000000..553e14e --- /dev/null +++ b/backend/app/api/refresh.py @@ -0,0 +1,35 @@ +"""검증용 수동 갱신 API. + +POST /api/refresh/{code} + body: 없음 + query: ?lookback_days=7 (기본) + resp: refresh_one.RefreshReport.to_dict() +""" +from __future__ import annotations + +from fastapi import APIRouter, HTTPException, Query +from sqlalchemy import text + +from app.db.connection import get_engine +from app.pipelines.refresh_one import refresh_code + +router = APIRouter(prefix="/api", tags=["refresh"]) + + +def _resolve_name(code: str) -> str | None: + eng = get_engine() + with eng.connect() as conn: + row = conn.execute(text("SELECT name FROM symbols WHERE code = :code"), {"code": code}).first() + return row[0] if row else None + + +@router.post("/refresh/{code}") +def refresh_endpoint( + code: str, + lookback_days: int = Query(default=7, ge=1, le=365), +) -> dict: + name = _resolve_name(code) + if not name: + raise HTTPException(status_code=404, detail=f"unknown code: {code} (symbols 테이블에 없음. 시드 필요)") + report = refresh_code(code, name, lookback_days=lookback_days) + return report.to_dict() diff --git a/backend/app/fetch/dart.py b/backend/app/fetch/dart.py new file mode 100644 index 0000000..bbbf309 --- /dev/null +++ b/backend/app/fetch/dart.py @@ -0,0 +1,124 @@ +"""OpenDART 공시 본문 수집. + +키 없으면 SkippedMissingKey, 호출 측에서 skipped_missing_key 로 매핑. + +사용 endpoint: + - GET https://opendart.fss.or.kr/api/list.json : 공시 목록 (기간/종목별) + - GET https://opendart.fss.or.kr/api/document.xml : 공시 원문 (zip in xml wrapper) + +여기서는 list.json만 받아서 title + url 만 저장. 본문 다운로드는 후속 단계에서 +선택적으로 추가 (용량/속도 트레이드오프). +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from datetime import date, datetime +from typing import Any + +import httpx +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential + +from app.config import settings + +logger = logging.getLogger(__name__) + +DART_BASE = "https://opendart.fss.or.kr/api" +USER_AGENT = "stock_chart_site/0.1 (+personal)" + + +class SkippedMissingKey(RuntimeError): + pass + + +@dataclass +class Disclosure: + code: str # corp 종목코드 (있을 경우) + title: str + url: str + published_at: datetime + + +def _has_key() -> bool: + return bool(settings.dart_api_key) + + +def _stock_code_to_corp_code_cache() -> dict[str, str]: + """종목코드(6자리) -> corp_code(8자리) 매핑. + + 실제 운영에서는 corpCode.xml.zip 을 받아서 캐시해야 하지만, list.json 은 + 종목코드(stock_code)도 직접 받기 때문에 우선 빈 매핑으로 둔다. + """ + return {} + + +@retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=8), + retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), + reraise=True, +) +def fetch_disclosures( + code: str, + start: date, + end: date | None = None, + *, + page_count: int = 100, +) -> list[Disclosure]: + """종목별 공시 목록 가져오기.""" + if not _has_key(): + raise SkippedMissingKey("dart api_key missing") + end = end or date.today() + params = { + "crtfc_key": settings.dart_api_key, + "bgn_de": start.strftime("%Y%m%d"), + "end_de": end.strftime("%Y%m%d"), + "page_count": str(page_count), + "stock_code": code, + } + with httpx.Client(timeout=15.0, headers={"User-Agent": USER_AGENT}) as cli: + resp = cli.get(f"{DART_BASE}/list.json", params=params) + resp.raise_for_status() + data = resp.json() + + status = data.get("status") + # status: '000' OK, '013' no data, '020' rate limit, etc + if status == "013": + return [] + if status and status != "000": + raise RuntimeError(f"dart error: status={status} msg={data.get('message')}") + + out: list[Disclosure] = [] + for row in data.get("list", []) or []: + rcept_no = row.get("rcept_no") + if not rcept_no: + continue + title = (row.get("report_nm") or "").strip() + url = f"https://dart.fss.or.kr/dsaf001/main.do?rcpNo={rcept_no}" + try: + published_at = datetime.strptime(row.get("rcept_dt", ""), "%Y%m%d") + except ValueError: + continue + out.append( + Disclosure( + code=row.get("stock_code") or code, + title=title, + url=url, + published_at=published_at, + ) + ) + return out + + +def ping() -> dict[str, Any]: + if not _has_key(): + return {"status": "skipped_missing_key"} + try: + # 어제 기준 1건만 조회 (sanity) + from datetime import timedelta + items = fetch_disclosures("005930", date.today() - timedelta(days=30), date.today(), page_count=1) + return {"status": "ok", "sample_count": len(items)} + except SkippedMissingKey: + return {"status": "skipped_missing_key"} + except Exception as exc: # noqa: BLE001 + return {"status": "failed", "error": str(exc)} diff --git a/backend/app/fetch/kis.py b/backend/app/fetch/kis.py new file mode 100644 index 0000000..c7f6322 --- /dev/null +++ b/backend/app/fetch/kis.py @@ -0,0 +1,169 @@ +"""한국투자증권 KIS OpenAPI - READ ONLY 시세 조회. + +본 모듈은 주문/체결/잔고변경 endpoint를 일절 import 하지 않는다. +사용하는 endpoint: + - POST /oauth2/tokenP : 토큰 발급 + - GET /uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice + : 일/주/월봉 시세 조회 (read-only) + - GET /uapi/domestic-stock/v1/quotations/inquire-price + : 현재가 조회 (read-only) + +키 누락 시 모든 함수는 SkippedMissingKey 예외로 빠르게 실패하므로 호출 측에서 잡아서 +status='skipped_missing_key' 처리. +""" +from __future__ import annotations + +import logging +import threading +import time +from dataclasses import dataclass +from datetime import date, datetime +from typing import Any + +import httpx +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential + +from app.config import settings + +logger = logging.getLogger(__name__) + +KIS_BASE = "https://openapi.koreainvestment.com:9443" +USER_AGENT = "stock_chart_site/0.1 (+personal)" + + +class SkippedMissingKey(RuntimeError): + """KIS 키 미설정 시 발생. 호출 측에서 skipped 로 매핑.""" + + +@dataclass +class _Token: + value: str + expires_at: float # epoch seconds + + +_token_cache: _Token | None = None +_token_lock = threading.Lock() + + +def _has_keys() -> bool: + return bool(settings.kis_app_key and settings.kis_app_secret) + + +@retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=8), + retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), + reraise=True, +) +def _issue_token() -> _Token: + if not _has_keys(): + raise SkippedMissingKey("kis app_key/secret missing") + url = f"{KIS_BASE}/oauth2/tokenP" + body = { + "grant_type": "client_credentials", + "appkey": settings.kis_app_key, + "appsecret": settings.kis_app_secret, + } + with httpx.Client(timeout=10.0, headers={"User-Agent": USER_AGENT}) as cli: + resp = cli.post(url, json=body) + resp.raise_for_status() + data = resp.json() + access = data["access_token"] + # KIS 는 expires_in (sec) 와 access_token_token_expired (epoch-like str) 둘 다 줌 + expires_in = int(data.get("expires_in", 3600)) + return _Token(value=access, expires_at=time.time() + expires_in - 60) + + +def get_token() -> str: + """캐시된 토큰 반환. 만료 60초 전부터 재발급. 키 없으면 SkippedMissingKey.""" + global _token_cache + with _token_lock: + if _token_cache and _token_cache.expires_at > time.time(): + return _token_cache.value + _token_cache = _issue_token() + logger.info("kis token issued, expires_at=%s", datetime.fromtimestamp(_token_cache.expires_at)) + return _token_cache.value + + +def _headers(tr_id: str) -> dict[str, str]: + return { + "Content-Type": "application/json; charset=utf-8", + "Accept": "application/json", + "User-Agent": USER_AGENT, + "authorization": f"Bearer {get_token()}", + "appkey": settings.kis_app_key or "", + "appsecret": settings.kis_app_secret or "", + "tr_id": tr_id, + "custtype": "P", + } + + +@retry( + stop=stop_after_attempt(2), + wait=wait_exponential(multiplier=1, min=1, max=4), + retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), + reraise=True, +) +def fetch_daily_price( + code: str, + start: date, + end: date, + *, + period: str = "D", + adjusted: bool = True, +) -> list[dict[str, Any]]: + """일/주/월봉 시세 조회 (read-only). + + Returns: [{date, open, high, low, close, volume}, ...] + """ + if not _has_keys(): + raise SkippedMissingKey("kis app_key/secret missing") + url = f"{KIS_BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" + params = { + "FID_COND_MRKT_DIV_CODE": "J", # J=주식 + "FID_INPUT_ISCD": code, + "FID_INPUT_DATE_1": start.strftime("%Y%m%d"), + "FID_INPUT_DATE_2": end.strftime("%Y%m%d"), + "FID_PERIOD_DIV_CODE": period, # D/W/M + "FID_ORG_ADJ_PRC": "0" if adjusted else "1", + } + with httpx.Client(timeout=15.0) as cli: + resp = cli.get(url, headers=_headers("FHKST03010100"), params=params) + resp.raise_for_status() + data = resp.json() + if data.get("rt_cd") != "0": + raise RuntimeError(f"kis error: {data.get('msg1')} (rt_cd={data.get('rt_cd')})") + + out: list[dict[str, Any]] = [] + for row in data.get("output2", []) or []: + raw_date = row.get("stck_bsop_date") + if not raw_date: + continue + try: + day = datetime.strptime(raw_date, "%Y%m%d").date() + except ValueError: + continue + out.append( + { + "date": day, + "open": float(row.get("stck_oprc") or 0), + "high": float(row.get("stck_hgpr") or 0), + "low": float(row.get("stck_lwpr") or 0), + "close": float(row.get("stck_clpr") or 0), + "volume": int(row.get("acml_vol") or 0), + } + ) + return out + + +def ping() -> dict[str, Any]: + """토큰 발급만 시도해서 키 유효성 확인.""" + if not _has_keys(): + return {"status": "skipped_missing_key"} + try: + tok = get_token() + return {"status": "ok", "token_prefix": tok[:8] + "...", "len": len(tok)} + except SkippedMissingKey: + return {"status": "skipped_missing_key"} + except Exception as exc: # noqa: BLE001 + return {"status": "failed", "error": str(exc)} diff --git a/backend/app/fetch/macro.py b/backend/app/fetch/macro.py new file mode 100644 index 0000000..5f9c418 --- /dev/null +++ b/backend/app/fetch/macro.py @@ -0,0 +1,83 @@ +"""거시/지수 지표: yfinance 로 KOSPI / KOSDAQ / USDKRW / US10Y.""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from datetime import date, timedelta + +from sqlalchemy import text + +from app.db.connection import get_engine + +logger = logging.getLogger(__name__) + +# yfinance ticker -> macro_daily.key +TICKER_MAP: dict[str, str] = { + "^KS11": "kospi", + "^KQ11": "kosdaq", + "KRW=X": "usdkrw", + "^TNX": "us10y", +} + + +@dataclass +class MacroResult: + key: str + inserted: int + updated: int + error: str | None = None + + def status(self) -> str: + return "failed" if self.error else "ok" + + +def fetch_macro_daily(*, years: int = 5) -> list[MacroResult]: + end = date.today() + start = end - timedelta(days=365 * years) + try: + import yfinance as yf + except Exception as exc: # noqa: BLE001 + return [MacroResult(key="*", inserted=0, updated=0, error=f"yfinance import failed: {exc}")] + + results: list[MacroResult] = [] + engine = get_engine() + for ticker, key in TICKER_MAP.items(): + try: + df = yf.download(ticker, start=start.isoformat(), end=(end + timedelta(days=1)).isoformat(), + progress=False, auto_adjust=False) + except Exception as exc: # noqa: BLE001 + logger.exception("yfinance failed ticker=%s", ticker) + results.append(MacroResult(key=key, inserted=0, updated=0, error=str(exc))) + continue + if df is None or df.empty: + results.append(MacroResult(key=key, inserted=0, updated=0, error="empty")) + continue + # Close 컬럼만 사용 + close_col = "Close" if "Close" in df.columns else df.columns[0] + ins = upd = 0 + with engine.begin() as conn: + for idx, row in df.iterrows(): + day = idx.date() if hasattr(idx, "date") else date.fromisoformat(str(idx)[:10]) + val = row[close_col] + try: + val_f = float(val.iloc[0] if hasattr(val, "iloc") else val) + except Exception: # noqa: BLE001 + continue + res = conn.execute( + text( + """ + INSERT INTO macro_daily (date, key, value) + VALUES (:date, :key, :value) + ON CONFLICT (date, key) DO UPDATE SET value = EXCLUDED.value + RETURNING (xmax = 0) AS inserted + """ + ), + {"date": day, "key": key, "value": val_f}, + ) + r = res.first() + if r and r[0]: + ins += 1 + else: + upd += 1 + results.append(MacroResult(key=key, inserted=ins, updated=upd)) + return results diff --git a/backend/app/fetch/news.py b/backend/app/fetch/news.py new file mode 100644 index 0000000..c02b319 --- /dev/null +++ b/backend/app/fetch/news.py @@ -0,0 +1,141 @@ +"""뉴스 수집: 네이버 금융 종목 페이지 + 구글 뉴스 RSS. + +차단 위험 줄이려고 User-Agent + timeout + retry. URL unique 제약으로 dedupe. +""" +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass +from datetime import datetime, timezone, timedelta +from typing import Any + +import feedparser +import httpx +from bs4 import BeautifulSoup +from sqlalchemy import text +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential + +from app.db.connection import get_engine + +logger = logging.getLogger(__name__) +USER_AGENT = "Mozilla/5.0 (compatible; stock_chart_site/0.1; +personal)" +KST = timezone(timedelta(hours=9)) + + +@dataclass +class NewsItem: + code: str | None + source: str + title: str + url: str + published_at: datetime + body: str | None = None + + +@retry( + stop=stop_after_attempt(2), + wait=wait_exponential(multiplier=1, min=1, max=4), + retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), + reraise=True, +) +def fetch_naver_finance_news(code: str, *, max_pages: int = 1) -> list[NewsItem]: + """네이버 금융 종목 뉴스. https://finance.naver.com/item/news_news.naver?code=005930""" + out: list[NewsItem] = [] + for page in range(1, max_pages + 1): + url = ( + f"https://finance.naver.com/item/news_news.naver" + f"?code={code}&page={page}&sm=title_entity_id.basic&clusterId=" + ) + with httpx.Client(timeout=10.0, headers={"User-Agent": USER_AGENT, "Referer": "https://finance.naver.com/"}) as cli: + resp = cli.get(url) + resp.raise_for_status() + html = resp.text + soup = BeautifulSoup(html, "lxml") + for tr in soup.select("table.type5 tr"): + a = tr.select_one("a.tit") or tr.select_one("td.title a") + if not a: + continue + link = a.get("href") or "" + if not link: + continue + full_url = link if link.startswith("http") else f"https://finance.naver.com{link}" + title = a.get_text(strip=True) + time_td = tr.select_one("td.date") + time_text = time_td.get_text(strip=True) if time_td else "" + published_at = _parse_naver_time(time_text) + if not title or not published_at: + continue + out.append(NewsItem(code=code, source="naver_finance", title=title, url=full_url, published_at=published_at)) + return out + + +def _parse_naver_time(s: str) -> datetime | None: + s = s.strip() + # '2026.05.20 13:24' 형태 + m = re.match(r"(\d{4})[.-](\d{2})[.-](\d{2})\s+(\d{2}):(\d{2})", s) + if m: + y, mo, d, h, mi = (int(x) for x in m.groups()) + return datetime(y, mo, d, h, mi, tzinfo=KST) + return None + + +@retry( + stop=stop_after_attempt(2), + wait=wait_exponential(multiplier=1, min=1, max=4), + retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), + reraise=True, +) +def fetch_google_news_rss(query: str, *, code: str | None = None, hl: str = "ko", gl: str = "KR") -> list[NewsItem]: + """Google News RSS 검색. 종목명(또는 코드)로 쿼리.""" + url = f"https://news.google.com/rss/search?q={query}&hl={hl}&gl={gl}&ceid={gl}:{hl}" + with httpx.Client(timeout=10.0, headers={"User-Agent": USER_AGENT}) as cli: + resp = cli.get(url) + resp.raise_for_status() + body = resp.text + feed = feedparser.parse(body) + out: list[NewsItem] = [] + for entry in feed.entries: + title = (entry.get("title") or "").strip() + link = entry.get("link") or "" + if not title or not link: + continue + published = entry.get("published_parsed") + if not published: + continue + pub_dt = datetime(*published[:6], tzinfo=timezone.utc) + out.append(NewsItem(code=code, source="google_rss", title=title, url=link, published_at=pub_dt)) + return out + + +def upsert_news(items: list[NewsItem]) -> tuple[int, int]: + """news 테이블에 upsert. (inserted, skipped) 반환.""" + if not items: + return 0, 0 + engine = get_engine() + inserted = skipped = 0 + with engine.begin() as conn: + for item in items: + res = conn.execute( + text( + """ + INSERT INTO news (code, source, published_at, title, url, body) + VALUES (:code, :source, :published_at, :title, :url, :body) + ON CONFLICT (url) DO NOTHING + RETURNING id + """ + ), + { + "code": item.code, + "source": item.source, + "published_at": item.published_at, + "title": item.title, + "url": item.url, + "body": item.body, + }, + ) + if res.first(): + inserted += 1 + else: + skipped += 1 + return inserted, skipped diff --git a/backend/app/fetch/pykrx_helper.py b/backend/app/fetch/pykrx_helper.py new file mode 100644 index 0000000..630bf72 --- /dev/null +++ b/backend/app/fetch/pykrx_helper.py @@ -0,0 +1,150 @@ +"""pykrx 기반 OHLCV / 외인-기관 거래대금 수집. + +차단 위험을 줄이려고 호출 사이에 짧은 슬립을 둔다. 5년치는 종목당 약 1~2초. +""" +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass +from datetime import date, datetime, timedelta + +from sqlalchemy import text + +from app.db.connection import get_engine + +logger = logging.getLogger(__name__) + +# pykrx 는 'YYYYMMDD' 형식을 받음 +_PYKRX_DATE = "%Y%m%d" + + +@dataclass +class FetchResult: + code: str + source: str + inserted: int + updated: int + error: str | None = None + + def status(self) -> str: + if self.error: + return "failed" + return "ok" + + +def _to_pykrx(d: date) -> str: + return d.strftime(_PYKRX_DATE) + + +def fetch_ohlcv_daily(code: str, start: date, end: date, *, sleep_s: float = 0.2) -> FetchResult: + """일봉 OHLCV 를 가져와 ohlcv_daily 에 upsert.""" + try: + from pykrx import stock as krx + df = krx.get_market_ohlcv(_to_pykrx(start), _to_pykrx(end), code) + except Exception as exc: # noqa: BLE001 + logger.exception("pykrx ohlcv failed code=%s", code) + return FetchResult(code, "pykrx_ohlcv", 0, 0, error=str(exc)) + if df is None or df.empty: + return FetchResult(code, "pykrx_ohlcv", 0, 0, error="empty") + + engine = get_engine() + inserted = updated = 0 + with engine.begin() as conn: + for d, row in df.iterrows(): + try: + day = d.date() if hasattr(d, "date") else date.fromisoformat(str(d)[:10]) + except Exception: # noqa: BLE001 + continue + res = conn.execute( + text( + """ + INSERT INTO ohlcv_daily (code, date, open, high, low, close, volume) + VALUES (:code, :date, :open, :high, :low, :close, :volume) + ON CONFLICT (code, date) DO UPDATE + SET open=EXCLUDED.open, high=EXCLUDED.high, low=EXCLUDED.low, + close=EXCLUDED.close, volume=EXCLUDED.volume + RETURNING (xmax = 0) AS inserted + """ + ), + { + "code": code, + "date": day, + "open": float(row.get("시가", 0) or 0), + "high": float(row.get("고가", 0) or 0), + "low": float(row.get("저가", 0) or 0), + "close": float(row.get("종가", 0) or 0), + "volume": int(row.get("거래량", 0) or 0), + }, + ) + r = res.first() + if r and r[0]: + inserted += 1 + else: + updated += 1 + time.sleep(sleep_s) + return FetchResult(code, "pykrx_ohlcv", inserted, updated) + + +def fetch_trading_value(code: str, start: date, end: date, *, sleep_s: float = 0.3) -> FetchResult: + """외인/기관/개인 순매수 거래대금.""" + try: + from pykrx import stock as krx + df = krx.get_market_trading_value_by_date( + _to_pykrx(start), _to_pykrx(end), code, + ) + except Exception as exc: # noqa: BLE001 + logger.exception("pykrx trading_value failed code=%s", code) + return FetchResult(code, "pykrx_trading_value", 0, 0, error=str(exc)) + if df is None or df.empty: + return FetchResult(code, "pykrx_trading_value", 0, 0, error="empty") + + # 컬럼명: 외국인합계 / 기관합계 / 개인 (pykrx 1.0.x 기준) + col_foreign = next((c for c in df.columns if "외국인" in c), None) + col_inst = next((c for c in df.columns if "기관" in c), None) + col_indiv = next((c for c in df.columns if "개인" in c), None) + + engine = get_engine() + inserted = updated = 0 + with engine.begin() as conn: + for d, row in df.iterrows(): + try: + day = d.date() if hasattr(d, "date") else date.fromisoformat(str(d)[:10]) + except Exception: # noqa: BLE001 + continue + res = conn.execute( + text( + """ + INSERT INTO trading_value_daily (code, date, foreign_net, institution_net, individual_net) + VALUES (:code, :date, :f, :i, :p) + ON CONFLICT (code, date) DO UPDATE + SET foreign_net=EXCLUDED.foreign_net, + institution_net=EXCLUDED.institution_net, + individual_net=EXCLUDED.individual_net + RETURNING (xmax = 0) AS inserted + """ + ), + { + "code": code, + "date": day, + "f": float(row.get(col_foreign, 0) or 0) if col_foreign else 0.0, + "i": float(row.get(col_inst, 0) or 0) if col_inst else 0.0, + "p": float(row.get(col_indiv, 0) or 0) if col_indiv else 0.0, + }, + ) + r = res.first() + if r and r[0]: + inserted += 1 + else: + updated += 1 + time.sleep(sleep_s) + return FetchResult(code, "pykrx_trading_value", inserted, updated) + + +def backfill_code(code: str, *, years: int = 5) -> list[FetchResult]: + end = date.today() + start = end - timedelta(days=365 * years) + return [ + fetch_ohlcv_daily(code, start, end), + fetch_trading_value(code, start, end), + ] diff --git a/backend/app/fetch/symbols_seed.py b/backend/app/fetch/symbols_seed.py new file mode 100644 index 0000000..3d813b4 --- /dev/null +++ b/backend/app/fetch/symbols_seed.py @@ -0,0 +1,99 @@ +"""KRX 전 종목 리스트를 symbols 테이블에 시드한다. + +검색 UX 가 KRX 전체 종목명을 대상으로 동작해야 하므로 전 종목을 미리 적재한다. +10 개 SEED_TICKERS 는 is_seed=TRUE 로 마크. +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass + +from sqlalchemy import text + +from app.db.connection import get_engine +from app.seed.seed_tickers import SEED_CODES, SEED_TICKERS + +logger = logging.getLogger(__name__) + + +@dataclass +class SeedReport: + inserted: int + updated: int + seed_marked: int + markets: dict[str, int] + + +def _fetch_market_listing(market: str) -> list[tuple[str, str]]: + """pykrx 로 한 시장의 (code, name) 목록을 가져온다. + + pykrx 가 외부 KRX 서버에 의존하므로 호출 측에서 예외 처리한다. + """ + from pykrx import stock as krx # local import: heavy import + + tickers = krx.get_market_ticker_list(market=market) + out: list[tuple[str, str]] = [] + for code in tickers: + name = krx.get_market_ticker_name(code) or "" + if not name: + continue + out.append((code, name)) + return out + + +def seed_symbols() -> SeedReport: + """KOSPI + KOSDAQ 전 종목을 upsert. SEED 10 종목은 is_seed=TRUE.""" + rows: list[tuple[str, str, str]] = [] # (code, name, market) + market_counts: dict[str, int] = {} + for market in ("KOSPI", "KOSDAQ"): + listing = _fetch_market_listing(market) + market_counts[market] = len(listing) + for code, name in listing: + rows.append((code, name, market)) + + engine = get_engine() + inserted = updated = 0 + seed_marked = 0 + with engine.begin() as conn: + for code, name, market in rows: + is_seed = code in SEED_CODES + res = conn.execute( + text( + """ + INSERT INTO symbols (code, name, market, is_seed) + VALUES (:code, :name, :market, :is_seed) + ON CONFLICT (code) DO UPDATE + SET name = EXCLUDED.name, + market = EXCLUDED.market, + is_seed = symbols.is_seed OR EXCLUDED.is_seed + RETURNING (xmax = 0) AS inserted + """ + ), + {"code": code, "name": name, "market": market, "is_seed": is_seed}, + ) + row = res.first() + if row and row[0]: + inserted += 1 + else: + updated += 1 + if is_seed: + seed_marked += 1 + + # SEED_TICKERS 중 KRX 리스팅에 없으면 (상장폐지 등) 그래도 명시적으로 시드 row 보장 + for t in SEED_TICKERS: + conn.execute( + text( + """ + INSERT INTO symbols (code, name, market, is_seed) + VALUES (:code, :name, :market, TRUE) + ON CONFLICT (code) DO UPDATE SET is_seed = TRUE + """ + ), + {"code": t.code, "name": t.name, "market": t.market}, + ) + + logger.info( + "seed_symbols done: inserted=%d updated=%d seed_marked=%d markets=%s", + inserted, updated, seed_marked, market_counts, + ) + return SeedReport(inserted=inserted, updated=updated, seed_marked=seed_marked, markets=market_counts) diff --git a/backend/app/main.py b/backend/app/main.py index bc70052..b8a840c 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,18 +1,37 @@ from __future__ import annotations import logging +from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from app.api.refresh import router as refresh_router from app.config import settings from app.db.connection import ping as db_ping +from app.fetch import dart as dart_mod +from app.fetch import kis as kis_mod +from app.pipelines.scheduler import shutdown_scheduler, start_scheduler -logging.basicConfig(level=settings.log_level) +logging.basicConfig( + level=settings.log_level, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", +) logger = logging.getLogger(__name__) -app = FastAPI(title="stock_chart_site", version="0.0.1") +@asynccontextmanager +async def lifespan(_: FastAPI): + # 스케줄러는 옵션. CI/테스트에서 disable 하고 싶으면 SCHEDULER_DISABLED 같은 env 추가 가능. + try: + start_scheduler() + except Exception: # noqa: BLE001 + logger.exception("scheduler start failed") + yield + shutdown_scheduler() + + +app = FastAPI(title="stock_chart_site", version="0.1.0", lifespan=lifespan) app.add_middleware( CORSMiddleware, @@ -21,6 +40,8 @@ app.add_middleware( allow_headers=["*"], ) +app.include_router(refresh_router) + def _resolved_device() -> str: if settings.model_device != "auto": @@ -34,13 +55,19 @@ def _resolved_device() -> str: @app.get("/health") def health() -> dict[str, object]: - return { - "ok": True, - "device": _resolved_device(), - "version": "0.0.1", - } + return {"ok": True, "device": _resolved_device(), "version": "0.1.0"} @app.get("/health/db") def health_db() -> dict[str, object]: return {"ok": True, **db_ping()} + + +@app.get("/health/keys") +def health_keys() -> dict[str, object]: + """등록된 외부 키들 ping (key 값은 노출하지 않음).""" + return { + "kis": kis_mod.ping(), + "dart": dart_mod.ping(), + # huggingface 는 모델 다운로드 시점에 확인 (별도 ping 호출 비용 회피) + } diff --git a/backend/app/pipelines/backfill.py b/backend/app/pipelines/backfill.py new file mode 100644 index 0000000..a02e23a --- /dev/null +++ b/backend/app/pipelines/backfill.py @@ -0,0 +1,93 @@ +"""5년치 백필 CLI. + +사용 예: + # 한 종목, 명시 기간 + python -m app.pipelines.backfill --code 005930 --from 2020-01-01 --to 2025-12-31 + + # 시드 10종목 일괄, 기본 5년 + python -m app.pipelines.backfill --all-seed --years 5 + + # 시드 + 거시 지표 + python -m app.pipelines.backfill --all-seed --include-macro +""" +from __future__ import annotations + +import argparse +import json +import logging +from datetime import date, timedelta + +from app.fetch import macro as macro_mod +from app.fetch import pykrx_helper +from app.fetch import symbols_seed +from app.seed.seed_tickers import SEED_TICKERS + +logger = logging.getLogger(__name__) + + +def _parse_date(s: str) -> date: + return date.fromisoformat(s) + + +def cmd_backfill_code(code: str, start: date, end: date) -> dict: + res_ohlcv = pykrx_helper.fetch_ohlcv_daily(code, start, end) + res_trade = pykrx_helper.fetch_trading_value(code, start, end) + return { + "code": code, + "pykrx_ohlcv": {"status": res_ohlcv.status(), "inserted": res_ohlcv.inserted, + "updated": res_ohlcv.updated, "error": res_ohlcv.error}, + "pykrx_trading_value": {"status": res_trade.status(), "inserted": res_trade.inserted, + "updated": res_trade.updated, "error": res_trade.error}, + } + + +def main(argv: list[str] | None = None) -> int: + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") + p = argparse.ArgumentParser(description="pykrx 5년치 백필") + p.add_argument("--code", help="단일 종목 코드 (예: 005930)") + p.add_argument("--from", dest="start", help="시작일 YYYY-MM-DD") + p.add_argument("--to", dest="end", help="종료일 YYYY-MM-DD") + p.add_argument("--years", type=int, default=5, help="명시 기간 없을 때 최근 N년 (기본 5)") + p.add_argument("--all-seed", action="store_true", help="시드 10종목 일괄") + p.add_argument("--seed-symbols-table", action="store_true", + help="KRX 전 종목 symbols 테이블에 시드 (검색 UX용)") + p.add_argument("--include-macro", action="store_true", help="거시 지표 (KOSPI/USDKRW/^TNX) 동시 적재") + args = p.parse_args(argv) + + end = _parse_date(args.end) if args.end else date.today() + start = _parse_date(args.start) if args.start else (end - timedelta(days=365 * args.years)) + + summary: dict = {} + + if args.seed_symbols_table: + rep = symbols_seed.seed_symbols() + summary["symbols_seed"] = { + "inserted": rep.inserted, + "updated": rep.updated, + "seed_marked": rep.seed_marked, + "markets": rep.markets, + } + + if args.code: + summary["backfill"] = [cmd_backfill_code(args.code, start, end)] + elif args.all_seed: + results = [] + for t in SEED_TICKERS: + logger.info("backfill %s %s..%s", t.code, start, end) + results.append(cmd_backfill_code(t.code, start, end)) + summary["backfill"] = results + + if args.include_macro: + macros = macro_mod.fetch_macro_daily(years=args.years) + summary["macro"] = [ + {"key": m.key, "status": m.status(), "inserted": m.inserted, + "updated": m.updated, "error": m.error} + for m in macros + ] + + print(json.dumps(summary, ensure_ascii=False, indent=2, default=str)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/backend/app/pipelines/daily_batch.py b/backend/app/pipelines/daily_batch.py new file mode 100644 index 0000000..16a85d5 --- /dev/null +++ b/backend/app/pipelines/daily_batch.py @@ -0,0 +1,46 @@ +"""일별 배치: 16:00 KST 에 시드 10종목 + 거시 + 뉴스 + DART 갱신. + +수동 실행: + python -m app.pipelines.daily_batch +""" +from __future__ import annotations + +import json +import logging +import time +from typing import Any + +from app.fetch import macro as macro_mod +from app.pipelines.refresh_one import refresh_code +from app.seed.seed_tickers import SEED_TICKERS + +logger = logging.getLogger(__name__) + + +def run_daily_batch() -> dict[str, Any]: + start_ts = time.time() + reports: list[dict[str, Any]] = [] + for t in SEED_TICKERS: + logger.info("daily_batch refresh %s %s", t.code, t.name) + rep = refresh_code(t.code, t.name, lookback_days=7) + reports.append(rep.to_dict()) + + macros = macro_mod.fetch_macro_daily(years=1) + macro_summary = [ + {"key": m.key, "status": m.status(), "inserted": m.inserted, + "updated": m.updated, "error": m.error} + for m in macros + ] + + elapsed = time.time() - start_ts + return { + "duration_seconds": round(elapsed, 2), + "tickers": reports, + "macro": macro_summary, + } + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") + out = run_daily_batch() + print(json.dumps(out, ensure_ascii=False, indent=2, default=str)) diff --git a/backend/app/pipelines/refresh_one.py b/backend/app/pipelines/refresh_one.py new file mode 100644 index 0000000..faae88c --- /dev/null +++ b/backend/app/pipelines/refresh_one.py @@ -0,0 +1,147 @@ +"""한 종목에 대해 모든 소스를 갱신 + 구조화된 status 리턴. + +POST /api/refresh/{code} 와 daily_batch 둘 다 이 함수를 호출. +""" +from __future__ import annotations + +import logging +from dataclasses import asdict, dataclass, field +from datetime import date, timedelta +from typing import Any + +from app.fetch import dart as dart_mod +from app.fetch import kis as kis_mod +from app.fetch import news as news_mod +from app.fetch import pykrx_helper + +logger = logging.getLogger(__name__) + + +@dataclass +class SourceStatus: + status: str # 'ok' / 'skipped_missing_key' / 'failed' + inserted: int = 0 + updated: int = 0 + skipped: int = 0 + extra: dict[str, Any] = field(default_factory=dict) + error: str | None = None + + +@dataclass +class RefreshReport: + code: str + pykrx_ohlcv: SourceStatus + pykrx_trading_value: SourceStatus + kis_daily: SourceStatus + dart: SourceStatus + naver_news: SourceStatus + google_rss: SourceStatus + + def to_dict(self) -> dict[str, Any]: + out: dict[str, Any] = {"code": self.code} + for f in ( + "pykrx_ohlcv", + "pykrx_trading_value", + "kis_daily", + "dart", + "naver_news", + "google_rss", + ): + v: SourceStatus = getattr(self, f) + out[f] = asdict(v) + return out + + +def _pykrx_ohlcv(code: str, start: date, end: date) -> SourceStatus: + try: + res = pykrx_helper.fetch_ohlcv_daily(code, start, end) + return SourceStatus( + status=res.status(), + inserted=res.inserted, + updated=res.updated, + error=res.error, + ) + except Exception as exc: # noqa: BLE001 + return SourceStatus(status="failed", error=str(exc)) + + +def _pykrx_trading(code: str, start: date, end: date) -> SourceStatus: + try: + res = pykrx_helper.fetch_trading_value(code, start, end) + return SourceStatus( + status=res.status(), + inserted=res.inserted, + updated=res.updated, + error=res.error, + ) + except Exception as exc: # noqa: BLE001 + return SourceStatus(status="failed", error=str(exc)) + + +def _kis(code: str, start: date, end: date) -> SourceStatus: + """KIS read-only EOD. 실제 DB 적재는 하지 않고 sanity 호출 + sample row 수만 리포트. + pykrx 와 중복 데이터이므로 KIS 는 백업/실시간 용도이고, 일별 적재는 pykrx 가 1차. + """ + try: + rows = kis_mod.fetch_daily_price(code, start, end) + return SourceStatus(status="ok", extra={"sample_rows": len(rows)}) + except kis_mod.SkippedMissingKey: + return SourceStatus(status="skipped_missing_key") + except Exception as exc: # noqa: BLE001 + return SourceStatus(status="failed", error=str(exc)) + + +def _dart(code: str, start: date, end: date) -> SourceStatus: + try: + items = dart_mod.fetch_disclosures(code, start, end) + # 공시는 news 테이블에 upsert + news_items = [ + news_mod.NewsItem( + code=d.code, + source="dart", + title=d.title, + url=d.url, + published_at=d.published_at, + ) + for d in items + ] + ins, skip = news_mod.upsert_news(news_items) + return SourceStatus(status="ok", inserted=ins, skipped=skip, extra={"fetched": len(items)}) + except dart_mod.SkippedMissingKey: + return SourceStatus(status="skipped_missing_key") + except Exception as exc: # noqa: BLE001 + return SourceStatus(status="failed", error=str(exc)) + + +def _naver_news(code: str) -> SourceStatus: + try: + items = news_mod.fetch_naver_finance_news(code, max_pages=1) + ins, skip = news_mod.upsert_news(items) + return SourceStatus(status="ok", inserted=ins, skipped=skip, extra={"fetched": len(items)}) + except Exception as exc: # noqa: BLE001 + return SourceStatus(status="failed", error=str(exc)) + + +def _google_rss(code: str, name: str) -> SourceStatus: + try: + query = name or code + items = news_mod.fetch_google_news_rss(query, code=code) + ins, skip = news_mod.upsert_news(items) + return SourceStatus(status="ok", inserted=ins, skipped=skip, extra={"fetched": len(items)}) + except Exception as exc: # noqa: BLE001 + return SourceStatus(status="failed", error=str(exc)) + + +def refresh_code(code: str, name: str, *, lookback_days: int = 7) -> RefreshReport: + """단기 갱신 (daily_batch 용). 최근 lookback_days 만 가져온다.""" + end = date.today() + start = end - timedelta(days=lookback_days) + return RefreshReport( + code=code, + pykrx_ohlcv=_pykrx_ohlcv(code, start, end), + pykrx_trading_value=_pykrx_trading(code, start, end), + kis_daily=_kis(code, start, end), + dart=_dart(code, start, end), + naver_news=_naver_news(code), + google_rss=_google_rss(code, name), + ) diff --git a/backend/app/pipelines/scheduler.py b/backend/app/pipelines/scheduler.py new file mode 100644 index 0000000..5abbd00 --- /dev/null +++ b/backend/app/pipelines/scheduler.py @@ -0,0 +1,46 @@ +"""APScheduler 기반 백그라운드 잡. + +- 16:00 KST : daily_batch +- (Phase 4) 16:30: 모델 추론, 02:00 일요일: 주간 재학습 + +FastAPI 기동 시점에 lifespan 으로 start, 종료 시 shutdown. +""" +from __future__ import annotations + +import logging + +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from pytz import timezone + +from app.pipelines.daily_batch import run_daily_batch + +logger = logging.getLogger(__name__) +KST = timezone("Asia/Seoul") + +_scheduler: BackgroundScheduler | None = None + + +def start_scheduler() -> BackgroundScheduler: + global _scheduler + if _scheduler: + return _scheduler + _scheduler = BackgroundScheduler(timezone=KST) + _scheduler.add_job( + run_daily_batch, + CronTrigger(hour=16, minute=0, timezone=KST), + id="daily_batch_16", + replace_existing=True, + max_instances=1, + ) + _scheduler.start() + logger.info("scheduler started (daily_batch @ 16:00 KST)") + return _scheduler + + +def shutdown_scheduler() -> None: + global _scheduler + if _scheduler: + _scheduler.shutdown(wait=False) + _scheduler = None + logger.info("scheduler stopped") diff --git a/backend/app/seed/__init__.py b/backend/app/seed/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/seed/seed_tickers.py b/backend/app/seed/seed_tickers.py new file mode 100644 index 0000000..fdc651d --- /dev/null +++ b/backend/app/seed/seed_tickers.py @@ -0,0 +1,31 @@ +"""학습/배치 대상 시드 종목 10개. + +분류는 README와 일치. 운영하면서 교체 가능 (DB의 is_seed 플래그만 토글). +""" +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class SeedTicker: + code: str + name: str + market: str # 'KOSPI' / 'KOSDAQ' + category: str # 분류 라벨 (README 표와 일치) + + +SEED_TICKERS: list[SeedTicker] = [ + SeedTicker("005930", "삼성전자", "KOSPI", "large_cap"), + SeedTicker("000660", "SK하이닉스", "KOSPI", "large_cap"), + SeedTicker("247540", "에코프로비엠", "KOSDAQ", "high_volatility"), + SeedTicker("042700", "한미반도체", "KOSPI", "high_volatility"), + SeedTicker("034020", "두산에너빌리티", "KOSPI", "thematic"), + SeedTicker("012450", "한화에어로스페이스", "KOSPI", "thematic"), + SeedTicker("329180", "HD현대중공업", "KOSPI", "thematic"), + SeedTicker("035420", "NAVER", "KOSPI", "platform"), + SeedTicker("033780", "KT&G", "KOSPI", "defensive"), + SeedTicker("036460", "한국가스공사", "KOSPI", "defensive"), +] + +SEED_CODES: set[str] = {t.code for t in SEED_TICKERS}