Files
stock_chart_site/backend/app/fetch/news.py
tkrmagid 56f73a1f12 feat(phase-1a): external data fetchers + refresh pipeline + scheduler
10종목 시드 + pykrx OHLCV / 외인·기관 거래대금, KIS read-only EOD, OpenDART
공시, 네이버 금융 뉴스 스크레이퍼, 구글 뉴스 RSS, yfinance 거시(KOSPI/KOSDAQ/
USDKRW/US10Y) fetcher 를 추가하고 refresh_one / daily_batch / backfill /
APScheduler(16:00 KST) 파이프라인으로 묶음.

- backend/app/seed: 10종목 시드 (대형/고변동/테마/플랫폼/방어)
- backend/app/fetch: pykrx, kis, dart, news, macro, symbols_seed
- backend/app/pipelines: refresh_one, daily_batch, backfill(CLI), scheduler
- backend/app/api/refresh.py: POST /api/refresh/{code}?lookback_days=N
- backend/app/main.py: lifespan 으로 스케줄러 기동/종료, /health/keys 추가
- README: .env 복사 안내 보강

스모크 테스트 (실제 키 사용) 결과:
  KIS token  : ok (token 346자 발급)
  KIS daily  : 005930 11rows
  DART list  : 005930 30일 10건
  Naver news : 005930 12건
  Google RSS : "삼성전자" 92건

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 15:43:18 +09:00

142 lines
4.9 KiB
Python

"""News collection: Naver Finance per-stock news page + Google News RSS.

To lower the risk of being blocked, requests send a User-Agent and use a
timeout plus retry. Duplicates are dropped via the unique constraint on URL.
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Any
import feedparser
import httpx
from bs4 import BeautifulSoup
from sqlalchemy import text
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from app.db.connection import get_engine
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# User-Agent header value sent by the HTTP fetchers in this module.
USER_AGENT = "Mozilla/5.0 (compatible; stock_chart_site/0.1; +personal)"
# Fixed UTC+09:00 offset (Korea Standard Time) attached to parsed Naver timestamps.
KST = timezone(timedelta(hours=9))
@dataclass
class NewsItem:
    """One collected news article, shaped like a row of the ``news`` table."""

    # Stock code the article is associated with; None when no code was given.
    code: str | None
    # Origin label; this module emits "naver_finance" and "google_rss".
    source: str
    # Headline text.
    title: str
    # Article link; used as the dedupe key on insert.
    url: str
    # Publication time; the fetchers in this module always set a tz-aware value.
    published_at: datetime
    # Article body text; the fetchers in this module leave it unset.
    body: str | None = None
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=1, min=1, max=4),
    retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
    reraise=True,
)
def fetch_naver_finance_news(code: str, *, max_pages: int = 1) -> list[NewsItem]:
    """Scrape the Naver Finance news list for one stock.

    Example page: https://finance.naver.com/item/news_news.naver?code=005930

    Args:
        code: Stock code sent as the ``code`` query parameter.
        max_pages: Number of list pages to fetch, starting from page 1.

    Returns:
        One ``NewsItem`` with source ``"naver_finance"`` for every table row
        that has a link, a non-empty title and a parseable timestamp.

    Raises:
        httpx.HTTPError: When a request (or a non-2xx status) still fails
            after the attempts allowed by the ``@retry`` decorator.
    """
    from urllib.parse import urljoin

    list_url = "https://finance.naver.com/item/news_news.naver"
    headers = {"User-Agent": USER_AGENT, "Referer": "https://finance.naver.com/"}
    out: list[NewsItem] = []

    # One client for all pages so the connection is reused across requests.
    with httpx.Client(timeout=10.0, headers=headers) as cli:
        for page in range(1, max_pages + 1):
            # Passing values through ``params`` lets httpx URL-encode them.
            resp = cli.get(
                list_url,
                params={
                    "code": code,
                    "page": page,
                    "sm": "title_entity_id.basic",
                    "clusterId": "",
                },
            )
            resp.raise_for_status()

            soup = BeautifulSoup(resp.text, "lxml")
            for tr in soup.select("table.type5 tr"):
                a = tr.select_one("a.tit") or tr.select_one("td.title a")
                if not a:
                    continue
                link = a.get("href") or ""
                if not link:
                    continue
                # Absolute links are kept verbatim; anything else (root-relative,
                # page-relative or protocol-relative) is resolved against the
                # list page URL.
                full_url = link if link.startswith("http") else urljoin(list_url, link)
                title = a.get_text(strip=True)
                time_td = tr.select_one("td.date")
                time_text = time_td.get_text(strip=True) if time_td else ""
                published_at = _parse_naver_time(time_text)
                # Rows without a usable title or timestamp are skipped.
                if not title or not published_at:
                    continue
                out.append(
                    NewsItem(
                        code=code,
                        source="naver_finance",
                        title=title,
                        url=full_url,
                        published_at=published_at,
                    )
                )
    return out
def _parse_naver_time(s: str) -> datetime | None:
    """Parse a Naver list timestamp such as ``'2026.05.20 13:24'``.

    Date parts may be separated by ``.`` or ``-``. Surrounding whitespace is
    ignored.

    Args:
        s: Raw text of the date cell.

    Returns:
        A datetime carrying the module's ``KST`` offset, or ``None`` when the
        text does not match the expected layout or does not denote a real
        calendar date/time.
    """
    m = re.match(r"(\d{4})[.-](\d{2})[.-](\d{2})\s+(\d{2}):(\d{2})", s.strip())
    if m is None:
        return None
    y, mo, d, h, mi = (int(x) for x in m.groups())
    try:
        return datetime(y, mo, d, h, mi, tzinfo=KST)
    except ValueError:
        # The digits matched the pattern but are out of range (e.g. month 13);
        # report "unparseable" instead of aborting the caller's whole scrape.
        return None
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=1, min=1, max=4),
    retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
    reraise=True,
)
def fetch_google_news_rss(query: str, *, code: str | None = None, hl: str = "ko", gl: str = "KR") -> list[NewsItem]:
    """Search Google News RSS and return the entries as news items.

    Args:
        query: Raw (not pre-encoded) search text, e.g. a stock name or code.
        code: Value stored in each returned item's ``code`` field.
        hl: Value for the ``hl`` query parameter; also the second half of ``ceid``.
        gl: Value for the ``gl`` query parameter; also the first half of ``ceid``.

    Returns:
        One ``NewsItem`` with source ``"google_rss"`` for every feed entry
        that has a title, a link and a parsed publication time.

    Raises:
        httpx.HTTPError: When the request (or a non-2xx status) still fails
            after the attempts allowed by the ``@retry`` decorator.
    """
    # Send the values through ``params`` so characters such as ``&``, ``#``
    # or ``+`` inside the search text are URL-encoded instead of being read
    # as URL syntax.
    params = {"q": query, "hl": hl, "gl": gl, "ceid": f"{gl}:{hl}"}
    with httpx.Client(timeout=10.0, headers={"User-Agent": USER_AGENT}) as cli:
        resp = cli.get("https://news.google.com/rss/search", params=params)
        resp.raise_for_status()
        body = resp.text

    feed = feedparser.parse(body)
    out: list[NewsItem] = []
    for entry in feed.entries:
        title = (entry.get("title") or "").strip()
        link = entry.get("link") or ""
        if not title or not link:
            continue
        published = entry.get("published_parsed")
        if not published:
            continue
        # The first six fields (year..second) are interpreted as UTC.
        pub_dt = datetime(*published[:6], tzinfo=timezone.utc)
        out.append(NewsItem(code=code, source="google_rss", title=title, url=link, published_at=pub_dt))
    return out
def upsert_news(items: list[NewsItem]) -> tuple[int, int]:
    """Insert items into the ``news`` table, ignoring URLs already present.

    All rows are written inside a single ``engine.begin()`` transaction.

    Args:
        items: News items to store.

    Returns:
        ``(inserted, skipped)``: rows actually written, and rows dropped
        because of the ``ON CONFLICT (url) DO NOTHING`` clause.
    """
    if not items:
        return 0, 0

    # The statement text is the same for every row, so build it once.
    statement = text(
        "\n"
        "INSERT INTO news (code, source, published_at, title, url, body)\n"
        "VALUES (:code, :source, :published_at, :title, :url, :body)\n"
        "ON CONFLICT (url) DO NOTHING\n"
        "RETURNING id\n"
    )
    columns = ("code", "source", "published_at", "title", "url", "body")

    engine = get_engine()
    inserted = 0
    skipped = 0
    with engine.begin() as conn:
        for news in items:
            row_params = {column: getattr(news, column) for column in columns}
            # RETURNING yields a row only when the INSERT really happened.
            returned = conn.execute(statement, row_params).first()
            if returned:
                inserted += 1
                continue
            skipped += 1
    return inserted, skipped