Files
stock_chart_site/backend/app/fetch/kis.py
claude-owner e610599879 fix(chart): 10분봉 페이지네이션, 장외 캐시, 예측 horizon 캡 일치
reviewer 지적사항 반영:

1. KIS 분봉이 한 번에 30개만 와서 10분봉이 최대 3개만 나오던 문제 →
   fetch_minute_range() 추가. FID_INPUT_HOUR_1 을 30분씩 후퇴시키며
   페이지네이션, 중복 ts 자연 dedupe, max_pages=20 으로 무한루프 방지.
   _ensure_intraday_fresh 는 last_ts+1m ~ now 빈 구간만 채우므로 평소엔
   1~2 페이지로 끝남.

2. 장외/주말에 매번 KIS 를 때리던 문제 →
   - 주말: 'weekend' 반환, KIS 안 부름 (분봉 endpoint 는 당일만 지원)
   - 평일 장외 + 오늘 데이터 있음: 'cached_closed' 반환
   - 장중 + 10분 이내: 'fresh' 반환
   토큰/조회 제한 다시 밟지 않음.

3. 프론트 horizon 입력 60 vs 백엔드 30 불일치 →
   PredictionPanel 의 cap 을 30 으로 맞춤. 백엔드 predict.py 의 학습/검증
   범위와 일치. placeholder 도 '1~30' 으로 명시.
2026-05-23 01:41:43 +09:00

371 lines
14 KiB
Python

"""한국투자증권 KIS OpenAPI - READ ONLY 시세 조회.
본 모듈은 주문/체결/잔고변경 endpoint를 일절 import 하지 않는다.
사용하는 endpoint:
- POST /oauth2/tokenP : 토큰 발급
- GET /uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice
: 일/주/월봉 시세 조회 (read-only)
- GET /uapi/domestic-stock/v1/quotations/inquire-price
: 현재가 조회 (read-only)
키 누락 시 모든 함수는 SkippedMissingKey 예외로 빠르게 실패하므로 호출 측에서 잡아서
status='skipped_missing_key' 처리.
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Any
import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from app.config import settings
logger = logging.getLogger(__name__)
KIS_BASE = "https://openapi.koreainvestment.com:9443"
USER_AGENT = "stock_chart_site/0.1 (+personal)"
# 토큰 디스크 캐시 경로. 기본값은 컨테이너 안 /app/.cache/kis_token.json — docker-compose
# 의 `./backend:/app` 바인드 마운트 덕에 호스트 `./backend/.cache/` 에 영속된다.
# `backend/.cache/` 는 .gitignore 에 들어있어 secrets 가 커밋되지 않는다.
#
# 왜 디스크 캐시가 필요한가:
# KIS 는 access_token 발급을 1분 1회, 하루 N회로 강하게 제한한다. 메모리만 쓰면
# `restart.bat` / `build.bat` / 컨테이너 재기동 때마다 새 발급 → 403 (EGW00133 등) 빈발.
# 토큰 자체는 24시간 유효하므로, 컨테이너 인스턴스가 바뀌어도 같은 토큰을 재사용한다.
_TOKEN_CACHE_PATH = Path(os.environ.get("KIS_TOKEN_CACHE_PATH", "/app/.cache/kis_token.json"))
class SkippedMissingKey(RuntimeError):
"""KIS 키 미설정 시 발생. 호출 측에서 skipped 로 매핑."""
@dataclass
class _Token:
value: str
expires_at: float # epoch seconds
_token_cache: _Token | None = None
_token_lock = threading.Lock()
def _has_keys() -> bool:
return bool(settings.kis_app_key and settings.kis_app_secret)
def _current_key_prefix() -> str:
# app_key 가 바뀌었는데 옛 키로 받은 토큰을 그대로 쓰면 401. 캐시 무효화 키로 사용.
return (settings.kis_app_key or "")[:8]
def _load_disk_cache() -> _Token | None:
try:
with _TOKEN_CACHE_PATH.open() as f:
data = json.load(f)
if data.get("key_prefix") != _current_key_prefix():
# .env 에서 app_key 가 바뀌었을 가능성 → 캐시 폐기
return None
tok = _Token(value=str(data["value"]), expires_at=float(data["expires_at"]))
if tok.expires_at <= time.time():
return None
return tok
except FileNotFoundError:
return None
except (OSError, ValueError, KeyError, TypeError) as exc:
logger.warning("kis token disk-cache read failed (%s): %s", _TOKEN_CACHE_PATH, exc)
return None
def _save_disk_cache(tok: _Token) -> None:
try:
_TOKEN_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
tmp = _TOKEN_CACHE_PATH.with_suffix(".json.tmp")
# atomic write: 부분 쓰기 중 컨테이너가 죽어도 다음 시작 시 깨진 파일 안 읽음
with tmp.open("w") as f:
json.dump(
{
"value": tok.value,
"expires_at": tok.expires_at,
"key_prefix": _current_key_prefix(),
},
f,
)
os.replace(tmp, _TOKEN_CACHE_PATH)
# 토큰 파일은 키 동등의 secret. 0600 권한.
try:
os.chmod(_TOKEN_CACHE_PATH, 0o600)
except OSError:
pass
except OSError as exc:
# 캐시 쓰기 실패는 치명적이지 않음 — 메모리 캐시로만 동작 가능. 경고만.
logger.warning("kis token disk-cache write failed (%s): %s", _TOKEN_CACHE_PATH, exc)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=8),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True,
)
def _issue_token() -> _Token:
if not _has_keys():
raise SkippedMissingKey("kis app_key/secret missing")
url = f"{KIS_BASE}/oauth2/tokenP"
body = {
"grant_type": "client_credentials",
"appkey": settings.kis_app_key,
"appsecret": settings.kis_app_secret,
}
with httpx.Client(timeout=10.0, headers={"User-Agent": USER_AGENT}) as cli:
resp = cli.post(url, json=body)
resp.raise_for_status()
data = resp.json()
access = data["access_token"]
# KIS 는 expires_in (sec) 와 access_token_token_expired (epoch-like str) 둘 다 줌
expires_in = int(data.get("expires_in", 3600))
return _Token(value=access, expires_at=time.time() + expires_in - 60)
def get_token() -> str:
"""캐시된 토큰 반환. 메모리 → 디스크 → 신규 발급 순. 키 없으면 SkippedMissingKey.
디스크 캐시는 컨테이너 재기동 시 토큰 재발급 1분 제한 (EGW00133) 회피용.
"""
global _token_cache
with _token_lock:
if _token_cache and _token_cache.expires_at > time.time():
return _token_cache.value
disk = _load_disk_cache()
if disk is not None:
_token_cache = disk
logger.info(
"kis token loaded from disk, expires_at=%s",
datetime.fromtimestamp(disk.expires_at),
)
return disk.value
_token_cache = _issue_token()
_save_disk_cache(_token_cache)
logger.info(
"kis token issued (and cached to %s), expires_at=%s",
_TOKEN_CACHE_PATH,
datetime.fromtimestamp(_token_cache.expires_at),
)
return _token_cache.value
def _headers(tr_id: str) -> dict[str, str]:
return {
"Content-Type": "application/json; charset=utf-8",
"Accept": "application/json",
"User-Agent": USER_AGENT,
"authorization": f"Bearer {get_token()}",
"appkey": settings.kis_app_key or "",
"appsecret": settings.kis_app_secret or "",
"tr_id": tr_id,
"custtype": "P",
}
@retry(
stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=1, max=4),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True,
)
def fetch_daily_price(
code: str,
start: date,
end: date,
*,
period: str = "D",
adjusted: bool = True,
) -> list[dict[str, Any]]:
"""일/주/월봉 시세 조회 (read-only).
Returns: [{date, open, high, low, close, volume}, ...]
"""
if not _has_keys():
raise SkippedMissingKey("kis app_key/secret missing")
url = f"{KIS_BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
params = {
"FID_COND_MRKT_DIV_CODE": "J", # J=주식
"FID_INPUT_ISCD": code,
"FID_INPUT_DATE_1": start.strftime("%Y%m%d"),
"FID_INPUT_DATE_2": end.strftime("%Y%m%d"),
"FID_PERIOD_DIV_CODE": period, # D/W/M
"FID_ORG_ADJ_PRC": "0" if adjusted else "1",
}
with httpx.Client(timeout=15.0) as cli:
resp = cli.get(url, headers=_headers("FHKST03010100"), params=params)
resp.raise_for_status()
data = resp.json()
if data.get("rt_cd") != "0":
raise RuntimeError(f"kis error: {data.get('msg1')} (rt_cd={data.get('rt_cd')})")
out: list[dict[str, Any]] = []
for row in data.get("output2", []) or []:
raw_date = row.get("stck_bsop_date")
if not raw_date:
continue
try:
day = datetime.strptime(raw_date, "%Y%m%d").date()
except ValueError:
continue
out.append(
{
"date": day,
"open": float(row.get("stck_oprc") or 0),
"high": float(row.get("stck_hgpr") or 0),
"low": float(row.get("stck_lwpr") or 0),
"close": float(row.get("stck_clpr") or 0),
"volume": int(row.get("acml_vol") or 0),
}
)
return out
@retry(
stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=1, max=4),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True,
)
def fetch_minute_price(
code: str,
*,
end_hour: str | None = None,
) -> list[dict[str, Any]]:
"""당일 1분봉 시세 조회 (read-only). 최신 30개 캔들을 반환.
KIS 분봉 endpoint (`inquire-time-itemchartprice`) 는 base 시각 (FID_INPUT_HOUR_1) 부터
역순으로 최대 30개의 1분봉을 돌려준다. base 를 비우면 KIS 가 가장 최근 시각으로 해석.
즉 장중 호출 → 직전 30분 / 장 종료 후 호출 → 15:00~15:30 의 30분.
Returns: [{ts: datetime(KST aware), open, high, low, close, volume}, ...]
ts 오름차순 정렬.
Note: 이 endpoint 는 "당일" 분봉만 지원. 어제 이전 분봉은 별도 endpoint 가 필요한데,
이 사이트의 사용 패턴 (장중 라이브 차트) 에는 당일 데이터로 충분하다.
"""
if not _has_keys():
raise SkippedMissingKey("kis app_key/secret missing")
url = f"{KIS_BASE}/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
params = {
"FID_ETC_CLS_CODE": "",
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code,
# 비우면 KIS 가 "지금" 으로 해석. 장 마감 후엔 15:30:00 부근 데이터.
"FID_INPUT_HOUR_1": end_hour or "",
"FID_PW_DATA_INCU_YN": "Y", # 과거 데이터 포함 (장 시작 직후 빈 데이터 방지)
}
with httpx.Client(timeout=15.0) as cli:
resp = cli.get(url, headers=_headers("FHKST03010200"), params=params)
resp.raise_for_status()
data = resp.json()
if data.get("rt_cd") != "0":
raise RuntimeError(f"kis error: {data.get('msg1')} (rt_cd={data.get('rt_cd')})")
# KIS 응답은 KST. tz-aware 로 변환해서 DB (TIMESTAMPTZ) 에 안전 적재.
from datetime import timedelta, timezone as _tz
KST = _tz(timedelta(hours=9))
out: list[dict[str, Any]] = []
for row in data.get("output2", []) or []:
raw_date = row.get("stck_bsop_date")
raw_hour = row.get("stck_cntg_hour")
if not raw_date or not raw_hour:
continue
try:
ts = datetime.strptime(raw_date + raw_hour.zfill(6), "%Y%m%d%H%M%S").replace(tzinfo=KST)
except ValueError:
continue
out.append(
{
"ts": ts,
"open": float(row.get("stck_oprc") or 0),
"high": float(row.get("stck_hgpr") or 0),
"low": float(row.get("stck_lwpr") or 0),
# 분봉에서는 종가가 stck_prpr (현재가) 로 옴
"close": float(row.get("stck_prpr") or row.get("stck_clpr") or 0),
"volume": int(row.get("cntg_vol") or 0),
}
)
# KIS 응답은 보통 최신→과거 역순. UI/DB 적재 편의를 위해 오름차순으로 뒤집는다.
out.sort(key=lambda r: r["ts"])
return out
def fetch_minute_range(
code: str,
from_ts: datetime,
to_ts: datetime,
*,
max_pages: int = 20,
) -> list[dict[str, Any]]:
"""[from_ts, to_ts] 윈도우의 1분봉 전체. KIS 30-bar 페이지를 역순으로 반복 호출.
KIS `inquire-time-itemchartprice` 는 한 번에 최대 30개 1분봉만 주고,
`FID_INPUT_HOUR_1` 기준 그 시각 포함 이전 30분을 반환한다. 그래서 to_ts 부터
시작해서 가장 이른 응답 시각의 -1분을 다음 cursor 로 잡아 from_ts 까지 후퇴.
중복 키 (ts) 는 dict 로 자연 dedupe. 더 이상 새 행이 안 들어오거나 max_pages 도달
하면 종료 (rate-limit/무한루프 방지).
Note: 이 endpoint 는 "당일" 만 지원. from_ts/to_ts 는 같은 날짜여야 한다.
"""
if not _has_keys():
raise SkippedMissingKey("kis app_key/secret missing")
if from_ts >= to_ts:
return []
from datetime import timedelta as _td
accumulated: dict[datetime, dict[str, Any]] = {}
cursor = to_ts
pages = 0
while cursor > from_ts and pages < max_pages:
pages += 1
rows = fetch_minute_price(code, end_hour=cursor.strftime("%H%M%S"))
if not rows:
break
added = 0
for r in rows:
ts = r["ts"]
if ts < from_ts or ts > to_ts:
continue
if ts not in accumulated:
accumulated[ts] = r
added += 1
if added == 0:
# 같은 30개를 또 받았다 — 더 과거가 없거나 KIS 가 똑같은 페이지를 반환.
break
earliest_ts = min(r["ts"] for r in rows)
next_cursor = earliest_ts - _td(minutes=1)
if next_cursor >= cursor:
break
cursor = next_cursor
return sorted(accumulated.values(), key=lambda r: r["ts"])
def ping() -> dict[str, Any]:
"""토큰 발급만 시도해서 키 유효성 확인."""
if not _has_keys():
return {"status": "skipped_missing_key"}
try:
tok = get_token()
return {"status": "ok", "token_prefix": tok[:8] + "...", "len": len(tok)}
except SkippedMissingKey:
return {"status": "skipped_missing_key"}
except Exception as exc: # noqa: BLE001
return {"status": "failed", "error": str(exc)}