"""한국투자증권 KIS OpenAPI - READ ONLY 시세 조회. 본 모듈은 주문/체결/잔고변경 endpoint를 일절 import 하지 않는다. 사용하는 endpoint: - POST /oauth2/tokenP : 토큰 발급 - GET /uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice : 일/주/월봉 시세 조회 (read-only) - GET /uapi/domestic-stock/v1/quotations/inquire-price : 현재가 조회 (read-only) 키 누락 시 모든 함수는 SkippedMissingKey 예외로 빠르게 실패하므로 호출 측에서 잡아서 status='skipped_missing_key' 처리. """ from __future__ import annotations import logging import threading import time from dataclasses import dataclass from datetime import date, datetime from typing import Any import httpx from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential from app.config import settings logger = logging.getLogger(__name__) KIS_BASE = "https://openapi.koreainvestment.com:9443" USER_AGENT = "stock_chart_site/0.1 (+personal)" class SkippedMissingKey(RuntimeError): """KIS 키 미설정 시 발생. 호출 측에서 skipped 로 매핑.""" @dataclass class _Token: value: str expires_at: float # epoch seconds _token_cache: _Token | None = None _token_lock = threading.Lock() def _has_keys() -> bool: return bool(settings.kis_app_key and settings.kis_app_secret) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=8), retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), reraise=True, ) def _issue_token() -> _Token: if not _has_keys(): raise SkippedMissingKey("kis app_key/secret missing") url = f"{KIS_BASE}/oauth2/tokenP" body = { "grant_type": "client_credentials", "appkey": settings.kis_app_key, "appsecret": settings.kis_app_secret, } with httpx.Client(timeout=10.0, headers={"User-Agent": USER_AGENT}) as cli: resp = cli.post(url, json=body) resp.raise_for_status() data = resp.json() access = data["access_token"] # KIS 는 expires_in (sec) 와 access_token_token_expired (epoch-like str) 둘 다 줌 expires_in = int(data.get("expires_in", 3600)) return _Token(value=access, expires_at=time.time() + expires_in - 60) def get_token() -> str: """캐시된 토큰 반환. 만료 60초 전부터 재발급. 키 없으면 SkippedMissingKey.""" global _token_cache with _token_lock: if _token_cache and _token_cache.expires_at > time.time(): return _token_cache.value _token_cache = _issue_token() logger.info("kis token issued, expires_at=%s", datetime.fromtimestamp(_token_cache.expires_at)) return _token_cache.value def _headers(tr_id: str) -> dict[str, str]: return { "Content-Type": "application/json; charset=utf-8", "Accept": "application/json", "User-Agent": USER_AGENT, "authorization": f"Bearer {get_token()}", "appkey": settings.kis_app_key or "", "appsecret": settings.kis_app_secret or "", "tr_id": tr_id, "custtype": "P", } @retry( stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=1, max=4), retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), reraise=True, ) def fetch_daily_price( code: str, start: date, end: date, *, period: str = "D", adjusted: bool = True, ) -> list[dict[str, Any]]: """일/주/월봉 시세 조회 (read-only). Returns: [{date, open, high, low, close, volume}, ...] """ if not _has_keys(): raise SkippedMissingKey("kis app_key/secret missing") url = f"{KIS_BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" params = { "FID_COND_MRKT_DIV_CODE": "J", # J=주식 "FID_INPUT_ISCD": code, "FID_INPUT_DATE_1": start.strftime("%Y%m%d"), "FID_INPUT_DATE_2": end.strftime("%Y%m%d"), "FID_PERIOD_DIV_CODE": period, # D/W/M "FID_ORG_ADJ_PRC": "0" if adjusted else "1", } with httpx.Client(timeout=15.0) as cli: resp = cli.get(url, headers=_headers("FHKST03010100"), params=params) resp.raise_for_status() data = resp.json() if data.get("rt_cd") != "0": raise RuntimeError(f"kis error: {data.get('msg1')} (rt_cd={data.get('rt_cd')})") out: list[dict[str, Any]] = [] for row in data.get("output2", []) or []: raw_date = row.get("stck_bsop_date") if not raw_date: continue try: day = datetime.strptime(raw_date, "%Y%m%d").date() except ValueError: continue out.append( { "date": day, "open": float(row.get("stck_oprc") or 0), "high": float(row.get("stck_hgpr") or 0), "low": float(row.get("stck_lwpr") or 0), "close": float(row.get("stck_clpr") or 0), "volume": int(row.get("acml_vol") or 0), } ) return out def ping() -> dict[str, Any]: """토큰 발급만 시도해서 키 유효성 확인.""" if not _has_keys(): return {"status": "skipped_missing_key"} try: tok = get_token() return {"status": "ok", "token_prefix": tok[:8] + "...", "len": len(tok)} except SkippedMissingKey: return {"status": "skipped_missing_key"} except Exception as exc: # noqa: BLE001 return {"status": "failed", "error": str(exc)}