Files
stock_chart_site/backend/app/fetch/kis.py
claude-owner 0a5c634680 feat(chart): 10m 실시간 / 일·주·월 토글 / 오늘 마커 / 예측 거래일 선택
- backend/app/api/chart.py: interval=10m|1d|1w|1mo. 10m 은 ohlcv_1m 을
  time_bucket(10min) 으로 집계, stale(>10분) 이면 KIS 분봉 fetch 후 재조회.
  1w/1mo 는 ohlcv_daily 를 date_trunc 로 집계. today 필드 추가.
- backend/app/fetch/kis.py: fetch_minute_price() 추가 (tr_id FHKST03010200).
  KIS 응답 KST 시각을 tz-aware datetime 으로 변환, 오름차순 정렬.
- web/lib/api.ts: ChartInterval 타입, getChart(interval), predict(horizons[]).
- web/components/StockChart.tsx: 10m 이면 timeVisible. 일·주·월에서 오늘
  화살표 마커 표시. ISO datetime 도 파싱.
- web/components/PredictionPanel.tsx: 단기/중기/장기 프리셋 + 사용자 직접
  지정 (예: 1,2,3,7). API 에 horizons 배열 전달.
- web/app/[code]/page.tsx: interval 칩 (10분/일/주/월). 10m 일 때 60초마다
  폴링. interval 별 기본 lookback (10m=1, 1d=180, 1w=730, 1mo=1825).
2026-05-23 01:34:29 +09:00

318 lines
12 KiB
Python

"""한국투자증권 KIS OpenAPI - READ ONLY 시세 조회.
본 모듈은 주문/체결/잔고변경 endpoint를 일절 import 하지 않는다.
사용하는 endpoint:
- POST /oauth2/tokenP : 토큰 발급
- GET /uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice
: 일/주/월봉 시세 조회 (read-only)
- GET /uapi/domestic-stock/v1/quotations/inquire-price
: 현재가 조회 (read-only)
키 누락 시 모든 함수는 SkippedMissingKey 예외로 빠르게 실패하므로 호출 측에서 잡아서
status='skipped_missing_key' 처리.
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Any
import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from app.config import settings
logger = logging.getLogger(__name__)
KIS_BASE = "https://openapi.koreainvestment.com:9443"
USER_AGENT = "stock_chart_site/0.1 (+personal)"
# 토큰 디스크 캐시 경로. 기본값은 컨테이너 안 /app/.cache/kis_token.json — docker-compose
# 의 `./backend:/app` 바인드 마운트 덕에 호스트 `./backend/.cache/` 에 영속된다.
# `backend/.cache/` 는 .gitignore 에 들어있어 secrets 가 커밋되지 않는다.
#
# 왜 디스크 캐시가 필요한가:
# KIS 는 access_token 발급을 1분 1회, 하루 N회로 강하게 제한한다. 메모리만 쓰면
# `restart.bat` / `build.bat` / 컨테이너 재기동 때마다 새 발급 → 403 (EGW00133 등) 빈발.
# 토큰 자체는 24시간 유효하므로, 컨테이너 인스턴스가 바뀌어도 같은 토큰을 재사용한다.
_TOKEN_CACHE_PATH = Path(os.environ.get("KIS_TOKEN_CACHE_PATH", "/app/.cache/kis_token.json"))
class SkippedMissingKey(RuntimeError):
"""KIS 키 미설정 시 발생. 호출 측에서 skipped 로 매핑."""
@dataclass
class _Token:
value: str
expires_at: float # epoch seconds
_token_cache: _Token | None = None
_token_lock = threading.Lock()
def _has_keys() -> bool:
return bool(settings.kis_app_key and settings.kis_app_secret)
def _current_key_prefix() -> str:
# app_key 가 바뀌었는데 옛 키로 받은 토큰을 그대로 쓰면 401. 캐시 무효화 키로 사용.
return (settings.kis_app_key or "")[:8]
def _load_disk_cache() -> _Token | None:
try:
with _TOKEN_CACHE_PATH.open() as f:
data = json.load(f)
if data.get("key_prefix") != _current_key_prefix():
# .env 에서 app_key 가 바뀌었을 가능성 → 캐시 폐기
return None
tok = _Token(value=str(data["value"]), expires_at=float(data["expires_at"]))
if tok.expires_at <= time.time():
return None
return tok
except FileNotFoundError:
return None
except (OSError, ValueError, KeyError, TypeError) as exc:
logger.warning("kis token disk-cache read failed (%s): %s", _TOKEN_CACHE_PATH, exc)
return None
def _save_disk_cache(tok: _Token) -> None:
try:
_TOKEN_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
tmp = _TOKEN_CACHE_PATH.with_suffix(".json.tmp")
# atomic write: 부분 쓰기 중 컨테이너가 죽어도 다음 시작 시 깨진 파일 안 읽음
with tmp.open("w") as f:
json.dump(
{
"value": tok.value,
"expires_at": tok.expires_at,
"key_prefix": _current_key_prefix(),
},
f,
)
os.replace(tmp, _TOKEN_CACHE_PATH)
# 토큰 파일은 키 동등의 secret. 0600 권한.
try:
os.chmod(_TOKEN_CACHE_PATH, 0o600)
except OSError:
pass
except OSError as exc:
# 캐시 쓰기 실패는 치명적이지 않음 — 메모리 캐시로만 동작 가능. 경고만.
logger.warning("kis token disk-cache write failed (%s): %s", _TOKEN_CACHE_PATH, exc)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=8),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True,
)
def _issue_token() -> _Token:
if not _has_keys():
raise SkippedMissingKey("kis app_key/secret missing")
url = f"{KIS_BASE}/oauth2/tokenP"
body = {
"grant_type": "client_credentials",
"appkey": settings.kis_app_key,
"appsecret": settings.kis_app_secret,
}
with httpx.Client(timeout=10.0, headers={"User-Agent": USER_AGENT}) as cli:
resp = cli.post(url, json=body)
resp.raise_for_status()
data = resp.json()
access = data["access_token"]
# KIS 는 expires_in (sec) 와 access_token_token_expired (epoch-like str) 둘 다 줌
expires_in = int(data.get("expires_in", 3600))
return _Token(value=access, expires_at=time.time() + expires_in - 60)
def get_token() -> str:
"""캐시된 토큰 반환. 메모리 → 디스크 → 신규 발급 순. 키 없으면 SkippedMissingKey.
디스크 캐시는 컨테이너 재기동 시 토큰 재발급 1분 제한 (EGW00133) 회피용.
"""
global _token_cache
with _token_lock:
if _token_cache and _token_cache.expires_at > time.time():
return _token_cache.value
disk = _load_disk_cache()
if disk is not None:
_token_cache = disk
logger.info(
"kis token loaded from disk, expires_at=%s",
datetime.fromtimestamp(disk.expires_at),
)
return disk.value
_token_cache = _issue_token()
_save_disk_cache(_token_cache)
logger.info(
"kis token issued (and cached to %s), expires_at=%s",
_TOKEN_CACHE_PATH,
datetime.fromtimestamp(_token_cache.expires_at),
)
return _token_cache.value
def _headers(tr_id: str) -> dict[str, str]:
return {
"Content-Type": "application/json; charset=utf-8",
"Accept": "application/json",
"User-Agent": USER_AGENT,
"authorization": f"Bearer {get_token()}",
"appkey": settings.kis_app_key or "",
"appsecret": settings.kis_app_secret or "",
"tr_id": tr_id,
"custtype": "P",
}
@retry(
stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=1, max=4),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True,
)
def fetch_daily_price(
code: str,
start: date,
end: date,
*,
period: str = "D",
adjusted: bool = True,
) -> list[dict[str, Any]]:
"""일/주/월봉 시세 조회 (read-only).
Returns: [{date, open, high, low, close, volume}, ...]
"""
if not _has_keys():
raise SkippedMissingKey("kis app_key/secret missing")
url = f"{KIS_BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
params = {
"FID_COND_MRKT_DIV_CODE": "J", # J=주식
"FID_INPUT_ISCD": code,
"FID_INPUT_DATE_1": start.strftime("%Y%m%d"),
"FID_INPUT_DATE_2": end.strftime("%Y%m%d"),
"FID_PERIOD_DIV_CODE": period, # D/W/M
"FID_ORG_ADJ_PRC": "0" if adjusted else "1",
}
with httpx.Client(timeout=15.0) as cli:
resp = cli.get(url, headers=_headers("FHKST03010100"), params=params)
resp.raise_for_status()
data = resp.json()
if data.get("rt_cd") != "0":
raise RuntimeError(f"kis error: {data.get('msg1')} (rt_cd={data.get('rt_cd')})")
out: list[dict[str, Any]] = []
for row in data.get("output2", []) or []:
raw_date = row.get("stck_bsop_date")
if not raw_date:
continue
try:
day = datetime.strptime(raw_date, "%Y%m%d").date()
except ValueError:
continue
out.append(
{
"date": day,
"open": float(row.get("stck_oprc") or 0),
"high": float(row.get("stck_hgpr") or 0),
"low": float(row.get("stck_lwpr") or 0),
"close": float(row.get("stck_clpr") or 0),
"volume": int(row.get("acml_vol") or 0),
}
)
return out
@retry(
stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=1, max=4),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True,
)
def fetch_minute_price(
code: str,
*,
end_hour: str | None = None,
) -> list[dict[str, Any]]:
"""당일 1분봉 시세 조회 (read-only). 최신 30개 캔들을 반환.
KIS 분봉 endpoint (`inquire-time-itemchartprice`) 는 base 시각 (FID_INPUT_HOUR_1) 부터
역순으로 최대 30개의 1분봉을 돌려준다. base 를 비우면 KIS 가 가장 최근 시각으로 해석.
즉 장중 호출 → 직전 30분 / 장 종료 후 호출 → 15:00~15:30 의 30분.
Returns: [{ts: datetime(KST aware), open, high, low, close, volume}, ...]
ts 오름차순 정렬.
Note: 이 endpoint 는 "당일" 분봉만 지원. 어제 이전 분봉은 별도 endpoint 가 필요한데,
이 사이트의 사용 패턴 (장중 라이브 차트) 에는 당일 데이터로 충분하다.
"""
if not _has_keys():
raise SkippedMissingKey("kis app_key/secret missing")
url = f"{KIS_BASE}/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
params = {
"FID_ETC_CLS_CODE": "",
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code,
# 비우면 KIS 가 "지금" 으로 해석. 장 마감 후엔 15:30:00 부근 데이터.
"FID_INPUT_HOUR_1": end_hour or "",
"FID_PW_DATA_INCU_YN": "Y", # 과거 데이터 포함 (장 시작 직후 빈 데이터 방지)
}
with httpx.Client(timeout=15.0) as cli:
resp = cli.get(url, headers=_headers("FHKST03010200"), params=params)
resp.raise_for_status()
data = resp.json()
if data.get("rt_cd") != "0":
raise RuntimeError(f"kis error: {data.get('msg1')} (rt_cd={data.get('rt_cd')})")
# KIS 응답은 KST. tz-aware 로 변환해서 DB (TIMESTAMPTZ) 에 안전 적재.
from datetime import timedelta, timezone as _tz
KST = _tz(timedelta(hours=9))
out: list[dict[str, Any]] = []
for row in data.get("output2", []) or []:
raw_date = row.get("stck_bsop_date")
raw_hour = row.get("stck_cntg_hour")
if not raw_date or not raw_hour:
continue
try:
ts = datetime.strptime(raw_date + raw_hour.zfill(6), "%Y%m%d%H%M%S").replace(tzinfo=KST)
except ValueError:
continue
out.append(
{
"ts": ts,
"open": float(row.get("stck_oprc") or 0),
"high": float(row.get("stck_hgpr") or 0),
"low": float(row.get("stck_lwpr") or 0),
# 분봉에서는 종가가 stck_prpr (현재가) 로 옴
"close": float(row.get("stck_prpr") or row.get("stck_clpr") or 0),
"volume": int(row.get("cntg_vol") or 0),
}
)
# KIS 응답은 보통 최신→과거 역순. UI/DB 적재 편의를 위해 오름차순으로 뒤집는다.
out.sort(key=lambda r: r["ts"])
return out
def ping() -> dict[str, Any]:
"""토큰 발급만 시도해서 키 유효성 확인."""
if not _has_keys():
return {"status": "skipped_missing_key"}
try:
tok = get_token()
return {"status": "ok", "token_prefix": tok[:8] + "...", "len": len(tok)}
except SkippedMissingKey:
return {"status": "skipped_missing_key"}
except Exception as exc: # noqa: BLE001
return {"status": "failed", "error": str(exc)}