"""한국투자증권 KIS OpenAPI - READ ONLY 시세 조회. 본 모듈은 주문/체결/잔고변경 endpoint를 일절 import 하지 않는다. 사용하는 endpoint: - POST /oauth2/tokenP : 토큰 발급 - GET /uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice : 일/주/월봉 시세 조회 (read-only) - GET /uapi/domestic-stock/v1/quotations/inquire-price : 현재가 조회 (read-only) 키 누락 시 모든 함수는 SkippedMissingKey 예외로 빠르게 실패하므로 호출 측에서 잡아서 status='skipped_missing_key' 처리. """ from __future__ import annotations import json import logging import os import threading import time from dataclasses import dataclass from datetime import date, datetime from pathlib import Path from typing import Any import httpx from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential from app.config import settings logger = logging.getLogger(__name__) KIS_BASE = "https://openapi.koreainvestment.com:9443" USER_AGENT = "stock_chart_site/0.1 (+personal)" # 토큰 디스크 캐시 경로. 기본값은 컨테이너 안 /app/.cache/kis_token.json — docker-compose # 의 `./backend:/app` 바인드 마운트 덕에 호스트 `./backend/.cache/` 에 영속된다. # `backend/.cache/` 는 .gitignore 에 들어있어 secrets 가 커밋되지 않는다. # # 왜 디스크 캐시가 필요한가: # KIS 는 access_token 발급을 1분 1회, 하루 N회로 강하게 제한한다. 메모리만 쓰면 # `restart.bat` / `build.bat` / 컨테이너 재기동 때마다 새 발급 → 403 (EGW00133 등) 빈발. # 토큰 자체는 24시간 유효하므로, 컨테이너 인스턴스가 바뀌어도 같은 토큰을 재사용한다. _TOKEN_CACHE_PATH = Path(os.environ.get("KIS_TOKEN_CACHE_PATH", "/app/.cache/kis_token.json")) class SkippedMissingKey(RuntimeError): """KIS 키 미설정 시 발생. 호출 측에서 skipped 로 매핑.""" @dataclass class _Token: value: str expires_at: float # epoch seconds _token_cache: _Token | None = None _token_lock = threading.Lock() def _has_keys() -> bool: return bool(settings.kis_app_key and settings.kis_app_secret) def _current_key_prefix() -> str: # app_key 가 바뀌었는데 옛 키로 받은 토큰을 그대로 쓰면 401. 캐시 무효화 키로 사용. return (settings.kis_app_key or "")[:8] def _load_disk_cache() -> _Token | None: try: with _TOKEN_CACHE_PATH.open() as f: data = json.load(f) if data.get("key_prefix") != _current_key_prefix(): # .env 에서 app_key 가 바뀌었을 가능성 → 캐시 폐기 return None tok = _Token(value=str(data["value"]), expires_at=float(data["expires_at"])) if tok.expires_at <= time.time(): return None return tok except FileNotFoundError: return None except (OSError, ValueError, KeyError, TypeError) as exc: logger.warning("kis token disk-cache read failed (%s): %s", _TOKEN_CACHE_PATH, exc) return None def _save_disk_cache(tok: _Token) -> None: try: _TOKEN_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True) tmp = _TOKEN_CACHE_PATH.with_suffix(".json.tmp") # atomic write: 부분 쓰기 중 컨테이너가 죽어도 다음 시작 시 깨진 파일 안 읽음 with tmp.open("w") as f: json.dump( { "value": tok.value, "expires_at": tok.expires_at, "key_prefix": _current_key_prefix(), }, f, ) os.replace(tmp, _TOKEN_CACHE_PATH) # 토큰 파일은 키 동등의 secret. 0600 권한. try: os.chmod(_TOKEN_CACHE_PATH, 0o600) except OSError: pass except OSError as exc: # 캐시 쓰기 실패는 치명적이지 않음 — 메모리 캐시로만 동작 가능. 경고만. logger.warning("kis token disk-cache write failed (%s): %s", _TOKEN_CACHE_PATH, exc) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=8), retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), reraise=True, ) def _issue_token() -> _Token: if not _has_keys(): raise SkippedMissingKey("kis app_key/secret missing") url = f"{KIS_BASE}/oauth2/tokenP" body = { "grant_type": "client_credentials", "appkey": settings.kis_app_key, "appsecret": settings.kis_app_secret, } with httpx.Client(timeout=10.0, headers={"User-Agent": USER_AGENT}) as cli: resp = cli.post(url, json=body) resp.raise_for_status() data = resp.json() access = data["access_token"] # KIS 는 expires_in (sec) 와 access_token_token_expired (epoch-like str) 둘 다 줌 expires_in = int(data.get("expires_in", 3600)) return _Token(value=access, expires_at=time.time() + expires_in - 60) def get_token() -> str: """캐시된 토큰 반환. 메모리 → 디스크 → 신규 발급 순. 키 없으면 SkippedMissingKey. 디스크 캐시는 컨테이너 재기동 시 토큰 재발급 1분 제한 (EGW00133) 회피용. """ global _token_cache with _token_lock: if _token_cache and _token_cache.expires_at > time.time(): return _token_cache.value disk = _load_disk_cache() if disk is not None: _token_cache = disk logger.info( "kis token loaded from disk, expires_at=%s", datetime.fromtimestamp(disk.expires_at), ) return disk.value _token_cache = _issue_token() _save_disk_cache(_token_cache) logger.info( "kis token issued (and cached to %s), expires_at=%s", _TOKEN_CACHE_PATH, datetime.fromtimestamp(_token_cache.expires_at), ) return _token_cache.value def _headers(tr_id: str) -> dict[str, str]: return { "Content-Type": "application/json; charset=utf-8", "Accept": "application/json", "User-Agent": USER_AGENT, "authorization": f"Bearer {get_token()}", "appkey": settings.kis_app_key or "", "appsecret": settings.kis_app_secret or "", "tr_id": tr_id, "custtype": "P", } @retry( stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=1, max=4), retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), reraise=True, ) def fetch_daily_price( code: str, start: date, end: date, *, period: str = "D", adjusted: bool = True, ) -> list[dict[str, Any]]: """일/주/월봉 시세 조회 (read-only). Returns: [{date, open, high, low, close, volume}, ...] """ if not _has_keys(): raise SkippedMissingKey("kis app_key/secret missing") url = f"{KIS_BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" params = { "FID_COND_MRKT_DIV_CODE": "J", # J=주식 "FID_INPUT_ISCD": code, "FID_INPUT_DATE_1": start.strftime("%Y%m%d"), "FID_INPUT_DATE_2": end.strftime("%Y%m%d"), "FID_PERIOD_DIV_CODE": period, # D/W/M "FID_ORG_ADJ_PRC": "0" if adjusted else "1", } with httpx.Client(timeout=15.0) as cli: resp = cli.get(url, headers=_headers("FHKST03010100"), params=params) resp.raise_for_status() data = resp.json() if data.get("rt_cd") != "0": raise RuntimeError(f"kis error: {data.get('msg1')} (rt_cd={data.get('rt_cd')})") out: list[dict[str, Any]] = [] for row in data.get("output2", []) or []: raw_date = row.get("stck_bsop_date") if not raw_date: continue try: day = datetime.strptime(raw_date, "%Y%m%d").date() except ValueError: continue out.append( { "date": day, "open": float(row.get("stck_oprc") or 0), "high": float(row.get("stck_hgpr") or 0), "low": float(row.get("stck_lwpr") or 0), "close": float(row.get("stck_clpr") or 0), "volume": int(row.get("acml_vol") or 0), } ) return out def ping() -> dict[str, Any]: """토큰 발급만 시도해서 키 유효성 확인.""" if not _has_keys(): return {"status": "skipped_missing_key"} try: tok = get_token() return {"status": "ok", "token_prefix": tok[:8] + "...", "len": len(tok)} except SkippedMissingKey: return {"status": "skipped_missing_key"} except Exception as exc: # noqa: BLE001 return {"status": "failed", "error": str(exc)}