feat: scaffold realtime Korean voice assistant bot

This commit is contained in:
2026-04-30 02:29:18 +09:00
commit 9dee708b64
15 changed files with 1574 additions and 0 deletions

View File

@@ -0,0 +1,452 @@
import { EventEmitter } from "node:events";
import prism from "prism-media";
import { RealTimeVAD } from "avr-vad";
import {
AudioPlayerStatus,
EndBehaviorType,
NoSubscriberBehavior,
VoiceConnectionStatus,
createAudioPlayer,
entersState,
joinVoiceChannel,
type AudioPlayer,
type AudioReceiveStream,
type VoiceConnection,
} from "@discordjs/voice";
import type { Client, Guild, VoiceBasedChannel } from "discord.js";
import type { AppConfig } from "../config.js";
import { Logger } from "../logger.js";
import { float32ToPcm16Buffer, int16ArrayToFloat32, Stereo48kToMono16kDownsampler, takeFrame } from "./pcm.js";
import { ConversationMemory, type UserUtterance } from "../services/conversation.js";
import { ElevenLabsSttService } from "../services/elevenlabs-stt.js";
import { ElevenLabsTtsService, type PreparedSpeechPlayback } from "../services/elevenlabs-tts.js";
import { OpenAiLlmService } from "../services/openai-llm.js";
interface GuildVoiceSessionOptions {
client: Client;
config: AppConfig;
logger: Logger;
guild: Guild;
voiceChannel: VoiceBasedChannel;
textChannelId?: string;
stt: ElevenLabsSttService;
tts: ElevenLabsTtsService;
llm: OpenAiLlmService;
}
interface SpeechJob {
text: string;
source: "assistant" | "manual";
}
class UserAudioSession {
private readonly downsampler = new Stereo48kToMono16kDownsampler();
private readonly pendingSamples: number[] = [];
private readonly vad: RealTimeVAD;
private processing = Promise.resolve();
private constructor(
private readonly logger: Logger,
private readonly speakerId: string,
private readonly speakerName: string,
private readonly receiveStream: AudioReceiveStream,
private readonly decoder: NodeJS.ReadWriteStream & { destroy: () => void },
vad: RealTimeVAD,
private readonly onSpeechEnd: (utterance: UserUtterance, audio: Float32Array) => void,
) {
this.vad = vad;
}
static async create(options: {
logger: Logger;
speakerId: string;
speakerName: string;
receiveStream: AudioReceiveStream;
decoder: NodeJS.ReadWriteStream & { destroy: () => void };
onSpeechStart: () => void;
onSpeechEnd: (utterance: UserUtterance, audio: Float32Array) => void;
}): Promise<UserAudioSession> {
const vadInstance = await RealTimeVAD.new({
model: "v5",
sampleRate: 16000,
frameSamples: 1536,
positiveSpeechThreshold: 0.55,
negativeSpeechThreshold: 0.35,
redemptionFrames: 8,
preSpeechPadFrames: 2,
minSpeechFrames: 3,
onFrameProcessed: () => undefined,
onVADMisfire: () => undefined,
onSpeechStart: () => {
options.onSpeechStart();
},
onSpeechRealStart: () => undefined,
onSpeechEnd: (audio: Float32Array) => {
options.onSpeechEnd(
{
speakerId: options.speakerId,
speakerName: options.speakerName,
text: "",
},
audio,
);
},
});
const session = new UserAudioSession(
options.logger,
options.speakerId,
options.speakerName,
options.receiveStream,
options.decoder,
vadInstance,
options.onSpeechEnd,
);
session.decoder.on("data", (chunk: Buffer) => {
session.pushPcmChunk(chunk);
});
session.decoder.on("error", (error) => {
options.logger.warn("PCM decoder error", options.speakerId, error);
});
session.receiveStream.on("error", (error) => {
options.logger.warn("Audio receive stream error", options.speakerId, error);
});
return session;
}
private pushPcmChunk(chunk: Buffer): void {
const mono16k = this.downsampler.pushStereo48kChunk(chunk);
if (mono16k.length === 0) {
return;
}
for (const sample of mono16k) {
this.pendingSamples.push(sample);
}
while (true) {
const frame = takeFrame(this.pendingSamples, 1536);
if (!frame) {
return;
}
const floatFrame = int16ArrayToFloat32(frame);
this.processing = this.processing
.then(() => this.vad.processAudio(floatFrame))
.catch((error) => {
this.logger.warn("VAD frame processing failed", this.speakerId, this.speakerName, error);
});
}
}
destroy(): void {
this.receiveStream.destroy();
this.decoder.destroy();
void this.vad.destroy().catch((error) => {
this.logger.warn("VAD destroy failed", this.speakerId, this.speakerName, error);
});
}
}
export class GuildVoiceSession extends EventEmitter {
readonly guildId: string;
readonly voiceChannelId: string;
private readonly connection: VoiceConnection;
private readonly player: AudioPlayer;
private readonly memory: ConversationMemory;
private readonly trackedUsers = new Map<string, UserAudioSession>();
private readonly pendingUsers = new Map<string, Promise<void>>();
private readonly queue: SpeechJob[] = [];
private draining = false;
private currentAbortController: AbortController | null = null;
private currentPlayback: PreparedSpeechPlayback | null = null;
private textChannelId?: string;
private constructor(private readonly options: GuildVoiceSessionOptions) {
super();
this.guildId = options.guild.id;
this.voiceChannelId = options.voiceChannel.id;
this.textChannelId = options.textChannelId;
this.memory = new ConversationMemory(options.config.MAX_CONVERSATION_TURNS);
this.player = createAudioPlayer({
behaviors: {
noSubscriber: NoSubscriberBehavior.Pause,
},
});
this.connection = joinVoiceChannel({
guildId: options.guild.id,
channelId: options.voiceChannel.id,
adapterCreator: options.guild.voiceAdapterCreator,
selfDeaf: false,
selfMute: false,
});
}
static async create(options: GuildVoiceSessionOptions): Promise<GuildVoiceSession> {
const session = new GuildVoiceSession(options);
await session.initialize();
return session;
}
private async initialize(): Promise<void> {
this.player.on("error", (error) => {
this.options.logger.warn("Audio player error", this.guildId, error);
});
this.connection.on("stateChange", (_oldState, newState) => {
if (newState.status === VoiceConnectionStatus.Destroyed) {
this.options.logger.info("Voice connection destroyed", this.guildId);
}
});
this.connection.subscribe(this.player);
await entersState(this.connection, VoiceConnectionStatus.Ready, 30_000);
this.connection.receiver.speaking.on("start", (userId: string) => {
if (userId === this.options.client.user?.id) {
return;
}
void this.ensureTrackedUser(userId);
});
}
setTextChannel(textChannelId?: string): void {
this.textChannelId = textChannelId;
}
clearConversation(): void {
this.memory.clear();
this.interruptPlayback("conversation-reset");
}
statusSummary(): string {
const playbackState = this.player.state.status;
return [
`세션 활성: 예`,
`음성 채널: ${this.options.voiceChannel.name}`,
`추적 유저 수: ${this.trackedUsers.size}`,
`재생 상태: ${playbackState}`,
`대기열: ${this.queue.length}`,
`최근 대화 턴: ${this.memory.recentTurns().length}`,
].join("\n");
}
async speakText(text: string): Promise<void> {
this.queue.push({
text,
source: "manual",
});
await this.drainQueue();
}
interruptPlayback(reason: string): void {
if (this.queue.length > 0 || this.player.state.status !== AudioPlayerStatus.Idle) {
this.options.logger.info("Interrupting playback", this.guildId, reason);
}
this.queue.splice(0, this.queue.length);
this.currentAbortController?.abort();
this.currentAbortController = null;
this.currentPlayback?.dispose();
this.currentPlayback = null;
this.player.stop(true);
}
async destroy(): Promise<void> {
this.interruptPlayback("session-destroy");
for (const session of this.trackedUsers.values()) {
session.destroy();
}
this.trackedUsers.clear();
this.pendingUsers.clear();
this.connection.destroy();
}
private async ensureTrackedUser(userId: string): Promise<void> {
if (this.trackedUsers.has(userId)) {
return;
}
const existing = this.pendingUsers.get(userId);
if (existing) {
await existing;
return;
}
const pending = this.createTrackedUser(userId).finally(() => {
this.pendingUsers.delete(userId);
});
this.pendingUsers.set(userId, pending);
await pending;
}
private async createTrackedUser(userId: string): Promise<void> {
const speakerName = await this.resolveSpeakerName(userId);
const receiveStream = this.connection.receiver.subscribe(userId, {
end: {
behavior: EndBehaviorType.Manual,
},
});
const decoder = new prism.opus.Decoder({
rate: 48000,
channels: 2,
frameSize: 960,
}) as NodeJS.ReadWriteStream & { destroy: () => void };
receiveStream.pipe(decoder);
const session = await UserAudioSession.create({
logger: this.options.logger,
speakerId: userId,
speakerName,
receiveStream,
decoder,
onSpeechStart: () => {
this.interruptPlayback(`barge-in:${speakerName}`);
},
onSpeechEnd: (utterance, audio) => {
void this.handleSpeechEnd(utterance, audio);
},
});
this.trackedUsers.set(userId, session);
this.options.logger.info("Tracking speaker", this.guildId, userId, speakerName);
}
private async resolveSpeakerName(userId: string): Promise<string> {
try {
const user = await this.options.client.users.fetch(userId);
return user.globalName ?? user.username;
} catch {
return `user-${userId.slice(-6)}`;
}
}
private async handleSpeechEnd(utterance: UserUtterance, audio: Float32Array): Promise<void> {
if (audio.length < 16000 * 0.25) {
return;
}
const pcmBuffer = float32ToPcm16Buffer(audio);
let transcript: string | null = null;
try {
transcript = await this.options.stt.transcribePcm16(pcmBuffer);
} catch (error) {
this.options.logger.warn("STT failed", this.guildId, utterance.speakerId, error);
await this.announce(`음성 인식 실패: ${utterance.speakerName}`);
return;
}
if (!transcript || transcript.trim().length === 0) {
return;
}
const hydratedUtterance: UserUtterance = {
...utterance,
text: transcript.trim(),
};
this.options.logger.info("Transcript committed", this.guildId, hydratedUtterance.speakerName, hydratedUtterance.text);
this.memory.addUserTurn(hydratedUtterance);
if (this.options.config.DEBUG_TEXT_EVENTS) {
await this.announce(`🗣️ ${hydratedUtterance.speakerName}: ${hydratedUtterance.text}`);
}
let reply: string;
try {
reply = await this.options.llm.generateReply(this.memory, hydratedUtterance);
} catch (error) {
this.options.logger.warn("LLM failed", this.guildId, utterance.speakerId, error);
reply = "지금은 답변 생성에 실패했습니다. 잠시 후 다시 말씀해 주세요.";
}
this.memory.addAssistantTurn(reply);
if (this.options.config.DEBUG_TEXT_EVENTS) {
await this.announce(`🤖 ${reply}`);
}
this.queue.push({
text: reply,
source: "assistant",
});
await this.drainQueue();
}
private async drainQueue(): Promise<void> {
if (this.draining) {
return;
}
this.draining = true;
try {
while (this.queue.length > 0) {
const job = this.queue.shift();
if (!job) {
continue;
}
const abortController = new AbortController();
this.currentAbortController = abortController;
try {
this.currentPlayback = await this.options.tts.preparePlayback(job.text, abortController.signal);
} catch (error) {
if (abortController.signal.aborted) {
continue;
}
this.options.logger.warn("TTS synthesis failed", this.guildId, job.source, error);
await this.announce("음성 출력 생성에 실패했습니다.");
continue;
}
try {
const resource = this.currentPlayback.resource;
this.player.play(resource);
await entersState(this.player, AudioPlayerStatus.Playing, 20_000).catch(() => null);
await entersState(this.player, AudioPlayerStatus.Idle, 300_000);
} catch (error) {
if (!abortController.signal.aborted) {
this.options.logger.warn("Audio playback failed", this.guildId, error);
}
} finally {
this.currentPlayback?.dispose();
this.currentPlayback = null;
if (this.currentAbortController === abortController) {
this.currentAbortController = null;
}
}
}
} finally {
this.draining = false;
}
}
private async announce(message: string): Promise<void> {
if (!this.textChannelId) {
return;
}
const channel = await this.options.client.channels.fetch(this.textChannelId).catch(() => null);
if (!channel?.isTextBased() || !("send" in channel) || typeof channel.send !== "function") {
return;
}
await channel.send(message).catch(() => null);
}
}