feat(phase-2): KR-FinBERT 감성 스코어링 + 일별 집계 뷰
- backend/app/nlp/finbert.py: snunlp/KR-FinBert-SC 어댑터. - score = P(pos) - P(neg) ∈ [-1, +1], label = argmax (neg/neu/pos) - 768d mean-pooled last hidden state → news.embedding (VECTOR) 저장 - settings.huggingface_token 인증, lazy singleton, cuda/cpu auto - backend/app/nlp/score_news.py: news 테이블에서 sentiment_score IS NULL 행을 배치 스코어 → UPDATE (... embedding=(:e)::vector). 종목 필터 + limit 옵션. - backend/app/db/migrations/002_sentiment_view.sql: v_sentiment_daily 뷰. 종목·KST 일별 n_articles, mean_score, pos/neg/neu_ratio, weighted_score (naver_finance 1.0 / google_rss 0.7 / dart 0.5). - backend/app/db/migrate.py: 이미 실행 중인 DB 에 새 SQL 마이그레이션 적용용 CLI. 모든 SQL 파일은 idempotent. - refresh_one.py: refresh 끝에 종목당 200건까지 finbert 스코어, finbert SourceStatus 를 RefreshReport 에 추가. - daily_batch.py: 모든 종목 처리 후 score_pending_news(limit=2000) 로 mop-up. 모델 캐시는 docker-compose hf_cache 볼륨(/root/.cache/huggingface). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
53
backend/app/db/migrate.py
Normal file
53
backend/app/db/migrate.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""Manual migration runner.
|
||||
|
||||
docker-entrypoint-initdb.d 는 fresh DB 첫 기동 때만 동작. 이미 동작 중인 DB 에
|
||||
새 마이그레이션을 적용하려면 이 스크립트로:
|
||||
|
||||
python -m app.db.migrate
|
||||
|
||||
모든 SQL 파일은 idempotent (CREATE IF NOT EXISTS / CREATE OR REPLACE) 여야 함.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.db.connection import get_engine
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MIGRATIONS_DIR = Path(__file__).parent / "migrations"
|
||||
|
||||
|
||||
def apply_all() -> dict[str, str]:
|
||||
"""migrations/ 안 .sql 들을 이름순으로 적용. 결과: {filename: 'ok'|'failed: ...'}."""
|
||||
eng = get_engine()
|
||||
results: dict[str, str] = {}
|
||||
files = sorted(MIGRATIONS_DIR.glob("*.sql"))
|
||||
if not files:
|
||||
logger.warning("no migration files in %s", MIGRATIONS_DIR)
|
||||
return results
|
||||
for path in files:
|
||||
sql = path.read_text(encoding="utf-8")
|
||||
# psql meta-command 제거 (\set ON_ERROR_STOP 등)
|
||||
cleaned = "\n".join(
|
||||
ln for ln in sql.splitlines() if not ln.strip().startswith("\\")
|
||||
)
|
||||
try:
|
||||
with eng.begin() as conn:
|
||||
conn.execute(text(cleaned))
|
||||
results[path.name] = "ok"
|
||||
logger.info("applied %s", path.name)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
results[path.name] = f"failed: {exc}"
|
||||
logger.exception("migration %s failed", path.name)
|
||||
return results
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
|
||||
out = apply_all()
|
||||
for k, v in out.items():
|
||||
print(f"{k}: {v}")
|
||||
32
backend/app/db/migrations/002_sentiment_view.sql
Normal file
32
backend/app/db/migrations/002_sentiment_view.sql
Normal file
@@ -0,0 +1,32 @@
|
||||
-- Phase 2: 일별 종목별 감성 집계 뷰.
|
||||
-- weighted_score : 소스별 가중치 적용
|
||||
-- naver_finance 1.0 (가장 직접적인 종목 페이지 뉴스)
|
||||
-- google_rss 0.7 (관련성 노이즈 있음)
|
||||
-- dart 0.5 (공시는 short title 만으로는 감성이 약함)
|
||||
|
||||
\set ON_ERROR_STOP on
|
||||
|
||||
CREATE OR REPLACE VIEW v_sentiment_daily AS
|
||||
SELECT
|
||||
code,
|
||||
(published_at AT TIME ZONE 'Asia/Seoul')::date AS date,
|
||||
COUNT(*) AS n_articles,
|
||||
AVG(sentiment_score)::REAL AS mean_score,
|
||||
AVG(CASE WHEN sentiment_label = 'positive' THEN 1.0 ELSE 0.0 END)::REAL AS pos_ratio,
|
||||
AVG(CASE WHEN sentiment_label = 'negative' THEN 1.0 ELSE 0.0 END)::REAL AS neg_ratio,
|
||||
AVG(CASE WHEN sentiment_label = 'neutral' THEN 1.0 ELSE 0.0 END)::REAL AS neu_ratio,
|
||||
AVG(
|
||||
sentiment_score * CASE source
|
||||
WHEN 'naver_finance' THEN 1.0
|
||||
WHEN 'google_rss' THEN 0.7
|
||||
WHEN 'dart' THEN 0.5
|
||||
ELSE 0.6
|
||||
END
|
||||
)::REAL AS weighted_score
|
||||
FROM news
|
||||
WHERE sentiment_score IS NOT NULL
|
||||
AND code IS NOT NULL
|
||||
GROUP BY code, (published_at AT TIME ZONE 'Asia/Seoul')::date;
|
||||
|
||||
COMMENT ON VIEW v_sentiment_daily IS
|
||||
'Phase 2: KR-FinBERT 점수를 종목·일(KST) 단위로 집계. Phase 4 LGBM 피처 + UI 차트 보조 데이터로 사용.';
|
||||
0
backend/app/nlp/__init__.py
Normal file
0
backend/app/nlp/__init__.py
Normal file
150
backend/app/nlp/finbert.py
Normal file
150
backend/app/nlp/finbert.py
Normal file
@@ -0,0 +1,150 @@
|
||||
"""KR-FinBERT 감성 분석 어댑터.
|
||||
|
||||
모델: snunlp/KR-FinBert-SC (3-class: negative / neutral / positive)
|
||||
|
||||
- score : prob(positive) - prob(negative) ∈ [-1, +1]
|
||||
- label : argmax 결과 ('positive' / 'neutral' / 'negative')
|
||||
- embedding : 마지막 hidden state mean pool (768d) — `news.embedding` (VECTOR) 저장용
|
||||
|
||||
디바이스: settings.model_device ('auto' → cuda 가용 시 cuda, 아니면 cpu).
|
||||
인증: settings.huggingface_token (gated 모델은 아니지만 HF rate limit 우회 + 일관성).
|
||||
캐시: HF_HOME=/root/.cache/huggingface (docker-compose 의 `hf_cache` 볼륨).
|
||||
|
||||
lazy singleton — FastAPI 기동 시점에 모델을 로드하지 않고, 첫 score_texts() 호출
|
||||
또는 ping() 호출 시점에 로드.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MODEL_NAME = "snunlp/KR-FinBert-SC"
|
||||
# KR-FinBert-SC 의 id2label : {0: 'negative', 1: 'neutral', 2: 'positive'}
|
||||
_LABELS = ("negative", "neutral", "positive")
|
||||
|
||||
_lock = threading.Lock()
|
||||
_state: dict[str, object] = {
|
||||
"loaded": False,
|
||||
"tokenizer": None,
|
||||
"model": None,
|
||||
"device": None,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class FinbertOutput:
|
||||
label: str
|
||||
score: float # prob_positive - prob_negative ∈ [-1, +1]
|
||||
prob_negative: float
|
||||
prob_neutral: float
|
||||
prob_positive: float
|
||||
embedding: list[float] # 768d mean-pooled last hidden state
|
||||
|
||||
|
||||
def _resolve_device() -> str:
|
||||
"""settings.model_device 값에 따라 'cuda'/'cpu' 결정."""
|
||||
import torch # lazy
|
||||
|
||||
pref = (settings.model_device or "auto").lower()
|
||||
if pref == "cuda":
|
||||
return "cuda" if torch.cuda.is_available() else "cpu"
|
||||
if pref == "cpu":
|
||||
return "cpu"
|
||||
# 'auto'
|
||||
return "cuda" if torch.cuda.is_available() else "cpu"
|
||||
|
||||
|
||||
def _load() -> None:
|
||||
global _state
|
||||
with _lock:
|
||||
if _state["loaded"]:
|
||||
return
|
||||
import torch
|
||||
from transformers import AutoModelForSequenceClassification, AutoTokenizer
|
||||
|
||||
token = settings.huggingface_token or None
|
||||
if token:
|
||||
# transformers/datasets 모두 이 env 를 인식.
|
||||
os.environ.setdefault("HUGGINGFACE_HUB_TOKEN", token)
|
||||
os.environ.setdefault("HF_TOKEN", token)
|
||||
|
||||
device = _resolve_device()
|
||||
logger.info("loading %s on %s", MODEL_NAME, device)
|
||||
tok = AutoTokenizer.from_pretrained(MODEL_NAME, token=token)
|
||||
mdl = AutoModelForSequenceClassification.from_pretrained(
|
||||
MODEL_NAME,
|
||||
token=token,
|
||||
output_hidden_states=True,
|
||||
)
|
||||
mdl.eval()
|
||||
mdl.to(device)
|
||||
_state.update({"loaded": True, "tokenizer": tok, "model": mdl, "device": device})
|
||||
logger.info("KR-FinBERT loaded (device=%s)", device)
|
||||
|
||||
|
||||
def score_texts(
|
||||
texts: list[str],
|
||||
*,
|
||||
batch_size: int = 16,
|
||||
max_length: int = 256,
|
||||
) -> list[FinbertOutput]:
|
||||
"""주어진 텍스트 리스트에 대해 감성 점수 + 라벨 + 768d embedding 반환.
|
||||
|
||||
빈 문자열은 placeholder('_')로 치환해서 라벨은 neutral 에 가깝게 나오게 함.
|
||||
"""
|
||||
if not texts:
|
||||
return []
|
||||
_load()
|
||||
import torch
|
||||
|
||||
tok = _state["tokenizer"]
|
||||
mdl = _state["model"]
|
||||
device = _state["device"]
|
||||
|
||||
results: list[FinbertOutput] = []
|
||||
with torch.no_grad():
|
||||
for i in range(0, len(texts), batch_size):
|
||||
chunk = [(t or "").strip() or "_" for t in texts[i : i + batch_size]]
|
||||
enc = tok(
|
||||
chunk,
|
||||
padding=True,
|
||||
truncation=True,
|
||||
max_length=max_length,
|
||||
return_tensors="pt",
|
||||
).to(device)
|
||||
out = mdl(**enc)
|
||||
probs = torch.softmax(out.logits, dim=-1).cpu()
|
||||
last_hidden = out.hidden_states[-1] # (B, T, H)
|
||||
mask = enc["attention_mask"].unsqueeze(-1).float()
|
||||
pooled = (last_hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1.0)
|
||||
pooled = pooled.cpu().tolist()
|
||||
|
||||
for row, vec in zip(probs.tolist(), pooled):
|
||||
p_neg, p_neu, p_pos = row[0], row[1], row[2]
|
||||
label_idx = int(max(range(3), key=lambda k: row[k]))
|
||||
results.append(
|
||||
FinbertOutput(
|
||||
label=_LABELS[label_idx],
|
||||
score=float(p_pos - p_neg),
|
||||
prob_negative=float(p_neg),
|
||||
prob_neutral=float(p_neu),
|
||||
prob_positive=float(p_pos),
|
||||
embedding=[float(x) for x in vec],
|
||||
)
|
||||
)
|
||||
return results
|
||||
|
||||
|
||||
def ping() -> dict[str, object]:
|
||||
"""모델 로드 가능 여부 빠르게 확인. 한 번 로드되면 캐시됨."""
|
||||
try:
|
||||
_load()
|
||||
return {"status": "ok", "model": MODEL_NAME, "device": _state["device"]}
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return {"status": "failed", "model": MODEL_NAME, "error": str(exc)}
|
||||
96
backend/app/nlp/score_news.py
Normal file
96
backend/app/nlp/score_news.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""DB news 테이블에서 sentiment_score IS NULL 인 행을 배치로 스코어링.
|
||||
|
||||
refresh_one / daily_batch 에서 뉴스 upsert 직후 호출. 증분만 처리.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.db.connection import get_engine
|
||||
from app.nlp.finbert import score_texts
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScoreResult:
|
||||
fetched: int
|
||||
scored: int
|
||||
failed: int
|
||||
error: str | None = None
|
||||
|
||||
|
||||
def _vector_literal(vec: list[float]) -> str:
|
||||
"""pgvector 텍스트 리터럴: '[v1,v2,...]' 형식. (:e)::vector 로 캐스팅."""
|
||||
return "[" + ",".join(f"{x:.6f}" for x in vec) + "]"
|
||||
|
||||
|
||||
def score_pending_news(
|
||||
*,
|
||||
batch_size: int = 32,
|
||||
limit: int | None = 500,
|
||||
code: str | None = None,
|
||||
) -> ScoreResult:
|
||||
"""sentiment_score IS NULL 인 news 행에 대해 finbert score + label + embedding 채움.
|
||||
|
||||
Args:
|
||||
batch_size: finbert inference 배치 크기.
|
||||
limit: 한 번에 처리할 최대 행 수. None 이면 무제한.
|
||||
daily_batch 가 너무 무겁지 않도록 기본 500.
|
||||
code: 특정 종목만 (None 이면 전체 무점수 행).
|
||||
"""
|
||||
eng = get_engine()
|
||||
where = "sentiment_score IS NULL"
|
||||
params: dict[str, Any] = {}
|
||||
if code:
|
||||
where += " AND code = :code"
|
||||
params["code"] = code
|
||||
|
||||
sql_select = (
|
||||
"SELECT id, COALESCE(title, '') || ' ' || COALESCE(body, '') AS txt "
|
||||
f"FROM news WHERE {where} ORDER BY id"
|
||||
)
|
||||
if limit is not None:
|
||||
sql_select += f" LIMIT {int(limit)}"
|
||||
|
||||
with eng.connect() as conn:
|
||||
rows = conn.execute(text(sql_select), params).all()
|
||||
if not rows:
|
||||
return ScoreResult(fetched=0, scored=0, failed=0)
|
||||
|
||||
ids = [r[0] for r in rows]
|
||||
texts_in = [r[1] for r in rows]
|
||||
|
||||
try:
|
||||
outputs = score_texts(texts_in, batch_size=batch_size)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.exception("score_texts failed")
|
||||
return ScoreResult(fetched=len(rows), scored=0, failed=len(rows), error=str(exc))
|
||||
|
||||
update_sql = text(
|
||||
"UPDATE news SET sentiment_score = :s, sentiment_label = :l, "
|
||||
"embedding = (:e)::vector WHERE id = :id"
|
||||
)
|
||||
scored = 0
|
||||
failed = 0
|
||||
with eng.begin() as conn:
|
||||
for nid, out in zip(ids, outputs):
|
||||
try:
|
||||
conn.execute(
|
||||
update_sql,
|
||||
{
|
||||
"id": nid,
|
||||
"s": out.score,
|
||||
"l": out.label,
|
||||
"e": _vector_literal(out.embedding),
|
||||
},
|
||||
)
|
||||
scored += 1
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("update news id=%s failed: %s", nid, exc)
|
||||
failed += 1
|
||||
return ScoreResult(fetched=len(rows), scored=scored, failed=failed)
|
||||
@@ -11,6 +11,7 @@ import time
|
||||
from typing import Any
|
||||
|
||||
from app.fetch import macro as macro_mod
|
||||
from app.nlp.score_news import score_pending_news
|
||||
from app.pipelines.refresh_one import refresh_code
|
||||
from app.seed.seed_tickers import SEED_TICKERS
|
||||
|
||||
@@ -32,11 +33,26 @@ def run_daily_batch() -> dict[str, Any]:
|
||||
for m in macros
|
||||
]
|
||||
|
||||
# 시드 종목 refresh 내에서 종목당 200건만 스코어함. 잔여(여러 소스 합쳐
|
||||
# 200건 초과 또는 코드 매핑 안된 google_rss 등)는 여기서 한 번에 mop-up.
|
||||
try:
|
||||
mop = score_pending_news(limit=2000)
|
||||
sentiment_summary: dict[str, Any] = {
|
||||
"status": "ok" if mop.error is None else "failed",
|
||||
"fetched": mop.fetched,
|
||||
"scored": mop.scored,
|
||||
"failed": mop.failed,
|
||||
"error": mop.error,
|
||||
}
|
||||
except Exception as exc: # noqa: BLE001
|
||||
sentiment_summary = {"status": "failed", "error": str(exc)}
|
||||
|
||||
elapsed = time.time() - start_ts
|
||||
return {
|
||||
"duration_seconds": round(elapsed, 2),
|
||||
"tickers": reports,
|
||||
"macro": macro_summary,
|
||||
"sentiment_mop": sentiment_summary,
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ class RefreshReport:
|
||||
dart: SourceStatus
|
||||
naver_news: SourceStatus
|
||||
google_rss: SourceStatus
|
||||
finbert: SourceStatus
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
out: dict[str, Any] = {"code": self.code}
|
||||
@@ -46,6 +47,7 @@ class RefreshReport:
|
||||
"dart",
|
||||
"naver_news",
|
||||
"google_rss",
|
||||
"finbert",
|
||||
):
|
||||
v: SourceStatus = getattr(self, f)
|
||||
out[f] = asdict(v)
|
||||
@@ -132,6 +134,25 @@ def _google_rss(code: str, name: str) -> SourceStatus:
|
||||
return SourceStatus(status="failed", error=str(exc))
|
||||
|
||||
|
||||
def _finbert(code: str) -> SourceStatus:
|
||||
"""방금 upsert 된 뉴스 중 sentiment_score 가 비어있는 행을 KR-FinBERT 로 스코어."""
|
||||
try:
|
||||
from app.nlp.score_news import score_pending_news
|
||||
|
||||
# 한 종목에 대해 신규 뉴스가 매우 많아도 200건으로 컷.
|
||||
# daily_batch 끝에서 잔여분을 별도로 mop-up 한다.
|
||||
res = score_pending_news(code=code, limit=200)
|
||||
return SourceStatus(
|
||||
status="ok" if res.error is None else "failed",
|
||||
inserted=res.scored,
|
||||
skipped=res.failed,
|
||||
extra={"fetched": res.fetched},
|
||||
error=res.error,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return SourceStatus(status="failed", error=str(exc))
|
||||
|
||||
|
||||
def refresh_code(code: str, name: str, *, lookback_days: int = 7) -> RefreshReport:
|
||||
"""단기 갱신 (daily_batch 용). 최근 lookback_days 만 가져온다."""
|
||||
end = date.today()
|
||||
@@ -144,4 +165,5 @@ def refresh_code(code: str, name: str, *, lookback_days: int = 7) -> RefreshRepo
|
||||
dart=_dart(code, start, end),
|
||||
naver_news=_naver_news(code),
|
||||
google_rss=_google_rss(code, name),
|
||||
finbert=_finbert(code),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user