- backend/app/nlp/finbert.py: snunlp/KR-FinBert-SC 어댑터. - score = P(pos) - P(neg) ∈ [-1, +1], label = argmax (neg/neu/pos) - 768d mean-pooled last hidden state → news.embedding (VECTOR) 저장 - settings.huggingface_token 인증, lazy singleton, cuda/cpu auto - backend/app/nlp/score_news.py: news 테이블에서 sentiment_score IS NULL 행을 배치 스코어 → UPDATE (... embedding=(:e)::vector). 종목 필터 + limit 옵션. - backend/app/db/migrations/002_sentiment_view.sql: v_sentiment_daily 뷰. 종목·KST 일별 n_articles, mean_score, pos/neg/neu_ratio, weighted_score (naver_finance 1.0 / google_rss 0.7 / dart 0.5). - backend/app/db/migrate.py: 이미 실행 중인 DB 에 새 SQL 마이그레이션 적용용 CLI. 모든 SQL 파일은 idempotent. - refresh_one.py: refresh 끝에 종목당 200건까지 finbert 스코어, finbert SourceStatus 를 RefreshReport 에 추가. - daily_batch.py: 모든 종목 처리 후 score_pending_news(limit=2000) 로 mop-up. 모델 캐시는 docker-compose hf_cache 볼륨(/root/.cache/huggingface). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
151 lines
5.0 KiB
Python
151 lines
5.0 KiB
Python
"""KR-FinBERT 감성 분석 어댑터.
|
|
|
|
모델: snunlp/KR-FinBert-SC (3-class: negative / neutral / positive)
|
|
|
|
- score : prob(positive) - prob(negative) ∈ [-1, +1]
|
|
- label : argmax 결과 ('positive' / 'neutral' / 'negative')
|
|
- embedding : 마지막 hidden state mean pool (768d) — `news.embedding` (VECTOR) 저장용
|
|
|
|
디바이스: settings.model_device ('auto' → cuda 가용 시 cuda, 아니면 cpu).
|
|
인증: settings.huggingface_token (gated 모델은 아니지만 HF rate limit 우회 + 일관성).
|
|
캐시: HF_HOME=/root/.cache/huggingface (docker-compose 의 `hf_cache` 볼륨).
|
|
|
|
lazy singleton — FastAPI 기동 시점에 모델을 로드하지 않고, 첫 score_texts() 호출
|
|
또는 ping() 호출 시점에 로드.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import threading
|
|
from dataclasses import dataclass
|
|
|
|
from app.config import settings
|
|
|
|
# Logger named after this module.
logger = logging.getLogger(__name__)

# Hugging Face model id used by this adapter.
MODEL_NAME = "snunlp/KR-FinBert-SC"

# Class index -> label name. Per the original author's note, the id2label of
# KR-FinBert-SC is {0: 'negative', 1: 'neutral', 2: 'positive'}.
_LABELS = ("negative", "neutral", "positive")

# Held by _load() for the whole of the one-time model load.
_lock = threading.Lock()

# Lazily filled singleton state; _load() populates every entry and flips
# "loaded" to True once the tokenizer and model are ready.
_state: dict[str, object] = dict(
    loaded=False,
    tokenizer=None,
    model=None,
    device=None,
)
|
|
|
|
|
|
@dataclass
class FinbertOutput:
    """Sentiment result for a single text.

    Attributes:
        label: Name of the highest-probability class
            ('negative' / 'neutral' / 'positive').
        score: prob_positive - prob_negative, which lies in [-1, +1].
        prob_negative: Softmax probability of the negative class.
        prob_neutral: Softmax probability of the neutral class.
        prob_positive: Softmax probability of the positive class.
        embedding: Mean-pooled last hidden state (768 values per the
            original author's note).
    """

    label: str
    score: float
    prob_negative: float
    prob_neutral: float
    prob_positive: float
    embedding: list[float]
|
|
|
|
|
|
def _resolve_device() -> str:
    """Choose the device name ('cuda' or 'cpu') from ``settings.model_device``.

    The setting is lower-cased, and a missing/empty value is treated as 'auto'.
    'cpu' always yields 'cpu'. Every other value ('cuda', 'auto', or anything
    unrecognized) yields 'cuda' when ``torch.cuda.is_available()`` is true and
    'cpu' otherwise, so an explicit 'cuda' request falls back to CPU instead of
    failing.
    """
    import torch  # lazy import: torch is only pulled in when a device is needed

    requested = (settings.model_device or "auto").lower()
    if requested != "cpu" and torch.cuda.is_available():
        return "cuda"
    return "cpu"
|
|
|
|
|
|
def _load() -> None:
    """Load the tokenizer and model into ``_state`` once; later calls return early.

    The whole load runs while holding ``_lock``. When
    ``settings.huggingface_token`` is set, it is passed to both
    ``from_pretrained`` calls and exported as ``HUGGINGFACE_HUB_TOKEN`` /
    ``HF_TOKEN`` unless those variables already exist. The model is created
    with ``output_hidden_states=True``, switched to eval mode, and moved to the
    device chosen by ``_resolve_device()``.
    """
    with _lock:
        if _state["loaded"]:
            return

        # Heavy dependencies are imported here rather than at module import.
        import torch  # noqa: F401  (not referenced directly in this function)
        from transformers import AutoModelForSequenceClassification, AutoTokenizer

        hf_token = settings.huggingface_token or None
        if hf_token:
            # setdefault: never override a token already present in the environment.
            for env_name in ("HUGGINGFACE_HUB_TOKEN", "HF_TOKEN"):
                os.environ.setdefault(env_name, hf_token)

        target = _resolve_device()
        logger.info("loading %s on %s", MODEL_NAME, target)

        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=hf_token)
        model = AutoModelForSequenceClassification.from_pretrained(
            MODEL_NAME,
            token=hf_token,
            output_hidden_states=True,
        )
        model.eval()
        model.to(target)

        _state.update(loaded=True, tokenizer=tokenizer, model=model, device=target)
        logger.info("KR-FinBERT loaded (device=%s)", target)
|
|
|
|
|
|
def score_texts(
    texts: list[str],
    *,
    batch_size: int = 16,
    max_length: int = 256,
) -> list[FinbertOutput]:
    """Return sentiment score, label and embedding for each text, in input order.

    Args:
        texts: Texts to score. Entries that are None, empty or whitespace-only
            are replaced by the placeholder ``"_"`` so the tokenizer always
            receives non-empty input (the original author's note expects a
            near-neutral result for it).
        batch_size: Number of texts per forward pass; must be >= 1.
        max_length: Token limit passed to the tokenizer (with truncation on).

    Returns:
        One ``FinbertOutput`` per input text. An empty ``texts`` returns ``[]``
        without loading the model.

    Raises:
        ValueError: If ``texts`` is non-empty and ``batch_size`` is below 1.
    """
    if not texts:
        return []
    # Validate before the expensive model load. A negative step would make the
    # batching range empty and silently drop every input; zero would only fail
    # later with an opaque range() error.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    _load()
    import torch

    tok = _state["tokenizer"]
    mdl = _state["model"]
    device = _state["device"]

    results: list[FinbertOutput] = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            chunk = [(t or "").strip() or "_" for t in texts[start : start + batch_size]]
            enc = tok(
                chunk,
                padding=True,
                truncation=True,
                max_length=max_length,
                return_tensors="pt",
            ).to(device)
            out = mdl(**enc)
            probs = torch.softmax(out.logits, dim=-1).cpu()

            # Mean-pool the last hidden state over real (non-padding) tokens only.
            last_hidden = out.hidden_states[-1]  # (B, T, H)
            mask = enc["attention_mask"].unsqueeze(-1).float()
            # clamp guards against division by zero for an all-padding row.
            pooled = (last_hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1.0)
            pooled = pooled.cpu().tolist()

            for row, vec in zip(probs.tolist(), pooled):
                p_neg, p_neu, p_pos = row[0], row[1], row[2]
                # argmax over the class probabilities; ties resolve to the lowest index.
                label_idx = max(range(len(_LABELS)), key=row.__getitem__)
                results.append(
                    FinbertOutput(
                        label=_LABELS[label_idx],
                        score=float(p_pos - p_neg),
                        prob_negative=float(p_neg),
                        prob_neutral=float(p_neu),
                        prob_positive=float(p_pos),
                        embedding=[float(x) for x in vec],
                    )
                )
    return results
|
|
|
|
|
|
def ping() -> dict[str, object]:
    """Report whether the model can be loaded.

    Calls ``_load()``, which returns early once the model has been loaded.

    Returns:
        ``{"status": "ok", "model": ..., "device": ...}`` on success, or
        ``{"status": "failed", "model": ..., "error": ...}`` carrying the
        exception text when loading raises.
    """
    try:
        _load()
    except Exception as exc:  # noqa: BLE001 - any load failure is reported, not raised
        return {"status": "failed", "model": MODEL_NAME, "error": str(exc)}
    return {"status": "ok", "model": MODEL_NAME, "device": _state["device"]}
|