Harden loopback worker shutdown
This commit is contained in:
69
src/index.ts
69
src/index.ts
@@ -12,6 +12,52 @@ async function runLoopback(): Promise<void> {
|
||||
const config = loadConfig();
|
||||
const logger = new Logger(config.LOG_LEVEL);
|
||||
const stt = new FasterWhisperSttService(config, logger);
|
||||
let capture = null as ReturnType<typeof spawnLoopbackCapture> | null;
|
||||
let shuttingDown: Promise<void> | null = null;
|
||||
|
||||
const shutdown = async (exitCode: number, reason: string, error?: unknown): Promise<void> => {
|
||||
if (shuttingDown) {
|
||||
return await shuttingDown;
|
||||
}
|
||||
|
||||
shuttingDown = (async () => {
|
||||
if (error) {
|
||||
logger.error(`Shutting down: ${reason}`, error);
|
||||
} else {
|
||||
logger.info("Shutting down", reason);
|
||||
}
|
||||
|
||||
if (capture && !capture.killed && capture.exitCode === null) {
|
||||
capture.kill("SIGTERM");
|
||||
}
|
||||
|
||||
await stt.destroy().catch((destroyError) => {
|
||||
logger.warn("STT destroy failed", destroyError);
|
||||
});
|
||||
})();
|
||||
|
||||
await shuttingDown;
|
||||
process.exit(exitCode);
|
||||
};
|
||||
|
||||
process.once("SIGINT", () => {
|
||||
void shutdown(0, "SIGINT");
|
||||
});
|
||||
process.once("SIGTERM", () => {
|
||||
void shutdown(0, "SIGTERM");
|
||||
});
|
||||
process.once("uncaughtException", (error) => {
|
||||
void shutdown(1, "uncaughtException", error);
|
||||
});
|
||||
process.once("unhandledRejection", (reason) => {
|
||||
void shutdown(1, "unhandledRejection", reason);
|
||||
});
|
||||
process.once("exit", () => {
|
||||
if (capture && !capture.killed && capture.exitCode === null) {
|
||||
capture.kill("SIGKILL");
|
||||
}
|
||||
void stt.destroy();
|
||||
});
|
||||
|
||||
await stt.warmup();
|
||||
|
||||
@@ -53,7 +99,7 @@ async function runLoopback(): Promise<void> {
|
||||
},
|
||||
});
|
||||
|
||||
const capture = spawnLoopbackCapture(config, logger);
|
||||
capture = spawnLoopbackCapture(config, logger);
|
||||
capture.stdout.on("data", (chunk: Buffer) => {
|
||||
segmenter.pushChunk(chunk);
|
||||
});
|
||||
@@ -63,29 +109,20 @@ async function runLoopback(): Promise<void> {
|
||||
logger.debug("[capture]", text);
|
||||
}
|
||||
});
|
||||
capture.on("error", (error) => {
|
||||
void shutdown(1, "capture-error", error);
|
||||
});
|
||||
capture.on("exit", (code, signal) => {
|
||||
logger.warn("capture exited", { code, signal });
|
||||
if (!shuttingDown) {
|
||||
void shutdown(1, "capture-exit");
|
||||
}
|
||||
});
|
||||
|
||||
console.log("실시간 출력장치 STT를 시작합니다. Ctrl+C 로 종료합니다.");
|
||||
console.log(`source: ${config.AUDIO_SOURCE ?? "unset"}`);
|
||||
console.log(`model: ${config.WHISPER_MODEL}`);
|
||||
console.log(`language: ${config.WHISPER_LANGUAGE}`);
|
||||
|
||||
const shutdown = async (): Promise<void> => {
|
||||
if (!capture.killed) {
|
||||
capture.kill("SIGTERM");
|
||||
}
|
||||
await stt.destroy();
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
process.on("SIGINT", () => {
|
||||
void shutdown();
|
||||
});
|
||||
process.on("SIGTERM", () => {
|
||||
void shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
|
||||
import { once } from "node:events";
|
||||
import { createInterface } from "node:readline";
|
||||
|
||||
import type { AppConfig } from "../config.js";
|
||||
@@ -23,6 +24,7 @@ function isFailure<T>(value: RpcResponse<T>): value is RpcFailure {
|
||||
|
||||
export class PythonJsonWorker {
|
||||
private processRef: ChildProcessWithoutNullStreams | null = null;
|
||||
private shuttingDown = false;
|
||||
private readonly pending = new Map<
|
||||
string,
|
||||
{
|
||||
@@ -43,12 +45,16 @@ export class PythonJsonWorker {
|
||||
if (this.processRef) {
|
||||
return;
|
||||
}
|
||||
if (this.shuttingDown) {
|
||||
throw new Error(`${this.logPrefix} worker is shutting down`);
|
||||
}
|
||||
|
||||
const { command, args } = await resolveWorkerPythonCommand(this.config);
|
||||
const scriptPath = resolveWorkerScript(this.scriptName);
|
||||
|
||||
this.processRef = spawn(command, [...args, scriptPath], {
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
env: {
|
||||
...process.env,
|
||||
WHISPER_MODEL: this.config.WHISPER_MODEL,
|
||||
@@ -75,6 +81,10 @@ export class PythonJsonWorker {
|
||||
}
|
||||
});
|
||||
|
||||
this.processRef.stdin.on("error", (error) => {
|
||||
this.logger.debug(`${this.logPrefix} stdin error`, error);
|
||||
});
|
||||
|
||||
this.processRef.on("exit", (code, signal) => {
|
||||
const error = new Error(`${this.logPrefix} worker exited code=${code ?? "null"} signal=${signal ?? "null"}`);
|
||||
for (const entry of this.pending.values()) {
|
||||
@@ -114,8 +124,34 @@ export class PythonJsonWorker {
|
||||
if (!this.processRef) {
|
||||
return;
|
||||
}
|
||||
this.processRef.kill("SIGTERM");
|
||||
|
||||
const child = this.processRef;
|
||||
this.shuttingDown = true;
|
||||
|
||||
try {
|
||||
child.stdin.end();
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
|
||||
if (!child.killed && child.exitCode === null) {
|
||||
child.kill("SIGTERM");
|
||||
}
|
||||
|
||||
const timedWait = Promise.race([
|
||||
once(child, "exit"),
|
||||
new Promise<null>((resolve) => setTimeout(() => resolve(null), 1500)),
|
||||
]);
|
||||
|
||||
await timedWait;
|
||||
|
||||
if (child.exitCode === null && !child.killed) {
|
||||
child.kill("SIGKILL");
|
||||
await once(child, "exit").catch(() => null);
|
||||
}
|
||||
|
||||
this.processRef = null;
|
||||
this.shuttingDown = false;
|
||||
}
|
||||
|
||||
private handleStdoutLine(line: string): void {
|
||||
|
||||
Reference in New Issue
Block a user