import { EventEmitter } from "node:events";
import prism from "prism-media";
import { RealTimeVAD } from "avr-vad";
import {
  AudioPlayerStatus,
  EndBehaviorType,
  NoSubscriberBehavior,
  VoiceConnectionStatus,
  createAudioPlayer,
  createAudioResource,
  entersState,
  joinVoiceChannel,
  StreamType,
  type AudioPlayer,
  type AudioReceiveStream,
  type VoiceConnection,
} from "@discordjs/voice";
import type { Client, Guild, VoiceBasedChannel } from "discord.js";
import type { AppConfig } from "../config.js";
import { Logger } from "../logger.js";
import { float32ToPcm16Buffer, int16ArrayToFloat32, Stereo48kToMono16kDownsampler, takeFrame } from "./pcm.js";
import { ConversationMemory, type UserUtterance } from "../services/conversation.js";
import type { LlmService } from "../services/llm.js";
import type { SttService } from "../services/stt.js";
import type { PreparedSpeechAudio, TtsService } from "../services/tts.js";

/** Sample rate (Hz) configured on the VAD; also used to convert the minimum utterance length to samples. */
const VAD_SAMPLE_RATE = 16000;

/** Number of samples per frame fed to the VAD. Must match the VAD's `frameSamples` option. */
const VAD_FRAME_SAMPLES = 1536;

/** Speech segments shorter than this many seconds are dropped before transcription. */
const MIN_UTTERANCE_SECONDS = 0.25;

/** A read/write stream that can also be torn down explicitly. */
type DestroyableDuplex = NodeJS.ReadWriteStream & { destroy: () => void };

/** Everything a {@link GuildVoiceSession} needs to join a channel and hold a conversation. */
interface GuildVoiceSessionOptions {
  client: Client;
  config: AppConfig;
  logger: Logger;
  guild: Guild;
  voiceChannel: VoiceBasedChannel;
  /** Optional text channel used for status/debug announcements. */
  textChannelId?: string;
  stt: SttService;
  tts: TtsService;
  llm: LlmService;
}

/** One pending piece of speech waiting to be synthesised and played. */
interface SpeechJob {
  text: string;
  /** `"assistant"` for LLM replies, `"manual"` for text queued through `speakText`. */
  source: "assistant" | "manual";
}

/**
 * Per-speaker audio pipeline: decoded PCM chunks are downsampled, cut into
 * fixed-size frames and pushed through a voice-activity detector one frame at
 * a time. Speech start/end is reported through the callbacks given to
 * {@link UserAudioSession.create}.
 */
class UserAudioSession {
  private readonly downsampler = new Stereo48kToMono16kDownsampler();
  /** Downsampled samples that have not yet filled a complete VAD frame. */
  private readonly pendingSamples: number[] = [];
  /** Tail of the promise chain that serialises VAD frame processing. Never rejects. */
  private processing = Promise.resolve();
  private destroyed = false;

  private constructor(
    private readonly logger: Logger,
    private readonly speakerId: string,
    private readonly speakerName: string,
    private readonly receiveStream: AudioReceiveStream,
    private readonly decoder: DestroyableDuplex,
    private readonly vad: RealTimeVAD,
  ) {}

  /**
   * Builds the VAD and wires the decoder output into it.
   *
   * @param options.onSpeechStart Invoked when the VAD reports the start of speech.
   * @param options.onSpeechEnd Invoked with the captured audio when the VAD reports
   *   the end of speech. The utterance's `text` is an empty string at this point.
   * @returns The ready session. Rejects if the VAD cannot be created; the caller
   *   still owns `receiveStream` and `decoder` in that case.
   */
  static async create(options: {
    logger: Logger;
    speakerId: string;
    speakerName: string;
    receiveStream: AudioReceiveStream;
    decoder: DestroyableDuplex;
    onSpeechStart: () => void;
    onSpeechEnd: (utterance: UserUtterance, audio: Float32Array) => void;
  }): Promise<UserAudioSession> {
    // Attach error listeners before the asynchronous VAD load so a stream error
    // raised in the meantime is logged instead of surfacing as an unhandled
    // "error" event.
    options.decoder.on("error", (error) => {
      options.logger.warn("PCM decoder error", options.speakerId, error);
    });
    options.receiveStream.on("error", (error) => {
      options.logger.warn("Audio receive stream error", options.speakerId, error);
    });

    const vadInstance = await RealTimeVAD.new({
      model: "v5",
      sampleRate: VAD_SAMPLE_RATE,
      frameSamples: VAD_FRAME_SAMPLES,
      positiveSpeechThreshold: 0.55,
      negativeSpeechThreshold: 0.35,
      redemptionFrames: 8,
      preSpeechPadFrames: 2,
      minSpeechFrames: 3,
      onFrameProcessed: () => undefined,
      onVADMisfire: () => undefined,
      onSpeechStart: () => {
        options.onSpeechStart();
      },
      onSpeechRealStart: () => undefined,
      onSpeechEnd: (audio: Float32Array) => {
        options.onSpeechEnd(
          {
            speakerId: options.speakerId,
            speakerName: options.speakerName,
            text: "",
          },
          audio,
        );
      },
    });

    const session = new UserAudioSession(
      options.logger,
      options.speakerId,
      options.speakerName,
      options.receiveStream,
      options.decoder,
      vadInstance,
    );

    session.decoder.on("data", (chunk: Buffer) => {
      session.pushPcmChunk(chunk);
    });

    return session;
  }

  /**
   * Downsamples one decoded PCM chunk, buffers it, and queues every complete
   * frame for VAD processing. Frames are processed strictly in order.
   */
  private pushPcmChunk(chunk: Buffer): void {
    if (this.destroyed) {
      return;
    }

    const mono16k = this.downsampler.pushStereo48kChunk(chunk);
    if (mono16k.length === 0) {
      return;
    }

    for (const sample of mono16k) {
      this.pendingSamples.push(sample);
    }

    for (;;) {
      const frame = takeFrame(this.pendingSamples, VAD_FRAME_SAMPLES);
      if (!frame) {
        return;
      }

      const floatFrame = int16ArrayToFloat32(frame);
      this.processing = this.processing
        .then(async () => {
          // Frames queued before destroy() must not reach a VAD that is being torn down.
          if (!this.destroyed) {
            await this.vad.processAudio(floatFrame);
          }
        })
        .catch((error) => {
          this.logger.warn("VAD frame processing failed", this.speakerId, this.speakerName, error);
        });
    }
  }

  /** Releases the receive stream, decoder and VAD. Safe to call more than once. */
  destroy(): void {
    if (this.destroyed) {
      return;
    }
    this.destroyed = true;
    this.pendingSamples.length = 0;

    this.receiveStream.destroy();
    this.decoder.destroy();

    // Wait for the frame currently in flight (if any) before destroying the VAD.
    void this.processing
      .then(() => this.vad.destroy())
      .catch((error) => {
        this.logger.warn("VAD destroy failed", this.speakerId, this.speakerName, error);
      });
  }
}

/**
 * A voice conversation in one guild voice channel: listens to speakers,
 * transcribes what they say, asks the LLM for a reply and speaks it back.
 * Speech output is queued and played one job at a time; any speaker starting
 * to talk interrupts playback and clears the queue.
 */
export class GuildVoiceSession extends EventEmitter {
  readonly guildId: string;
  readonly voiceChannelId: string;

  private readonly connection: VoiceConnection;
  private readonly player: AudioPlayer;
  private readonly memory: ConversationMemory;
  /** Fully initialised per-speaker pipelines, keyed by user id. */
  private readonly trackedUsers = new Map<string, UserAudioSession>();
  /** In-flight pipeline creations, keyed by user id, so each user is set up once. */
  private readonly pendingUsers = new Map<string, Promise<void>>();
  private readonly queue: SpeechJob[] = [];
  private draining = false;
  private destroyed = false;
  /** Abort handle for the job currently being synthesised or played. */
  private currentAbortController: AbortController | null = null;
  /** Prepared audio of the job currently being played. */
  private currentPlayback: PreparedSpeechAudio | null = null;
  private textChannelId?: string;

  private constructor(private readonly options: GuildVoiceSessionOptions) {
    super();
    this.guildId = options.guild.id;
    this.voiceChannelId = options.voiceChannel.id;
    this.textChannelId = options.textChannelId;
    this.memory = new ConversationMemory(options.config.MAX_CONVERSATION_TURNS);
    this.player = createAudioPlayer({
      behaviors: {
        noSubscriber: NoSubscriberBehavior.Pause,
      },
    });
    this.connection = joinVoiceChannel({
      guildId: options.guild.id,
      channelId: options.voiceChannel.id,
      adapterCreator: options.guild.voiceAdapterCreator,
      selfDeaf: false,
      selfMute: false,
    });
  }

  /**
   * Joins the voice channel and resolves once the connection is ready.
   *
   * @throws Whatever `initialize` rejects with (for example when the connection
   *   does not become ready in time). The half-open session is destroyed first
   *   so the voice connection is not leaked.
   */
  static async create(options: GuildVoiceSessionOptions): Promise<GuildVoiceSession> {
    const session = new GuildVoiceSession(options);
    try {
      await session.initialize();
    } catch (error) {
      await session.destroy();
      throw error;
    }
    return session;
  }

  /** Wires up player/connection listeners and waits up to 30 s for the connection to be ready. */
  private async initialize(): Promise<void> {
    this.player.on("error", (error) => {
      this.options.logger.warn("Audio player error", this.guildId, error);
    });

    this.connection.on("stateChange", (_oldState, newState) => {
      if (newState.status === VoiceConnectionStatus.Destroyed) {
        this.options.logger.info("Voice connection destroyed", this.guildId);
      }
    });

    this.connection.subscribe(this.player);
    await entersState(this.connection, VoiceConnectionStatus.Ready, 30_000);

    this.connection.receiver.speaking.on("start", (userId: string) => {
      // Ignore the bot's own voice.
      if (userId === this.options.client.user?.id) {
        return;
      }

      // Fire-and-forget from an event handler: a failure must be logged here,
      // otherwise it becomes an unhandled promise rejection.
      void this.ensureTrackedUser(userId).catch((error) => {
        this.options.logger.warn("Failed to start tracking speaker", this.guildId, userId, error);
      });
    });
  }

  /** Sets (or clears, when `undefined`) the text channel used for announcements. */
  setTextChannel(textChannelId?: string): void {
    this.textChannelId = textChannelId;
  }

  /** Forgets the conversation history and stops any current or queued speech. */
  clearConversation(): void {
    this.memory.clear();
    this.interruptPlayback("conversation-reset");
  }

  /** Returns a multi-line, human-readable status report for this session. */
  statusSummary(): string {
    const playbackState = this.player.state.status;
    return [
      `세션 활성: 예`,
      `음성 채널: ${this.options.voiceChannel.name}`,
      `추적 유저 수: ${this.trackedUsers.size}`,
      `재생 상태: ${playbackState}`,
      `대기열: ${this.queue.length}`,
      `최근 대화 턴: ${this.memory.recentTurns().length}`,
    ].join("\n");
  }

  /**
   * Queues `text` for speech. Resolves when the queue has drained, or
   * immediately if another call is already draining it.
   */
  async speakText(text: string): Promise<void> {
    this.queue.push({
      text,
      source: "manual",
    });
    await this.drainQueue();
  }

  /**
   * Drops all queued speech, aborts the job in progress and stops the player.
   *
   * @param reason Short tag written to the log when something was actually interrupted.
   */
  interruptPlayback(reason: string): void {
    if (this.queue.length > 0 || this.player.state.status !== AudioPlayerStatus.Idle) {
      this.options.logger.info("Interrupting playback", this.guildId, reason);
    }

    this.queue.splice(0, this.queue.length);
    this.currentAbortController?.abort();
    this.currentAbortController = null;
    this.currentPlayback?.dispose();
    this.currentPlayback = null;
    this.player.stop(true);
  }

  /** Stops playback, releases every speaker pipeline and leaves the voice channel. Idempotent. */
  async destroy(): Promise<void> {
    if (this.destroyed) {
      return;
    }
    this.destroyed = true;

    this.interruptPlayback("session-destroy");

    for (const session of this.trackedUsers.values()) {
      session.destroy();
    }

    this.trackedUsers.clear();
    this.pendingUsers.clear();

    // Guard: the connection may already have been destroyed elsewhere, and
    // destroying it a second time is an error in the voice library.
    if (this.connection.state.status !== VoiceConnectionStatus.Destroyed) {
      this.connection.destroy();
    }
  }

  /** Makes sure `userId` has an audio pipeline, creating it at most once even under concurrent calls. */
  private async ensureTrackedUser(userId: string): Promise<void> {
    if (this.destroyed || this.trackedUsers.has(userId)) {
      return;
    }

    const existing = this.pendingUsers.get(userId);
    if (existing) {
      await existing;
      return;
    }

    const pending = this.createTrackedUser(userId).finally(() => {
      this.pendingUsers.delete(userId);
    });
    this.pendingUsers.set(userId, pending);
    await pending;
  }

  /**
   * Subscribes to the user's audio, decodes it (48 kHz, 2 channels) and
   * registers a {@link UserAudioSession} for it. Streams are released if setup
   * fails or the guild session is destroyed while setup is in flight.
   */
  private async createTrackedUser(userId: string): Promise<void> {
    const speakerName = await this.resolveSpeakerName(userId);
    if (this.destroyed) {
      return;
    }

    const receiveStream = this.connection.receiver.subscribe(userId, {
      end: {
        behavior: EndBehaviorType.Manual,
      },
    });

    let decoder: DestroyableDuplex | undefined;
    let session: UserAudioSession;
    try {
      decoder = new prism.opus.Decoder({
        rate: 48000,
        channels: 2,
        frameSize: 960,
      }) as DestroyableDuplex;

      receiveStream.pipe(decoder);

      session = await UserAudioSession.create({
        logger: this.options.logger,
        speakerId: userId,
        speakerName,
        receiveStream,
        decoder,
        onSpeechStart: () => {
          this.interruptPlayback(`barge-in:${speakerName}`);
        },
        onSpeechEnd: (utterance, audio) => {
          void this.handleSpeechEnd(utterance, audio).catch((error) => {
            this.options.logger.warn("Speech handling failed", this.guildId, userId, error);
          });
        },
      });
    } catch (error) {
      // No session took ownership of the streams, so release them here.
      receiveStream.destroy();
      decoder?.destroy();
      throw error;
    }

    if (this.destroyed) {
      // destroy() ran while the VAD was loading; do not register a session nobody will release.
      session.destroy();
      return;
    }

    this.trackedUsers.set(userId, session);
    this.options.logger.info("Tracking speaker", this.guildId, userId, speakerName);
  }

  /** Resolves a display name for the user, falling back to `user-<last 6 id chars>` if the lookup fails. */
  private async resolveSpeakerName(userId: string): Promise<string> {
    try {
      const user = await this.options.client.users.fetch(userId);
      return user.globalName ?? user.username;
    } catch {
      return `user-${userId.slice(-6)}`;
    }
  }

  /**
   * Handles one finished speech segment: transcribe, record in memory, ask the
   * LLM for a reply and queue that reply for playback.
   */
  private async handleSpeechEnd(utterance: UserUtterance, audio: Float32Array): Promise<void> {
    if (audio.length < VAD_SAMPLE_RATE * MIN_UTTERANCE_SECONDS) {
      return;
    }

    const pcmBuffer = float32ToPcm16Buffer(audio);
    let transcript: string | null = null;

    try {
      transcript = await this.options.stt.transcribePcm16(pcmBuffer);
    } catch (error) {
      this.options.logger.warn("STT failed", this.guildId, utterance.speakerId, error);
      await this.announce(`음성 인식 실패: ${utterance.speakerName}`);
      return;
    }

    if (!transcript || transcript.trim().length === 0) {
      return;
    }

    const hydratedUtterance: UserUtterance = {
      ...utterance,
      text: transcript.trim(),
    };

    this.options.logger.info("Transcript committed", this.guildId, hydratedUtterance.speakerName, hydratedUtterance.text);

    if (this.options.config.DEBUG_TEXT_EVENTS) {
      await this.announce(`🗣️ ${hydratedUtterance.speakerName}: ${hydratedUtterance.text}`);
    }

    let reply: string;
    try {
      reply = await this.options.llm.generateReply(this.memory, hydratedUtterance);
    } catch (error) {
      this.options.logger.warn("LLM failed", this.guildId, utterance.speakerId, error);
      reply = "지금은 답변 생성에 실패했습니다. 잠시 후 다시 말씀해 주세요.";
    }

    this.memory.addUserTurn(hydratedUtterance);
    this.memory.addAssistantTurn(reply);

    if (this.options.config.DEBUG_TEXT_EVENTS) {
      await this.announce(`🤖 ${reply}`);
    }

    this.queue.push({
      text: reply,
      source: "assistant",
    });
    await this.drainQueue();
  }

  /**
   * Plays queued jobs one after another. Only one drain loop runs at a time;
   * concurrent callers return immediately and rely on the running loop.
   */
  private async drainQueue(): Promise<void> {
    if (this.destroyed) {
      // Nothing can be played once the connection is gone; drop late jobs.
      this.queue.splice(0, this.queue.length);
      return;
    }

    if (this.draining) {
      return;
    }

    this.draining = true;

    try {
      while (!this.destroyed && this.queue.length > 0) {
        const job = this.queue.shift();
        if (!job) {
          continue;
        }

        const abortController = new AbortController();
        this.currentAbortController = abortController;

        let playback: PreparedSpeechAudio;
        try {
          playback = await this.options.tts.preparePlayback(job.text, abortController.signal);
        } catch (error) {
          if (this.currentAbortController === abortController) {
            this.currentAbortController = null;
          }
          if (abortController.signal.aborted) {
            continue;
          }
          this.options.logger.warn("TTS synthesis failed", this.guildId, job.source, error);
          await this.announce("음성 출력 생성에 실패했습니다.");
          continue;
        }

        if (abortController.signal.aborted) {
          // Interrupted while synthesis was pending and the TTS call still
          // resolved: release the audio instead of playing stale speech.
          playback.dispose();
          continue;
        }

        this.currentPlayback = playback;

        try {
          const resource = createAudioResource(playback.stream, {
            inputType: StreamType.Raw,
          });

          this.player.play(resource);
          // A short clip may finish before "Playing" is observed; that is not an error.
          await entersState(this.player, AudioPlayerStatus.Playing, 20_000).catch(() => null);
          await entersState(this.player, AudioPlayerStatus.Idle, 300_000);
        } catch (error) {
          if (!abortController.signal.aborted) {
            this.options.logger.warn("Audio playback failed", this.guildId, error);
          }
        } finally {
          // interruptPlayback() may already have disposed and cleared this playback.
          if (this.currentPlayback === playback) {
            playback.dispose();
            this.currentPlayback = null;
          }
          if (this.currentAbortController === abortController) {
            this.currentAbortController = null;
          }
        }
      }
    } finally {
      this.draining = false;
    }
  }

  /**
   * Best-effort message to the configured text channel. Does nothing when no
   * channel is set, the channel cannot be fetched, or it cannot receive
   * messages; send failures are ignored.
   */
  private async announce(message: string): Promise<void> {
    if (!this.textChannelId) {
      return;
    }

    const channel = await this.options.client.channels.fetch(this.textChannelId).catch(() => null);
    if (!channel?.isTextBased() || !("send" in channel) || typeof channel.send !== "function") {
      return;
    }

    await channel.send(message).catch(() => null);
  }
}