import { useEffect, useRef, useState, useCallback } from 'react';
import { AudioConfig, ConnectionStatus, HardwareDevice } from '../types/audio';

/** Number of samples kept in each rolling waveform display buffer. */
const DISPLAY_BUFFER_SIZE = 4096;
/** Sample rate assumed for server audio until the server reports its own. */
const DEFAULT_TARGET_SAMPLE_RATE = 40000;
/** Input sample rate reported to the server while no AudioContext exists yet. */
const FALLBACK_INPUT_SAMPLE_RATE = 44100;
/** Upper bound on un-acknowledged send timestamps kept for RTT measurement. */
const MAX_PENDING_TIMESTAMPS = 256;

/** Loose shape of the JSON control messages handled by this hook. */
interface ServerMessage {
  type?: unknown;
  target_sr?: unknown;
  devices?: unknown;
  input?: unknown;
  output?: unknown;
}

/** Scrolls `display` left and appends `samples`, keeping only the newest samples. */
function pushRolling(display: Float32Array, samples: Float32Array): void {
  const size = display.length;
  if (samples.length >= size) {
    display.set(samples.subarray(samples.length - size));
  } else {
    display.copyWithin(0, samples.length);
    display.set(samples, size - samples.length);
  }
}

/**
 * Writes a visualizer frame at the start of `display`. Non-array input is
 * ignored; frames longer than the buffer are cut down to their trailing
 * samples so `set` can never throw a RangeError.
 */
function writeSnapshot(display: Float32Array, values: unknown): void {
  if (!Array.isArray(values)) return;
  const frame = Float32Array.from(values as number[]);
  display.set(
    frame.length > display.length ? frame.subarray(frame.length - display.length) : frame
  );
}

/**
 * React hook that streams microphone audio to a WebSocket server and plays
 * back the audio the server returns.
 *
 * @param wsUrl - URL passed to the `WebSocket` constructor by `connect`.
 * @param config - Settings serialized to the server as a `config` message
 *   whenever the object identity changes (and when the socket opens).
 * @param onConfigSync - Invoked when the server sends an `init_devices`
 *   message, with the reported sample rate (40000 if absent) and device list.
 * @returns Connection/stream state, control callbacks, and refs to the
 *   rolling input/output display buffers.
 */
export const useAudioPipeline = (
  wsUrl: string,
  config: AudioConfig,
  onConfigSync: (sr: number, list: HardwareDevice[]) => void
) => {
  const [status, setStatus] = useState<ConnectionStatus>('disconnected');
  const [rtt, setRtt] = useState<number | null>(null);
  const [processingTime, setProcessingTime] = useState<number | null>(null);
  const [isTalking, setIsTalking] = useState(false);
  const [isStreaming, setIsStreaming] = useState(false);
  const [playOutput, setPlayOutput] = useState(true);

  const socketRef = useRef<WebSocket | null>(null);
  const audioCtxRef = useRef<AudioContext | null>(null);
  const micStreamRef = useRef<MediaStream | null>(null);
  const micSourceRef = useRef<MediaStreamAudioSourceNode | null>(null);
  const processorRef = useRef<ScriptProcessorNode | null>(null);
  const sampleRateRef = useRef(DEFAULT_TARGET_SAMPLE_RATE);

  // Rolling buffers read by the waveform canvases (exposed via the return value).
  const inputDisplayBuf = useRef<Float32Array>(new Float32Array(DISPLAY_BUFFER_SIZE));
  const outputDisplayBuf = useRef<Float32Array>(new Float32Array(DISPLAY_BUFFER_SIZE));
  const micAccumulator = useRef<Float32Array>(new Float32Array(0));

  // Playback scheduling & timing
  const sentTimestamps = useRef<{ id: number; sent: number }[]>([]);
  const nextPlaybackTime = useRef(0);
  const outputChunkQueue = useRef<{ data: Float32Array; startTime: number }[]>([]);

  // Latest-value refs: socket and audio callbacks outlive the render that
  // created them, so they read current values from here instead of closing
  // over a stale render.
  const configRef = useRef(config);
  const playOutputRef = useRef(playOutput);
  const onConfigSyncRef = useRef(onConfigSync);
  // Bumped on every start/stop so an in-flight getUserMedia can detect that
  // it has been superseded.
  const streamSessionRef = useRef(0);

  useEffect(() => {
    playOutputRef.current = playOutput;
  }, [playOutput]);

  useEffect(() => {
    onConfigSyncRef.current = onConfigSync;
  }, [onConfigSync]);

  /** Serializes the current config and sends it if the socket is open. */
  const sendConfig = useCallback(() => {
    const socket = socketRef.current;
    if (!socket || socket.readyState !== WebSocket.OPEN) return;
    const cfg = configRef.current;
    socket.send(
      JSON.stringify({
        type: 'config',
        model_name: cfg.model_name,
        device: cfg.device,
        f0_method: cfg.f0_method,
        f0_up_key: cfg.f0_up_key,
        noise_gate: cfg.noise_gate,
        input_gain: cfg.input_gain,
        output_gain: cfg.output_gain,
        input_sr: audioCtxRef.current?.sampleRate ?? FALLBACK_INPUT_SAMPLE_RATE,
        routing_mode: cfg.routing_mode,
        input_device: cfg.input_device,
        output_device: cfg.output_device,
        chunk_size: cfg.chunk_size,
      })
    );
  }, []);

  /**
   * Handles one binary frame from the server: float32 element 0 is read as the
   * processing time and the remaining elements as mono PCM, which is scheduled
   * for gapless playback and pushed to the output display buffer.
   */
  const handleServerAudio = useCallback((arrayBuffer: ArrayBuffer) => {
    const ctx = audioCtxRef.current;
    if (!ctx) return;

    // Pair this response with the oldest outstanding send for the RTT readout.
    const oldest = sentTimestamps.current.shift();
    if (oldest) {
      setRtt(Math.round(performance.now() - oldest.sent));
    }

    // A Float32Array view requires a byte length divisible by 4; reject
    // malformed frames instead of throwing inside the socket handler.
    const bytesPerSample = Float32Array.BYTES_PER_ELEMENT;
    if (arrayBuffer.byteLength < bytesPerSample || arrayBuffer.byteLength % bytesPerSample !== 0) {
      return;
    }

    const payload = new Float32Array(arrayBuffer);
    const procTime = payload[0];
    const pcmData = payload.subarray(1);
    if (Number.isFinite(procTime)) {
      setProcessingTime(Math.max(0, Math.round(procTime)));
    }
    // createBuffer rejects zero-length buffers, so a header-only frame has
    // nothing to play.
    if (pcmData.length === 0) return;

    const audioBuf = ctx.createBuffer(1, pcmData.length, sampleRateRef.current);
    audioBuf.getChannelData(0).set(pcmData);
    const source = ctx.createBufferSource();
    source.buffer = audioBuf;

    // Only route the node to the speakers when local listening is enabled.
    // The source is still started below so the schedule keeps advancing.
    if (playOutputRef.current) {
      source.connect(ctx.destination);
    }

    // Re-anchor the schedule when it has fallen behind the clock (underrun)
    // or drifted more than five chunk durations ahead (latency build-up).
    const currentTime = ctx.currentTime;
    const duration = audioBuf.duration;
    const adaptiveBuf = Math.min(duration * 2.5, 0.5);
    if (
      nextPlaybackTime.current < currentTime ||
      nextPlaybackTime.current > currentTime + duration * 5.0
    ) {
      nextPlaybackTime.current = currentTime + adaptiveBuf;
    }
    const startSchedule = nextPlaybackTime.current;
    source.start(startSchedule);
    nextPlaybackTime.current += duration;

    // Remember scheduled chunks, dropping those that ended over 2 s ago.
    const queue = outputChunkQueue.current;
    queue.push({ data: pcmData, startTime: startSchedule });
    while (queue.length > 0) {
      const head = queue[0];
      if (head.startTime + head.data.length / sampleRateRef.current < ctx.currentTime - 2.0) {
        queue.shift();
      } else {
        break;
      }
    }

    pushRolling(outputDisplayBuf.current, pcmData);
  }, []);

  /** Parses and dispatches one JSON control message from the server. */
  const handleControlMessage = useCallback((raw: string) => {
    try {
      const data = JSON.parse(raw) as ServerMessage | null;
      if (!data || typeof data !== 'object') return;

      if (data.type === 'config_success') {
        // Only accept a usable rate; it is later handed to createBuffer.
        const sr = data.target_sr;
        if (typeof sr === 'number' && Number.isFinite(sr) && sr > 0) {
          sampleRateRef.current = sr;
        }
      } else if (data.type === 'init_devices') {
        const sr =
          typeof data.target_sr === 'number' && data.target_sr
            ? data.target_sr
            : DEFAULT_TARGET_SAMPLE_RATE;
        const devices = Array.isArray(data.devices) ? (data.devices as HardwareDevice[]) : [];
        onConfigSyncRef.current(sr, devices);
      } else if (data.type === 'visualizer') {
        // Hardware mode visualizer data stream
        writeSnapshot(inputDisplayBuf.current, data.input);
        writeSnapshot(outputDisplayBuf.current, data.output);
      }
    } catch (e) {
      console.error('WS JSON parse error:', e);
    }
  }, []);

  /** Closes the current socket (open or still connecting) and resets status. */
  const disconnect = useCallback(() => {
    const socket = socketRef.current;
    // Detach first so the late close/error events of this socket are ignored.
    socketRef.current = null;
    if (socket) {
      try {
        socket.close();
      } catch {
        // Best-effort: the socket is being discarded either way.
      }
    }
    setStatus('disconnected');
  }, []);

  /** Opens a fresh WebSocket to `wsUrl`, replacing any existing connection. */
  const connect = useCallback(() => {
    disconnect();
    setStatus('connecting');
    try {
      const ws = new WebSocket(wsUrl);
      ws.binaryType = 'arraybuffer';
      // Track the socket immediately so disconnect() can cancel a pending
      // connection; senders still check readyState before writing.
      socketRef.current = ws;
      // Events from a socket that has since been replaced must not touch the
      // state of its successor.
      const isCurrent = () => socketRef.current === ws;

      ws.onopen = () => {
        if (!isCurrent()) return;
        setStatus('connected');
        sendConfig();
      };
      const handleGone = () => {
        if (!isCurrent()) return;
        socketRef.current = null;
        setStatus('disconnected');
      };
      ws.onclose = handleGone;
      ws.onerror = handleGone;
      ws.onmessage = (event) => {
        if (!isCurrent()) return;
        if (typeof event.data === 'string') {
          handleControlMessage(event.data);
        } else if (event.data instanceof ArrayBuffer) {
          handleServerAudio(event.data);
        }
      };
    } catch (e) {
      console.error('WS Connection failed:', e);
      socketRef.current = null;
      setStatus('disconnected');
    }
  }, [wsUrl, sendConfig, handleServerAudio, handleControlMessage, disconnect]);

  /** Tears down the microphone capture graph and drops buffered samples. */
  const releaseCapture = useCallback(() => {
    if (micStreamRef.current) {
      micStreamRef.current.getTracks().forEach((t) => t.stop());
      micStreamRef.current = null;
    }
    if (micSourceRef.current) {
      micSourceRef.current.disconnect();
      micSourceRef.current = null;
    }
    if (processorRef.current) {
      processorRef.current.onaudioprocess = null;
      processorRef.current.disconnect();
      processorRef.current = null;
    }
    micAccumulator.current = new Float32Array(0);
  }, []);

  /** Stops streaming in either routing mode and clears the latency readouts. */
  const stopStream = useCallback(() => {
    streamSessionRef.current += 1; // invalidates any startStream still awaiting the mic
    setIsStreaming(false);
    setIsTalking(false);
    if (configRef.current.routing_mode === 'hardware') {
      const socket = socketRef.current;
      if (socket && socket.readyState === WebSocket.OPEN) {
        socket.send(
          JSON.stringify({
            type: 'config',
            routing_mode: 'browser', // tells server hardware stream to stop
          })
        );
      }
    }
    releaseCapture();
    // Outstanding timestamps would otherwise skew RTT after a restart.
    sentTimestamps.current = [];
    setRtt(null);
    setProcessingTime(null);
  }, [releaseCapture]);

  /**
   * Starts streaming. In hardware routing mode this only re-sends the config;
   * otherwise it captures the microphone and sends fixed-size float32 chunks
   * over the socket.
   */
  const startStream = useCallback(async () => {
    if (configRef.current.routing_mode === 'hardware') {
      setIsStreaming(true);
      sendConfig();
      return;
    }

    const session = ++streamSessionRef.current;
    // A second start must not leak the previous mic tracks or leave an old
    // processor sending duplicate audio.
    releaseCapture();

    if (!audioCtxRef.current || audioCtxRef.current.state === 'closed') {
      const AudioContextCtor =
        window.AudioContext ||
        (window as Window & { webkitAudioContext?: typeof AudioContext }).webkitAudioContext;
      if (!AudioContextCtor) {
        throw new Error('Web Audio API is not supported in this browser');
      }
      audioCtxRef.current = new AudioContextCtor({ latencyHint: 'interactive' });
    }
    const ctx = audioCtxRef.current;
    if (ctx.state === 'suspended') {
      await ctx.resume();
    }

    try {
      const stream = await navigator.mediaDevices.getUserMedia({
        audio: {
          echoCancellation: true,
          noiseSuppression: true,
          autoGainControl: true,
        },
      });
      // stopStream (or unmount) ran while the permission prompt was pending:
      // release the mic instead of wiring up a graph nobody will stop.
      if (session !== streamSessionRef.current) {
        stream.getTracks().forEach((t) => t.stop());
        return;
      }

      const micSource = ctx.createMediaStreamSource(stream);
      const processor = ctx.createScriptProcessor(4096, 1, 1);
      micStreamRef.current = stream;
      micSourceRef.current = micSource;
      processorRef.current = processor;

      processor.onaudioprocess = (e: AudioProcessingEvent) => {
        const inputData = e.inputBuffer.getChannelData(0);

        // Update input waveform display buffer
        pushRolling(inputDisplayBuf.current, inputData);

        // Append the new samples to whatever was left over from the last callback.
        const leftover = micAccumulator.current;
        const merged = new Float32Array(leftover.length + inputData.length);
        merged.set(leftover);
        merged.set(inputData, leftover.length);

        // Read the chunk size per callback so config changes apply live. A
        // size below 1 would never drain the loop below, so drop the audio.
        const size = Math.floor(Number(configRef.current.chunk_size));
        if (!(size >= 1)) {
          micAccumulator.current = new Float32Array(0);
          return;
        }

        let offset = 0;
        while (merged.length - offset >= size) {
          // slice() copies, so chunk.buffer holds exactly this chunk's bytes.
          const chunk = merged.slice(offset, offset + size);
          offset += size;

          // Simple RMS for Voice Activity Badge
          let sum = 0;
          for (let i = 0; i < chunk.length; i++) sum += chunk[i] * chunk[i];
          const rms = Math.sqrt(sum / chunk.length);
          setIsTalking(rms > 0.005);

          // Stream raw float PCM bytes
          const ws = socketRef.current;
          if (ws && ws.readyState === WebSocket.OPEN) {
            const time = performance.now();
            const pending = sentTimestamps.current;
            pending.push({ id: time, sent: time });
            // Bound the queue in case the server stops answering.
            if (pending.length > MAX_PENDING_TIMESTAMPS) pending.shift();
            ws.send(chunk.buffer);
          }
        }
        micAccumulator.current = merged.slice(offset);
      };

      micSource.connect(processor);
      processor.connect(ctx.destination);
      nextPlaybackTime.current = 0;
      sentTimestamps.current = [];
      setIsStreaming(true);
    } catch (e) {
      console.error('Failed to start microphone streaming:', e);
      alert('Microphone access failed: ' + (e instanceof Error ? e.message : String(e)));
      stopStream();
    }
  }, [sendConfig, stopStream, releaseCapture]);

  // Sync config to the server whenever the config prop changes. The ref is
  // updated first so sendConfig serializes the new values.
  useEffect(() => {
    configRef.current = config;
    sendConfig();
  }, [config, sendConfig]);

  // Unmount cleanup. Both callbacks have stable identities, so this no longer
  // fires (and kills the connection) when the routing mode changes.
  useEffect(() => {
    return () => {
      // Stop first so a hardware-mode stop message can still go out before
      // the socket is closed.
      stopStream();
      disconnect();
      const ctx = audioCtxRef.current;
      audioCtxRef.current = null; // never hand a closed context to a later start
      if (ctx) {
        ctx.close().catch(() => {});
      }
    };
  }, [disconnect, stopStream]);

  return {
    status,
    rtt,
    processingTime,
    isTalking,
    isStreaming,
    playOutput,
    setPlayOutput,
    connect,
    disconnect,
    startStream,
    stopStream,
    inputBuffer: inputDisplayBuf,
    outputBuffer: outputDisplayBuf,
  };
};

export default useAudioPipeline;