reviewer 지적사항 반영: 1. KIS 분봉이 한 번에 30개만 와서 10분봉이 최대 3개만 나오던 문제 → fetch_minute_range() 추가. FID_INPUT_HOUR_1 을 30분씩 후퇴시키며 페이지네이션, 중복 ts 자연 dedupe, max_pages=20 으로 무한루프 방지. _ensure_intraday_fresh 는 last_ts+1m ~ now 빈 구간만 채우므로 평소엔 1~2 페이지로 끝남. 2. 장외/주말에 매번 KIS 를 때리던 문제 → - 주말: 'weekend' 반환, KIS 안 부름 (분봉 endpoint 는 당일만 지원) - 평일 장외 + 오늘 데이터 있음: 'cached_closed' 반환 - 장중 + 10분 이내: 'fresh' 반환 토큰/조회 제한 다시 밟지 않음. 3. 프론트 horizon 입력 60 vs 백엔드 30 불일치 → PredictionPanel 의 cap 을 30 으로 맞춤. 백엔드 predict.py 의 학습/검증 범위와 일치. placeholder 도 '1~30' 으로 명시.
371 lines
14 KiB
Python
371 lines
14 KiB
Python
"""한국투자증권 KIS OpenAPI - READ ONLY 시세 조회.
|
|
|
|
본 모듈은 주문/체결/잔고변경 endpoint를 일절 import 하지 않는다.
|
|
사용하는 endpoint:
|
|
- POST /oauth2/tokenP : 토큰 발급
|
|
- GET /uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice
|
|
: 일/주/월봉 시세 조회 (read-only)
|
|
- GET /uapi/domestic-stock/v1/quotations/inquire-price
|
|
: 현재가 조회 (read-only)
|
|
|
|
키 누락 시 모든 함수는 SkippedMissingKey 예외로 빠르게 실패하므로 호출 측에서 잡아서
|
|
status='skipped_missing_key' 처리.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import date, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
|
|
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
KIS_BASE = "https://openapi.koreainvestment.com:9443"
|
|
USER_AGENT = "stock_chart_site/0.1 (+personal)"
|
|
|
|
# 토큰 디스크 캐시 경로. 기본값은 컨테이너 안 /app/.cache/kis_token.json — docker-compose
|
|
# 의 `./backend:/app` 바인드 마운트 덕에 호스트 `./backend/.cache/` 에 영속된다.
|
|
# `backend/.cache/` 는 .gitignore 에 들어있어 secrets 가 커밋되지 않는다.
|
|
#
|
|
# 왜 디스크 캐시가 필요한가:
|
|
# KIS 는 access_token 발급을 1분 1회, 하루 N회로 강하게 제한한다. 메모리만 쓰면
|
|
# `restart.bat` / `build.bat` / 컨테이너 재기동 때마다 새 발급 → 403 (EGW00133 등) 빈발.
|
|
# 토큰 자체는 24시간 유효하므로, 컨테이너 인스턴스가 바뀌어도 같은 토큰을 재사용한다.
|
|
_TOKEN_CACHE_PATH = Path(os.environ.get("KIS_TOKEN_CACHE_PATH", "/app/.cache/kis_token.json"))
|
|
|
|
|
|
class SkippedMissingKey(RuntimeError):
|
|
"""KIS 키 미설정 시 발생. 호출 측에서 skipped 로 매핑."""
|
|
|
|
|
|
@dataclass
|
|
class _Token:
|
|
value: str
|
|
expires_at: float # epoch seconds
|
|
|
|
|
|
_token_cache: _Token | None = None
|
|
_token_lock = threading.Lock()
|
|
|
|
|
|
def _has_keys() -> bool:
|
|
return bool(settings.kis_app_key and settings.kis_app_secret)
|
|
|
|
|
|
def _current_key_prefix() -> str:
|
|
# app_key 가 바뀌었는데 옛 키로 받은 토큰을 그대로 쓰면 401. 캐시 무효화 키로 사용.
|
|
return (settings.kis_app_key or "")[:8]
|
|
|
|
|
|
def _load_disk_cache() -> _Token | None:
|
|
try:
|
|
with _TOKEN_CACHE_PATH.open() as f:
|
|
data = json.load(f)
|
|
if data.get("key_prefix") != _current_key_prefix():
|
|
# .env 에서 app_key 가 바뀌었을 가능성 → 캐시 폐기
|
|
return None
|
|
tok = _Token(value=str(data["value"]), expires_at=float(data["expires_at"]))
|
|
if tok.expires_at <= time.time():
|
|
return None
|
|
return tok
|
|
except FileNotFoundError:
|
|
return None
|
|
except (OSError, ValueError, KeyError, TypeError) as exc:
|
|
logger.warning("kis token disk-cache read failed (%s): %s", _TOKEN_CACHE_PATH, exc)
|
|
return None
|
|
|
|
|
|
def _save_disk_cache(tok: _Token) -> None:
|
|
try:
|
|
_TOKEN_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
tmp = _TOKEN_CACHE_PATH.with_suffix(".json.tmp")
|
|
# atomic write: 부분 쓰기 중 컨테이너가 죽어도 다음 시작 시 깨진 파일 안 읽음
|
|
with tmp.open("w") as f:
|
|
json.dump(
|
|
{
|
|
"value": tok.value,
|
|
"expires_at": tok.expires_at,
|
|
"key_prefix": _current_key_prefix(),
|
|
},
|
|
f,
|
|
)
|
|
os.replace(tmp, _TOKEN_CACHE_PATH)
|
|
# 토큰 파일은 키 동등의 secret. 0600 권한.
|
|
try:
|
|
os.chmod(_TOKEN_CACHE_PATH, 0o600)
|
|
except OSError:
|
|
pass
|
|
except OSError as exc:
|
|
# 캐시 쓰기 실패는 치명적이지 않음 — 메모리 캐시로만 동작 가능. 경고만.
|
|
logger.warning("kis token disk-cache write failed (%s): %s", _TOKEN_CACHE_PATH, exc)
|
|
|
|
|
|
@retry(
|
|
stop=stop_after_attempt(3),
|
|
wait=wait_exponential(multiplier=1, min=1, max=8),
|
|
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
|
|
reraise=True,
|
|
)
|
|
def _issue_token() -> _Token:
|
|
if not _has_keys():
|
|
raise SkippedMissingKey("kis app_key/secret missing")
|
|
url = f"{KIS_BASE}/oauth2/tokenP"
|
|
body = {
|
|
"grant_type": "client_credentials",
|
|
"appkey": settings.kis_app_key,
|
|
"appsecret": settings.kis_app_secret,
|
|
}
|
|
with httpx.Client(timeout=10.0, headers={"User-Agent": USER_AGENT}) as cli:
|
|
resp = cli.post(url, json=body)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
access = data["access_token"]
|
|
# KIS 는 expires_in (sec) 와 access_token_token_expired (epoch-like str) 둘 다 줌
|
|
expires_in = int(data.get("expires_in", 3600))
|
|
return _Token(value=access, expires_at=time.time() + expires_in - 60)
|
|
|
|
|
|
def get_token() -> str:
|
|
"""캐시된 토큰 반환. 메모리 → 디스크 → 신규 발급 순. 키 없으면 SkippedMissingKey.
|
|
|
|
디스크 캐시는 컨테이너 재기동 시 토큰 재발급 1분 제한 (EGW00133) 회피용.
|
|
"""
|
|
global _token_cache
|
|
with _token_lock:
|
|
if _token_cache and _token_cache.expires_at > time.time():
|
|
return _token_cache.value
|
|
disk = _load_disk_cache()
|
|
if disk is not None:
|
|
_token_cache = disk
|
|
logger.info(
|
|
"kis token loaded from disk, expires_at=%s",
|
|
datetime.fromtimestamp(disk.expires_at),
|
|
)
|
|
return disk.value
|
|
_token_cache = _issue_token()
|
|
_save_disk_cache(_token_cache)
|
|
logger.info(
|
|
"kis token issued (and cached to %s), expires_at=%s",
|
|
_TOKEN_CACHE_PATH,
|
|
datetime.fromtimestamp(_token_cache.expires_at),
|
|
)
|
|
return _token_cache.value
|
|
|
|
|
|
def _headers(tr_id: str) -> dict[str, str]:
|
|
return {
|
|
"Content-Type": "application/json; charset=utf-8",
|
|
"Accept": "application/json",
|
|
"User-Agent": USER_AGENT,
|
|
"authorization": f"Bearer {get_token()}",
|
|
"appkey": settings.kis_app_key or "",
|
|
"appsecret": settings.kis_app_secret or "",
|
|
"tr_id": tr_id,
|
|
"custtype": "P",
|
|
}
|
|
|
|
|
|
@retry(
|
|
stop=stop_after_attempt(2),
|
|
wait=wait_exponential(multiplier=1, min=1, max=4),
|
|
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
|
|
reraise=True,
|
|
)
|
|
def fetch_daily_price(
|
|
code: str,
|
|
start: date,
|
|
end: date,
|
|
*,
|
|
period: str = "D",
|
|
adjusted: bool = True,
|
|
) -> list[dict[str, Any]]:
|
|
"""일/주/월봉 시세 조회 (read-only).
|
|
|
|
Returns: [{date, open, high, low, close, volume}, ...]
|
|
"""
|
|
if not _has_keys():
|
|
raise SkippedMissingKey("kis app_key/secret missing")
|
|
url = f"{KIS_BASE}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
|
|
params = {
|
|
"FID_COND_MRKT_DIV_CODE": "J", # J=주식
|
|
"FID_INPUT_ISCD": code,
|
|
"FID_INPUT_DATE_1": start.strftime("%Y%m%d"),
|
|
"FID_INPUT_DATE_2": end.strftime("%Y%m%d"),
|
|
"FID_PERIOD_DIV_CODE": period, # D/W/M
|
|
"FID_ORG_ADJ_PRC": "0" if adjusted else "1",
|
|
}
|
|
with httpx.Client(timeout=15.0) as cli:
|
|
resp = cli.get(url, headers=_headers("FHKST03010100"), params=params)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if data.get("rt_cd") != "0":
|
|
raise RuntimeError(f"kis error: {data.get('msg1')} (rt_cd={data.get('rt_cd')})")
|
|
|
|
out: list[dict[str, Any]] = []
|
|
for row in data.get("output2", []) or []:
|
|
raw_date = row.get("stck_bsop_date")
|
|
if not raw_date:
|
|
continue
|
|
try:
|
|
day = datetime.strptime(raw_date, "%Y%m%d").date()
|
|
except ValueError:
|
|
continue
|
|
out.append(
|
|
{
|
|
"date": day,
|
|
"open": float(row.get("stck_oprc") or 0),
|
|
"high": float(row.get("stck_hgpr") or 0),
|
|
"low": float(row.get("stck_lwpr") or 0),
|
|
"close": float(row.get("stck_clpr") or 0),
|
|
"volume": int(row.get("acml_vol") or 0),
|
|
}
|
|
)
|
|
return out
|
|
|
|
|
|
@retry(
|
|
stop=stop_after_attempt(2),
|
|
wait=wait_exponential(multiplier=1, min=1, max=4),
|
|
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
|
|
reraise=True,
|
|
)
|
|
def fetch_minute_price(
|
|
code: str,
|
|
*,
|
|
end_hour: str | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
"""당일 1분봉 시세 조회 (read-only). 최신 30개 캔들을 반환.
|
|
|
|
KIS 분봉 endpoint (`inquire-time-itemchartprice`) 는 base 시각 (FID_INPUT_HOUR_1) 부터
|
|
역순으로 최대 30개의 1분봉을 돌려준다. base 를 비우면 KIS 가 가장 최근 시각으로 해석.
|
|
즉 장중 호출 → 직전 30분 / 장 종료 후 호출 → 15:00~15:30 의 30분.
|
|
|
|
Returns: [{ts: datetime(KST aware), open, high, low, close, volume}, ...]
|
|
ts 오름차순 정렬.
|
|
|
|
Note: 이 endpoint 는 "당일" 분봉만 지원. 어제 이전 분봉은 별도 endpoint 가 필요한데,
|
|
이 사이트의 사용 패턴 (장중 라이브 차트) 에는 당일 데이터로 충분하다.
|
|
"""
|
|
if not _has_keys():
|
|
raise SkippedMissingKey("kis app_key/secret missing")
|
|
url = f"{KIS_BASE}/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
|
|
params = {
|
|
"FID_ETC_CLS_CODE": "",
|
|
"FID_COND_MRKT_DIV_CODE": "J",
|
|
"FID_INPUT_ISCD": code,
|
|
# 비우면 KIS 가 "지금" 으로 해석. 장 마감 후엔 15:30:00 부근 데이터.
|
|
"FID_INPUT_HOUR_1": end_hour or "",
|
|
"FID_PW_DATA_INCU_YN": "Y", # 과거 데이터 포함 (장 시작 직후 빈 데이터 방지)
|
|
}
|
|
with httpx.Client(timeout=15.0) as cli:
|
|
resp = cli.get(url, headers=_headers("FHKST03010200"), params=params)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if data.get("rt_cd") != "0":
|
|
raise RuntimeError(f"kis error: {data.get('msg1')} (rt_cd={data.get('rt_cd')})")
|
|
|
|
# KIS 응답은 KST. tz-aware 로 변환해서 DB (TIMESTAMPTZ) 에 안전 적재.
|
|
from datetime import timedelta, timezone as _tz
|
|
KST = _tz(timedelta(hours=9))
|
|
|
|
out: list[dict[str, Any]] = []
|
|
for row in data.get("output2", []) or []:
|
|
raw_date = row.get("stck_bsop_date")
|
|
raw_hour = row.get("stck_cntg_hour")
|
|
if not raw_date or not raw_hour:
|
|
continue
|
|
try:
|
|
ts = datetime.strptime(raw_date + raw_hour.zfill(6), "%Y%m%d%H%M%S").replace(tzinfo=KST)
|
|
except ValueError:
|
|
continue
|
|
out.append(
|
|
{
|
|
"ts": ts,
|
|
"open": float(row.get("stck_oprc") or 0),
|
|
"high": float(row.get("stck_hgpr") or 0),
|
|
"low": float(row.get("stck_lwpr") or 0),
|
|
# 분봉에서는 종가가 stck_prpr (현재가) 로 옴
|
|
"close": float(row.get("stck_prpr") or row.get("stck_clpr") or 0),
|
|
"volume": int(row.get("cntg_vol") or 0),
|
|
}
|
|
)
|
|
# KIS 응답은 보통 최신→과거 역순. UI/DB 적재 편의를 위해 오름차순으로 뒤집는다.
|
|
out.sort(key=lambda r: r["ts"])
|
|
return out
|
|
|
|
|
|
def fetch_minute_range(
|
|
code: str,
|
|
from_ts: datetime,
|
|
to_ts: datetime,
|
|
*,
|
|
max_pages: int = 20,
|
|
) -> list[dict[str, Any]]:
|
|
"""[from_ts, to_ts] 윈도우의 1분봉 전체. KIS 30-bar 페이지를 역순으로 반복 호출.
|
|
|
|
KIS `inquire-time-itemchartprice` 는 한 번에 최대 30개 1분봉만 주고,
|
|
`FID_INPUT_HOUR_1` 기준 그 시각 포함 이전 30분을 반환한다. 그래서 to_ts 부터
|
|
시작해서 가장 이른 응답 시각의 -1분을 다음 cursor 로 잡아 from_ts 까지 후퇴.
|
|
|
|
중복 키 (ts) 는 dict 로 자연 dedupe. 더 이상 새 행이 안 들어오거나 max_pages 도달
|
|
하면 종료 (rate-limit/무한루프 방지).
|
|
|
|
Note: 이 endpoint 는 "당일" 만 지원. from_ts/to_ts 는 같은 날짜여야 한다.
|
|
"""
|
|
if not _has_keys():
|
|
raise SkippedMissingKey("kis app_key/secret missing")
|
|
if from_ts >= to_ts:
|
|
return []
|
|
|
|
from datetime import timedelta as _td
|
|
|
|
accumulated: dict[datetime, dict[str, Any]] = {}
|
|
cursor = to_ts
|
|
pages = 0
|
|
while cursor > from_ts and pages < max_pages:
|
|
pages += 1
|
|
rows = fetch_minute_price(code, end_hour=cursor.strftime("%H%M%S"))
|
|
if not rows:
|
|
break
|
|
added = 0
|
|
for r in rows:
|
|
ts = r["ts"]
|
|
if ts < from_ts or ts > to_ts:
|
|
continue
|
|
if ts not in accumulated:
|
|
accumulated[ts] = r
|
|
added += 1
|
|
if added == 0:
|
|
# 같은 30개를 또 받았다 — 더 과거가 없거나 KIS 가 똑같은 페이지를 반환.
|
|
break
|
|
earliest_ts = min(r["ts"] for r in rows)
|
|
next_cursor = earliest_ts - _td(minutes=1)
|
|
if next_cursor >= cursor:
|
|
break
|
|
cursor = next_cursor
|
|
|
|
return sorted(accumulated.values(), key=lambda r: r["ts"])
|
|
|
|
|
|
def ping() -> dict[str, Any]:
|
|
"""토큰 발급만 시도해서 키 유효성 확인."""
|
|
if not _has_keys():
|
|
return {"status": "skipped_missing_key"}
|
|
try:
|
|
tok = get_token()
|
|
return {"status": "ok", "token_prefix": tok[:8] + "...", "len": len(tok)}
|
|
except SkippedMissingKey:
|
|
return {"status": "skipped_missing_key"}
|
|
except Exception as exc: # noqa: BLE001
|
|
return {"status": "failed", "error": str(exc)}
|