From edda01adbfbd23f98d0c1890f989028a68dedcaa Mon Sep 17 00:00:00 2001 From: tkrmagid Date: Wed, 20 May 2026 15:57:34 +0900 Subject: [PATCH] =?UTF-8?q?feat(phase-2):=20KR-FinBERT=20=EA=B0=90?= =?UTF-8?q?=EC=84=B1=20=EC=8A=A4=EC=BD=94=EC=96=B4=EB=A7=81=20+=20?= =?UTF-8?q?=EC=9D=BC=EB=B3=84=20=EC=A7=91=EA=B3=84=20=EB=B7=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - backend/app/nlp/finbert.py: snunlp/KR-FinBert-SC 어댑터. - score = P(pos) - P(neg) ∈ [-1, +1], label = argmax (neg/neu/pos) - 768d mean-pooled last hidden state → news.embedding (VECTOR) 저장 - settings.huggingface_token 인증, lazy singleton, cuda/cpu auto - backend/app/nlp/score_news.py: news 테이블에서 sentiment_score IS NULL 행을 배치 스코어 → UPDATE (... embedding=(:e)::vector). 종목 필터 + limit 옵션. - backend/app/db/migrations/002_sentiment_view.sql: v_sentiment_daily 뷰. 종목·KST 일별 n_articles, mean_score, pos/neg/neu_ratio, weighted_score (naver_finance 1.0 / google_rss 0.7 / dart 0.5). - backend/app/db/migrate.py: 이미 실행 중인 DB 에 새 SQL 마이그레이션 적용용 CLI. 모든 SQL 파일은 idempotent. - refresh_one.py: refresh 끝에 종목당 200건까지 finbert 스코어, finbert SourceStatus 를 RefreshReport 에 추가. - daily_batch.py: 모든 종목 처리 후 score_pending_news(limit=2000) 로 mop-up. 모델 캐시는 docker-compose hf_cache 볼륨(/root/.cache/huggingface). Co-Authored-By: Claude Opus 4.7 --- backend/app/db/migrate.py | 53 +++++++ .../app/db/migrations/002_sentiment_view.sql | 32 ++++ backend/app/nlp/__init__.py | 0 backend/app/nlp/finbert.py | 150 ++++++++++++++++++ backend/app/nlp/score_news.py | 96 +++++++++++ backend/app/pipelines/daily_batch.py | 16 ++ backend/app/pipelines/refresh_one.py | 22 +++ 7 files changed, 369 insertions(+) create mode 100644 backend/app/db/migrate.py create mode 100644 backend/app/db/migrations/002_sentiment_view.sql create mode 100644 backend/app/nlp/__init__.py create mode 100644 backend/app/nlp/finbert.py create mode 100644 backend/app/nlp/score_news.py diff --git a/backend/app/db/migrate.py b/backend/app/db/migrate.py new file mode 100644 index 0000000..6f84f56 --- /dev/null +++ b/backend/app/db/migrate.py @@ -0,0 +1,53 @@ +"""Manual migration runner. + +docker-entrypoint-initdb.d 는 fresh DB 첫 기동 때만 동작. 이미 동작 중인 DB 에 +새 마이그레이션을 적용하려면 이 스크립트로: + + python -m app.db.migrate + +모든 SQL 파일은 idempotent (CREATE IF NOT EXISTS / CREATE OR REPLACE) 여야 함. +""" +from __future__ import annotations + +import logging +from pathlib import Path + +from sqlalchemy import text + +from app.db.connection import get_engine + +logger = logging.getLogger(__name__) + +MIGRATIONS_DIR = Path(__file__).parent / "migrations" + + +def apply_all() -> dict[str, str]: + """migrations/ 안 .sql 들을 이름순으로 적용. 결과: {filename: 'ok'|'failed: ...'}.""" + eng = get_engine() + results: dict[str, str] = {} + files = sorted(MIGRATIONS_DIR.glob("*.sql")) + if not files: + logger.warning("no migration files in %s", MIGRATIONS_DIR) + return results + for path in files: + sql = path.read_text(encoding="utf-8") + # psql meta-command 제거 (\set ON_ERROR_STOP 등) + cleaned = "\n".join( + ln for ln in sql.splitlines() if not ln.strip().startswith("\\") + ) + try: + with eng.begin() as conn: + conn.execute(text(cleaned)) + results[path.name] = "ok" + logger.info("applied %s", path.name) + except Exception as exc: # noqa: BLE001 + results[path.name] = f"failed: {exc}" + logger.exception("migration %s failed", path.name) + return results + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") + out = apply_all() + for k, v in out.items(): + print(f"{k}: {v}") diff --git a/backend/app/db/migrations/002_sentiment_view.sql b/backend/app/db/migrations/002_sentiment_view.sql new file mode 100644 index 0000000..f51379d --- /dev/null +++ b/backend/app/db/migrations/002_sentiment_view.sql @@ -0,0 +1,32 @@ +-- Phase 2: 일별 종목별 감성 집계 뷰. +-- weighted_score : 소스별 가중치 적용 +-- naver_finance 1.0 (가장 직접적인 종목 페이지 뉴스) +-- google_rss 0.7 (관련성 노이즈 있음) +-- dart 0.5 (공시는 short title 만으로는 감성이 약함) + +\set ON_ERROR_STOP on + +CREATE OR REPLACE VIEW v_sentiment_daily AS +SELECT + code, + (published_at AT TIME ZONE 'Asia/Seoul')::date AS date, + COUNT(*) AS n_articles, + AVG(sentiment_score)::REAL AS mean_score, + AVG(CASE WHEN sentiment_label = 'positive' THEN 1.0 ELSE 0.0 END)::REAL AS pos_ratio, + AVG(CASE WHEN sentiment_label = 'negative' THEN 1.0 ELSE 0.0 END)::REAL AS neg_ratio, + AVG(CASE WHEN sentiment_label = 'neutral' THEN 1.0 ELSE 0.0 END)::REAL AS neu_ratio, + AVG( + sentiment_score * CASE source + WHEN 'naver_finance' THEN 1.0 + WHEN 'google_rss' THEN 0.7 + WHEN 'dart' THEN 0.5 + ELSE 0.6 + END + )::REAL AS weighted_score +FROM news +WHERE sentiment_score IS NOT NULL + AND code IS NOT NULL +GROUP BY code, (published_at AT TIME ZONE 'Asia/Seoul')::date; + +COMMENT ON VIEW v_sentiment_daily IS + 'Phase 2: KR-FinBERT 점수를 종목·일(KST) 단위로 집계. Phase 4 LGBM 피처 + UI 차트 보조 데이터로 사용.'; diff --git a/backend/app/nlp/__init__.py b/backend/app/nlp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/nlp/finbert.py b/backend/app/nlp/finbert.py new file mode 100644 index 0000000..753fa51 --- /dev/null +++ b/backend/app/nlp/finbert.py @@ -0,0 +1,150 @@ +"""KR-FinBERT 감성 분석 어댑터. + +모델: snunlp/KR-FinBert-SC (3-class: negative / neutral / positive) + +- score : prob(positive) - prob(negative) ∈ [-1, +1] +- label : argmax 결과 ('positive' / 'neutral' / 'negative') +- embedding : 마지막 hidden state mean pool (768d) — `news.embedding` (VECTOR) 저장용 + +디바이스: settings.model_device ('auto' → cuda 가용 시 cuda, 아니면 cpu). +인증: settings.huggingface_token (gated 모델은 아니지만 HF rate limit 우회 + 일관성). +캐시: HF_HOME=/root/.cache/huggingface (docker-compose 의 `hf_cache` 볼륨). + +lazy singleton — FastAPI 기동 시점에 모델을 로드하지 않고, 첫 score_texts() 호출 +또는 ping() 호출 시점에 로드. +""" +from __future__ import annotations + +import logging +import os +import threading +from dataclasses import dataclass + +from app.config import settings + +logger = logging.getLogger(__name__) + +MODEL_NAME = "snunlp/KR-FinBert-SC" +# KR-FinBert-SC 의 id2label : {0: 'negative', 1: 'neutral', 2: 'positive'} +_LABELS = ("negative", "neutral", "positive") + +_lock = threading.Lock() +_state: dict[str, object] = { + "loaded": False, + "tokenizer": None, + "model": None, + "device": None, +} + + +@dataclass +class FinbertOutput: + label: str + score: float # prob_positive - prob_negative ∈ [-1, +1] + prob_negative: float + prob_neutral: float + prob_positive: float + embedding: list[float] # 768d mean-pooled last hidden state + + +def _resolve_device() -> str: + """settings.model_device 값에 따라 'cuda'/'cpu' 결정.""" + import torch # lazy + + pref = (settings.model_device or "auto").lower() + if pref == "cuda": + return "cuda" if torch.cuda.is_available() else "cpu" + if pref == "cpu": + return "cpu" + # 'auto' + return "cuda" if torch.cuda.is_available() else "cpu" + + +def _load() -> None: + global _state + with _lock: + if _state["loaded"]: + return + import torch + from transformers import AutoModelForSequenceClassification, AutoTokenizer + + token = settings.huggingface_token or None + if token: + # transformers/datasets 모두 이 env 를 인식. + os.environ.setdefault("HUGGINGFACE_HUB_TOKEN", token) + os.environ.setdefault("HF_TOKEN", token) + + device = _resolve_device() + logger.info("loading %s on %s", MODEL_NAME, device) + tok = AutoTokenizer.from_pretrained(MODEL_NAME, token=token) + mdl = AutoModelForSequenceClassification.from_pretrained( + MODEL_NAME, + token=token, + output_hidden_states=True, + ) + mdl.eval() + mdl.to(device) + _state.update({"loaded": True, "tokenizer": tok, "model": mdl, "device": device}) + logger.info("KR-FinBERT loaded (device=%s)", device) + + +def score_texts( + texts: list[str], + *, + batch_size: int = 16, + max_length: int = 256, +) -> list[FinbertOutput]: + """주어진 텍스트 리스트에 대해 감성 점수 + 라벨 + 768d embedding 반환. + + 빈 문자열은 placeholder('_')로 치환해서 라벨은 neutral 에 가깝게 나오게 함. + """ + if not texts: + return [] + _load() + import torch + + tok = _state["tokenizer"] + mdl = _state["model"] + device = _state["device"] + + results: list[FinbertOutput] = [] + with torch.no_grad(): + for i in range(0, len(texts), batch_size): + chunk = [(t or "").strip() or "_" for t in texts[i : i + batch_size]] + enc = tok( + chunk, + padding=True, + truncation=True, + max_length=max_length, + return_tensors="pt", + ).to(device) + out = mdl(**enc) + probs = torch.softmax(out.logits, dim=-1).cpu() + last_hidden = out.hidden_states[-1] # (B, T, H) + mask = enc["attention_mask"].unsqueeze(-1).float() + pooled = (last_hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1.0) + pooled = pooled.cpu().tolist() + + for row, vec in zip(probs.tolist(), pooled): + p_neg, p_neu, p_pos = row[0], row[1], row[2] + label_idx = int(max(range(3), key=lambda k: row[k])) + results.append( + FinbertOutput( + label=_LABELS[label_idx], + score=float(p_pos - p_neg), + prob_negative=float(p_neg), + prob_neutral=float(p_neu), + prob_positive=float(p_pos), + embedding=[float(x) for x in vec], + ) + ) + return results + + +def ping() -> dict[str, object]: + """모델 로드 가능 여부 빠르게 확인. 한 번 로드되면 캐시됨.""" + try: + _load() + return {"status": "ok", "model": MODEL_NAME, "device": _state["device"]} + except Exception as exc: # noqa: BLE001 + return {"status": "failed", "model": MODEL_NAME, "error": str(exc)} diff --git a/backend/app/nlp/score_news.py b/backend/app/nlp/score_news.py new file mode 100644 index 0000000..c61fb03 --- /dev/null +++ b/backend/app/nlp/score_news.py @@ -0,0 +1,96 @@ +"""DB news 테이블에서 sentiment_score IS NULL 인 행을 배치로 스코어링. + +refresh_one / daily_batch 에서 뉴스 upsert 직후 호출. 증분만 처리. +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Any + +from sqlalchemy import text + +from app.db.connection import get_engine +from app.nlp.finbert import score_texts + +logger = logging.getLogger(__name__) + + +@dataclass +class ScoreResult: + fetched: int + scored: int + failed: int + error: str | None = None + + +def _vector_literal(vec: list[float]) -> str: + """pgvector 텍스트 리터럴: '[v1,v2,...]' 형식. (:e)::vector 로 캐스팅.""" + return "[" + ",".join(f"{x:.6f}" for x in vec) + "]" + + +def score_pending_news( + *, + batch_size: int = 32, + limit: int | None = 500, + code: str | None = None, +) -> ScoreResult: + """sentiment_score IS NULL 인 news 행에 대해 finbert score + label + embedding 채움. + + Args: + batch_size: finbert inference 배치 크기. + limit: 한 번에 처리할 최대 행 수. None 이면 무제한. + daily_batch 가 너무 무겁지 않도록 기본 500. + code: 특정 종목만 (None 이면 전체 무점수 행). + """ + eng = get_engine() + where = "sentiment_score IS NULL" + params: dict[str, Any] = {} + if code: + where += " AND code = :code" + params["code"] = code + + sql_select = ( + "SELECT id, COALESCE(title, '') || ' ' || COALESCE(body, '') AS txt " + f"FROM news WHERE {where} ORDER BY id" + ) + if limit is not None: + sql_select += f" LIMIT {int(limit)}" + + with eng.connect() as conn: + rows = conn.execute(text(sql_select), params).all() + if not rows: + return ScoreResult(fetched=0, scored=0, failed=0) + + ids = [r[0] for r in rows] + texts_in = [r[1] for r in rows] + + try: + outputs = score_texts(texts_in, batch_size=batch_size) + except Exception as exc: # noqa: BLE001 + logger.exception("score_texts failed") + return ScoreResult(fetched=len(rows), scored=0, failed=len(rows), error=str(exc)) + + update_sql = text( + "UPDATE news SET sentiment_score = :s, sentiment_label = :l, " + "embedding = (:e)::vector WHERE id = :id" + ) + scored = 0 + failed = 0 + with eng.begin() as conn: + for nid, out in zip(ids, outputs): + try: + conn.execute( + update_sql, + { + "id": nid, + "s": out.score, + "l": out.label, + "e": _vector_literal(out.embedding), + }, + ) + scored += 1 + except Exception as exc: # noqa: BLE001 + logger.warning("update news id=%s failed: %s", nid, exc) + failed += 1 + return ScoreResult(fetched=len(rows), scored=scored, failed=failed) diff --git a/backend/app/pipelines/daily_batch.py b/backend/app/pipelines/daily_batch.py index 16a85d5..0f88df2 100644 --- a/backend/app/pipelines/daily_batch.py +++ b/backend/app/pipelines/daily_batch.py @@ -11,6 +11,7 @@ import time from typing import Any from app.fetch import macro as macro_mod +from app.nlp.score_news import score_pending_news from app.pipelines.refresh_one import refresh_code from app.seed.seed_tickers import SEED_TICKERS @@ -32,11 +33,26 @@ def run_daily_batch() -> dict[str, Any]: for m in macros ] + # 시드 종목 refresh 내에서 종목당 200건만 스코어함. 잔여(여러 소스 합쳐 + # 200건 초과 또는 코드 매핑 안된 google_rss 등)는 여기서 한 번에 mop-up. + try: + mop = score_pending_news(limit=2000) + sentiment_summary: dict[str, Any] = { + "status": "ok" if mop.error is None else "failed", + "fetched": mop.fetched, + "scored": mop.scored, + "failed": mop.failed, + "error": mop.error, + } + except Exception as exc: # noqa: BLE001 + sentiment_summary = {"status": "failed", "error": str(exc)} + elapsed = time.time() - start_ts return { "duration_seconds": round(elapsed, 2), "tickers": reports, "macro": macro_summary, + "sentiment_mop": sentiment_summary, } diff --git a/backend/app/pipelines/refresh_one.py b/backend/app/pipelines/refresh_one.py index faae88c..e56b549 100644 --- a/backend/app/pipelines/refresh_one.py +++ b/backend/app/pipelines/refresh_one.py @@ -36,6 +36,7 @@ class RefreshReport: dart: SourceStatus naver_news: SourceStatus google_rss: SourceStatus + finbert: SourceStatus def to_dict(self) -> dict[str, Any]: out: dict[str, Any] = {"code": self.code} @@ -46,6 +47,7 @@ class RefreshReport: "dart", "naver_news", "google_rss", + "finbert", ): v: SourceStatus = getattr(self, f) out[f] = asdict(v) @@ -132,6 +134,25 @@ def _google_rss(code: str, name: str) -> SourceStatus: return SourceStatus(status="failed", error=str(exc)) +def _finbert(code: str) -> SourceStatus: + """방금 upsert 된 뉴스 중 sentiment_score 가 비어있는 행을 KR-FinBERT 로 스코어.""" + try: + from app.nlp.score_news import score_pending_news + + # 한 종목에 대해 신규 뉴스가 매우 많아도 200건으로 컷. + # daily_batch 끝에서 잔여분을 별도로 mop-up 한다. + res = score_pending_news(code=code, limit=200) + return SourceStatus( + status="ok" if res.error is None else "failed", + inserted=res.scored, + skipped=res.failed, + extra={"fetched": res.fetched}, + error=res.error, + ) + except Exception as exc: # noqa: BLE001 + return SourceStatus(status="failed", error=str(exc)) + + def refresh_code(code: str, name: str, *, lookback_days: int = 7) -> RefreshReport: """단기 갱신 (daily_batch 용). 최근 lookback_days 만 가져온다.""" end = date.today() @@ -144,4 +165,5 @@ def refresh_code(code: str, name: str, *, lookback_days: int = 7) -> RefreshRepo dart=_dart(code, start, end), naver_news=_naver_news(code), google_rss=_google_rss(code, name), + finbert=_finbert(code), )