import { spawn, type ChildProcess, type ChildProcessByStdio } from "node:child_process";
import { once } from "node:events";
import { promises as fs } from "node:fs";
import os from "node:os";
import path from "node:path";
import type { Readable, Writable } from "node:stream";
import ffmpegStatic from "ffmpeg-static";
import { RealTimeVAD } from "avr-vad";
import type { AssistantRuntimeConfig } from "../config.js";
import { Logger } from "../logger.js";
import { takeFrame, int16ArrayToFloat32, float32ToPcm16Buffer } from "./pcm.js";
import { ConversationMemory, type UserUtterance } from "../services/conversation.js";
import { ElevenLabsSttService } from "../services/elevenlabs-stt.js";
import { ElevenLabsTtsService, type PreparedSpeechAudio } from "../services/elevenlabs-tts.js";
import { OpenAiLlmService } from "../services/openai-llm.js";

/** Sample rate (Hz) of the mono 16-bit capture stream fed to the VAD and STT. */
const CAPTURE_SAMPLE_RATE = 16000;

/** Samples per VAD frame; also the size of each chunk handed to `RealTimeVAD.processAudio`. */
const VAD_FRAME_SAMPLES = 1536;

/** Speech segments shorter than 250 ms are dropped without being transcribed. */
const MIN_UTTERANCE_SAMPLES = CAPTURE_SAMPLE_RATE * 0.25;

/**
 * Output format both players assume for `PreparedSpeechAudio.stream`: raw 16-bit PCM,
 * 48 kHz, stereo. NOTE(review): must match what `ElevenLabsTtsService.preparePlayback`
 * emits — confirm there.
 */
const PLAYBACK_SAMPLE_RATE = 48000;
const PLAYBACK_CHANNELS = 2;
const PLAYBACK_BITS_PER_SAMPLE = 16;

interface LocalVoiceSessionOptions {
  config: AssistantRuntimeConfig;
  logger: Logger;
  stt: ElevenLabsSttService;
  tts: ElevenLabsTtsService;
  llm: OpenAiLlmService;
}

interface SpeechJob {
  text: string;
  source: "assistant" | "manual";
}

/**
 * Local microphone-to-speaker voice loop.
 *
 * Audio is captured with `pw-record` (or ffmpeg/dshow on Windows) as 16 kHz mono PCM and
 * segmented by a VAD. Each finished segment is transcribed, answered by the LLM, and the
 * reply is synthesized and played through `pw-play` (or a PowerShell `SoundPlayer` on
 * Windows). When the VAD detects the start of new speech, any playback in progress is
 * interrupted and the pending queue is cleared (barge-in).
 */
export class LocalVoiceSession {
  private readonly memory: ConversationMemory;
  private readonly queue: SpeechJob[] = [];
  private readonly pendingSamples: number[] = [];
  /** Trailing odd byte of the previous recorder chunk, kept so samples stay 2-byte aligned. */
  private pcmRemainder: Buffer | null = null;
  private vad: RealTimeVAD | null = null;
  private recorder: ChildProcessByStdio<null, Readable, Readable> | null = null;
  private currentPlayer: ChildProcess | null = null;
  private currentAbortController: AbortController | null = null;
  private currentPlayback: PreparedSpeechAudio | null = null;
  /** Promise chain that serializes VAD calls so frames are processed in capture order. */
  private processing: Promise<void> = Promise.resolve();
  private draining = false;
  private destroyed = false;

  constructor(private readonly options: LocalVoiceSessionOptions) {
    this.memory = new ConversationMemory(options.config.MAX_CONVERSATION_TURNS);
  }

  /** Creates the VAD and starts the platform recorder, streaming its PCM into the VAD. */
  async start(): Promise<void> {
    this.vad = await RealTimeVAD.new({
      model: "v5",
      sampleRate: CAPTURE_SAMPLE_RATE,
      frameSamples: VAD_FRAME_SAMPLES,
      positiveSpeechThreshold: 0.55,
      negativeSpeechThreshold: 0.35,
      redemptionFrames: 8,
      preSpeechPadFrames: 2,
      minSpeechFrames: 3,
      onFrameProcessed: () => undefined,
      onVADMisfire: () => undefined,
      onSpeechStart: () => {
        // Barge-in: the user started talking, so stop whatever the bot is saying.
        this.interruptPlayback("local-barge-in");
      },
      onSpeechRealStart: () => undefined,
      onSpeechEnd: (audio: Float32Array) => {
        // Fire-and-forget, but never let a throw become an unhandled rejection.
        this.handleSpeechEnd(audio).catch((error: unknown) => {
          this.options.logger.warn("Local speech handling failed", error);
        });
      },
    });

    const recorder = this.spawnRecorder();
    this.recorder = recorder;

    recorder.stdout.on("data", (chunk: Buffer) => {
      this.pushPcm16Chunk(chunk);
    });
    recorder.stderr.on("data", (chunk: Buffer) => {
      const text = chunk.toString().trim();
      if (text.length > 0) {
        this.options.logger.debug("[pw-record]", text);
      }
    });
    // A missing binary (ENOENT) is reported via 'error'; without a listener the
    // unhandled event would crash the whole process.
    recorder.on("error", (error: Error) => {
      this.options.logger.warn("Local recorder failed", error);
    });
    recorder.on("exit", (code, signal) => {
      if (!this.destroyed) {
        this.options.logger.warn("pw-record exited unexpectedly", { code, signal });
      }
    });
  }

  /** Stops playback, terminates the recorder, and releases the VAD. Safe to call twice. */
  async destroy(): Promise<void> {
    this.destroyed = true;
    this.interruptPlayback("local-shutdown");

    const recorder = this.recorder;
    this.recorder = null;
    // Only wait for 'exit' if the process is actually still running: if it already
    // exited (or never spawned) the event has fired / will never fire and awaiting it
    // would hang shutdown forever.
    const stillRunning =
      recorder !== null &&
      recorder.pid !== undefined &&
      recorder.exitCode === null &&
      recorder.signalCode === null;
    if (recorder && stillRunning) {
      const exited = once(recorder, "exit").catch(() => null);
      recorder.kill("SIGTERM");
      await exited;
    }

    // Let already-queued VAD frames settle before tearing the VAD down underneath them.
    // The chain never rejects because every link ends in .catch().
    await this.processing;

    if (this.vad) {
      await this.vad.destroy().catch((error) => {
        this.options.logger.warn("Local VAD destroy failed", error);
      });
      this.vad = null;
    }
  }

  /** Forgets the conversation history and stops any current or queued speech. */
  clearConversation(): void {
    this.memory.clear();
    this.interruptPlayback("local-reset");
  }

  /**
   * Queues `text` for speech. Resolves once the queue is drained, or immediately if
   * another drain is already in progress.
   */
  async speakText(text: string): Promise<void> {
    this.queue.push({
      text,
      source: "manual",
    });
    await this.drainQueue();
  }

  /** Human-readable (Korean) multi-line status summary of the session. */
  statusSummary(): string {
    return [
      "모드: local",
      `플랫폼: ${process.platform}`,
      `입력 source: ${this.options.config.LOCAL_AUDIO_SOURCE ?? "default"}`,
      `출력 sink: ${this.describeSink()}`,
      `대기열: ${this.queue.length}`,
      `최근 대화 턴: ${this.memory.recentTurns().length}`,
    ].join("\n");
  }

  /** Spawns the capture process that writes raw s16 mono 16 kHz PCM to stdout. */
  private spawnRecorder(): ChildProcessByStdio<null, Readable, Readable> {
    if (process.platform === "win32") {
      return this.spawnWindowsRecorder();
    }

    const args = [
      "--rate",
      String(CAPTURE_SAMPLE_RATE),
      "--channels",
      "1",
      "--format",
      "s16",
      "--raw",
    ];
    if (this.options.config.LOCAL_AUDIO_SOURCE) {
      args.push("--target", this.options.config.LOCAL_AUDIO_SOURCE);
    }
    args.push("-");

    this.options.logger.info("Starting local recorder", {
      source: this.options.config.LOCAL_AUDIO_SOURCE ?? "default",
    });

    return spawn("pw-record", args, {
      stdio: ["ignore", "pipe", "pipe"],
    });
  }

  /**
   * Windows capture via bundled ffmpeg reading a DirectShow device.
   * @throws Error when LOCAL_AUDIO_SOURCE is not configured or ffmpeg is unavailable.
   */
  private spawnWindowsRecorder(): ChildProcessByStdio<null, Readable, Readable> {
    const ffmpegPath = this.getFfmpegPath();
    const sourceName = this.options.config.LOCAL_AUDIO_SOURCE;
    if (!sourceName) {
      throw new Error("Windows 로컬 모드는 LOCAL_AUDIO_SOURCE 설정이 필요합니다. `bun run audio:devices` 로 이름을 확인해 주세요.");
    }

    const args = [
      "-hide_banner",
      "-loglevel",
      "warning",
      "-f",
      "dshow",
      "-i",
      `audio=${sourceName}`,
      "-ac",
      "1",
      "-ar",
      String(CAPTURE_SAMPLE_RATE),
      "-f",
      "s16le",
      "pipe:1",
    ];

    this.options.logger.info("Starting local recorder", {
      source: sourceName,
      backend: "ffmpeg-dshow",
    });

    return spawn(ffmpegPath, args, {
      stdio: ["ignore", "pipe", "pipe"],
    });
  }

  /**
   * Decodes a little-endian s16 PCM chunk from the recorder and queues every complete
   * VAD frame for processing, in order.
   */
  private pushPcm16Chunk(chunk: Buffer): void {
    if (this.destroyed || !this.vad) {
      return;
    }

    // Pipe reads are not guaranteed to end on a sample boundary. Re-attach the byte left
    // over from the previous chunk, and keep any new trailing odd byte for the next one;
    // dropping it would shift every following sample by one byte.
    let data = chunk;
    if (this.pcmRemainder) {
      data = Buffer.concat([this.pcmRemainder, chunk]);
      this.pcmRemainder = null;
    }
    const alignedLength = data.length - (data.length % 2);
    for (let offset = 0; offset < alignedLength; offset += 2) {
      this.pendingSamples.push(data.readInt16LE(offset));
    }
    if (alignedLength < data.length) {
      this.pcmRemainder = Buffer.from(data.subarray(alignedLength));
    }

    // Presumably takeFrame removes and returns one frame, or a falsy value when fewer
    // than VAD_FRAME_SAMPLES samples are buffered — TODO confirm in ./pcm.js.
    while (true) {
      const frame = takeFrame(this.pendingSamples, VAD_FRAME_SAMPLES);
      if (!frame) {
        return;
      }
      const floatFrame = int16ArrayToFloat32(frame);
      this.processing = this.processing
        .then(() => this.vad?.processAudio(floatFrame))
        .catch((error) => {
          this.options.logger.warn("Local VAD processing failed", error);
        });
    }
  }

  /** Transcribes a finished speech segment, asks the LLM for a reply, and queues it for speech. */
  private async handleSpeechEnd(audio: Float32Array): Promise<void> {
    if (audio.length < MIN_UTTERANCE_SAMPLES) {
      return;
    }

    let transcript: string | null = null;
    try {
      transcript = await this.options.stt.transcribePcm16(float32ToPcm16Buffer(audio));
    } catch (error) {
      this.options.logger.warn("Local STT failed", error);
      return;
    }

    const text = transcript?.trim() ?? "";
    if (text.length === 0) {
      return;
    }
    // Session was shut down while transcribing; don't spend an LLM call on it.
    if (this.destroyed) {
      return;
    }

    const utterance: UserUtterance = {
      speakerId: "local-user",
      speakerName: this.options.config.LOCAL_SPEAKER_NAME,
      text,
    };
    this.memory.addUserTurn(utterance);
    this.options.logger.info("Local transcript", utterance.text);
    if (this.options.config.DEBUG_TEXT_EVENTS) {
      console.log(`\n[you] ${utterance.text}`);
    }

    let reply: string;
    try {
      reply = await this.options.llm.generateReply(this.memory, utterance);
    } catch (error) {
      this.options.logger.warn("Local LLM failed", error);
      reply = "지금은 답변 생성에 실패했습니다. 잠시 후 다시 말씀해 주세요.";
    }

    this.memory.addAssistantTurn(reply);
    if (this.options.config.DEBUG_TEXT_EVENTS) {
      console.log(`[bot] ${reply}\n`);
    }

    this.queue.push({
      text: reply,
      source: "assistant",
    });
    await this.drainQueue();
  }

  /** Drops queued speech, aborts in-flight synthesis, and kills the current player. */
  private interruptPlayback(reason: string): void {
    if (this.queue.length > 0 || this.currentPlayer) {
      this.options.logger.info("Interrupting local playback", reason);
    }

    this.queue.splice(0, this.queue.length);
    this.currentAbortController?.abort();
    this.currentAbortController = null;
    this.currentPlayback?.dispose();
    this.currentPlayback = null;

    if (this.currentPlayer && !this.currentPlayer.killed) {
      this.currentPlayer.kill("SIGKILL");
    }
    this.currentPlayer = null;
  }

  /** Speaks queued jobs one at a time; concurrent callers return immediately. */
  private async drainQueue(): Promise<void> {
    if (this.draining || this.destroyed) {
      return;
    }

    this.draining = true;
    try {
      while (this.queue.length > 0 && !this.destroyed) {
        const job = this.queue.shift();
        if (!job) {
          continue;
        }

        const abortController = new AbortController();
        this.currentAbortController = abortController;
        try {
          await this.speakJob(job, abortController.signal);
        } finally {
          if (this.currentAbortController === abortController) {
            this.currentAbortController = null;
          }
        }
      }
    } finally {
      this.draining = false;
    }
  }

  /** Synthesizes and plays one job; failures are logged (unless aborted), never thrown. */
  private async speakJob(job: SpeechJob, signal: AbortSignal): Promise<void> {
    let playback: PreparedSpeechAudio;
    try {
      playback = await this.options.tts.preparePlayback(job.text, signal);
    } catch (error) {
      if (!signal.aborted) {
        this.options.logger.warn("Local TTS synthesis failed", error);
      }
      return;
    }

    // Interrupted while synthesizing: interruptPlayback could not see this playback yet,
    // so dispose it here instead of starting (and fully playing) stale audio.
    if (signal.aborted || this.destroyed) {
      playback.dispose();
      return;
    }

    this.currentPlayback = playback;
    try {
      await this.playToSink(playback, signal);
    } catch (error) {
      if (!signal.aborted) {
        this.options.logger.warn("Local playback failed", error);
      }
    } finally {
      // interruptPlayback may already have disposed it and cleared the field.
      if (this.currentPlayback === playback) {
        playback.dispose();
        this.currentPlayback = null;
      }
    }
  }

  /**
   * Streams the prepared PCM to `pw-play` (or the Windows player).
   * Resolves quietly when aborted.
   * @throws Error when the player exits with a non-zero code or fails to spawn.
   */
  private async playToSink(playback: PreparedSpeechAudio, signal: AbortSignal): Promise<void> {
    if (signal.aborted) {
      return;
    }
    if (process.platform === "win32") {
      await this.playToWindowsDefaultSink(playback, signal);
      return;
    }

    const args = [
      "--rate",
      String(PLAYBACK_SAMPLE_RATE),
      "--channels",
      String(PLAYBACK_CHANNELS),
      "--format",
      "s16",
      "--raw",
    ];
    if (this.options.config.LOCAL_AUDIO_SINK) {
      args.push("--target", this.options.config.LOCAL_AUDIO_SINK);
    }
    args.push("-");

    const player: ChildProcessByStdio<Writable, null, Readable> = spawn("pw-play", args, {
      stdio: ["pipe", "ignore", "pipe"],
    });
    this.currentPlayer = player;

    player.stderr.on("data", (chunk: Buffer) => {
      const text = chunk.toString().trim();
      if (text.length > 0) {
        this.options.logger.debug("[pw-play]", text);
      }
    });
    // Killing pw-play mid-stream (barge-in) makes pending writes fail with EPIPE; pipe()
    // re-emits that on stdin, which would crash the process without a listener.
    player.stdin.on("error", (error: Error) => {
      if (!signal.aborted) {
        this.options.logger.debug("[pw-play] stdin error", error.message);
      }
    });
    // pipe() does not end the destination when the source errors, so pw-play would wait
    // for more input forever; kill it so the exit below rejects instead of hanging.
    playback.stream.on("error", (error: unknown) => {
      if (!signal.aborted) {
        this.options.logger.warn("Local TTS stream failed", error);
      }
      if (!player.killed) {
        player.kill("SIGKILL");
      }
    });

    const onAbort = () => {
      playback.stream.destroy();
      if (!player.killed) {
        player.kill("SIGKILL");
      }
    };
    signal.addEventListener("abort", onAbort, { once: true });

    playback.stream.pipe(player.stdin);

    try {
      const [code, playSignal] = (await once(player, "exit")) as [number | null, NodeJS.Signals | null];
      if (signal.aborted) {
        return;
      }
      if (code !== 0) {
        throw new Error(`pw-play exited with code=${code ?? "null"} signal=${playSignal ?? "null"}`);
      }
    } finally {
      signal.removeEventListener("abort", onAbort);
      if (this.currentPlayer === player) {
        this.currentPlayer = null;
      }
    }
  }

  /**
   * Windows playback: buffers the whole PCM stream, wraps it in a WAV temp file and plays
   * it synchronously with PowerShell's `System.Media.SoundPlayer`. The temp file is always
   * removed afterwards.
   * @throws Error when the stream fails or PowerShell exits with a non-zero code.
   */
  private async playToWindowsDefaultSink(playback: PreparedSpeechAudio, signal: AbortSignal): Promise<void> {
    const chunks: Buffer[] = [];
    await new Promise<void>((resolve, reject) => {
      playback.stream.on("data", (chunk: Buffer) => {
        chunks.push(Buffer.from(chunk));
      });
      playback.stream.once("end", () => resolve());
      playback.stream.once("error", reject);
      signal.addEventListener(
        "abort",
        () => {
          playback.stream.destroy();
          reject(new Error("playback aborted"));
        },
        { once: true },
      );
    }).catch((error) => {
      if (signal.aborted) {
        return;
      }
      throw error;
    });

    if (signal.aborted) {
      return;
    }

    const pcm = Buffer.concat(chunks);
    const wav = createWaveFileBuffer(pcm, PLAYBACK_SAMPLE_RATE, PLAYBACK_CHANNELS, PLAYBACK_BITS_PER_SAMPLE);
    const tempPath = path.join(os.tmpdir(), `realtime-voice-bot-${process.pid}-${Date.now()}.wav`);
    await fs.writeFile(tempPath, wav);

    try {
      // An interrupt may have arrived while the file was being written.
      if (signal.aborted) {
        return;
      }

      const psScript = [
        "Add-Type -AssemblyName System;",
        `$player = New-Object System.Media.SoundPlayer('${tempPath.replace(/'/g, "''")}');`,
        "$player.PlaySync();",
      ].join(" ");

      const player = spawn("powershell", ["-NoProfile", "-Command", psScript], {
        stdio: ["ignore", "ignore", "pipe"],
      });
      this.currentPlayer = player;

      player.stderr.on("data", (chunk: Buffer) => {
        const text = chunk.toString().trim();
        if (text.length > 0) {
          this.options.logger.debug("[powershell-player]", text);
        }
      });

      const onAbort = () => {
        if (!player.killed) {
          player.kill("SIGKILL");
        }
      };
      signal.addEventListener("abort", onAbort, { once: true });

      try {
        const [code, playSignal] = (await once(player, "exit")) as [number | null, NodeJS.Signals | null];
        if (signal.aborted) {
          return;
        }
        if (code !== 0) {
          throw new Error(`powershell playback exited with code=${code ?? "null"} signal=${playSignal ?? "null"}`);
        }
      } finally {
        signal.removeEventListener("abort", onAbort);
        if (this.currentPlayer === player) {
          this.currentPlayer = null;
        }
      }
    } finally {
      // Runs even when the spawn fails ('error' rejects once()), so the WAV never leaks.
      await fs.unlink(tempPath).catch(() => null);
    }
  }

  /** Resolves the bundled ffmpeg binary path; throws if ffmpeg-static has none for this platform. */
  private getFfmpegPath(): string {
    const ffmpegPath = ffmpegStatic as unknown as string | null;
    if (!ffmpegPath) {
      throw new Error("ffmpeg-static 경로를 찾지 못했습니다.");
    }
    return ffmpegPath;
  }

  /** Configured sink name, or the platform's default label. */
  private describeSink(): string {
    if (process.platform === "win32") {
      return this.options.config.LOCAL_AUDIO_SINK ?? "system-default";
    }
    return this.options.config.LOCAL_AUDIO_SINK ?? "default";
  }
}

/**
 * Prepends a canonical 44-byte RIFF/WAVE header (uncompressed PCM, format tag 1) to `pcm`.
 * @param pcm raw interleaved sample bytes
 * @param sampleRate samples per second per channel
 * @param channels interleaved channel count
 * @param bitsPerSample bits per sample per channel (e.g. 16)
 * @returns header + pcm as a single buffer
 */
function createWaveFileBuffer(
  pcm: Buffer,
  sampleRate: number,
  channels: number,
  bitsPerSample: number,
): Buffer {
  const header = Buffer.alloc(44);
  const byteRate = sampleRate * channels * (bitsPerSample / 8);
  const blockAlign = channels * (bitsPerSample / 8);

  header.write("RIFF", 0, 4, "ascii");
  header.writeUInt32LE(36 + pcm.length, 4);
  header.write("WAVE", 8, 4, "ascii");
  header.write("fmt ", 12, 4, "ascii");
  header.writeUInt32LE(16, 16);
  header.writeUInt16LE(1, 20);
  header.writeUInt16LE(channels, 22);
  header.writeUInt32LE(sampleRate, 24);
  header.writeUInt32LE(byteRate, 28);
  header.writeUInt16LE(blockAlign, 32);
  header.writeUInt16LE(bitsPerSample, 34);
  header.write("data", 36, 4, "ascii");
  header.writeUInt32LE(pcm.length, 40);

  return Buffer.concat([header, pcm]);
}