|
|
|
@@ -30,6 +30,10 @@ use fastwebsockets::Frame; |
|
|
|
use fastwebsockets::OpCode; |
|
|
|
use fastwebsockets::Payload; |
|
|
|
use fastwebsockets::WebSocketError; |
|
|
|
use futures_concurrency::future::Race; |
|
|
|
use futures_util::future; |
|
|
|
use futures_util::future::Either; |
|
|
|
use futures_util::FutureExt; |
|
|
|
use http_body_util::Empty; |
|
|
|
use hyper::body::Bytes; |
|
|
|
use hyper::body::Incoming; |
|
|
|
@@ -45,9 +49,7 @@ use std::fs; |
|
|
|
use std::io::{self, Write}; |
|
|
|
use std::net::IpAddr; |
|
|
|
use std::net::Ipv4Addr; |
|
|
|
use std::time::Duration; |
|
|
|
use tokio::net::TcpListener; |
|
|
|
|
|
|
|
#[derive(Serialize, Deserialize, Debug)] |
|
|
|
pub struct ErrorDetails { |
|
|
|
pub code: Option<String>, |
|
|
|
@@ -310,125 +312,153 @@ async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> { |
|
|
|
let frame = Frame::text(payload); |
|
|
|
ws.write_frame(frame).await?; |
|
|
|
loop { |
|
|
|
let mut frame = ws.read_frame().await?; |
|
|
|
let event_fut = events.recv_async().map(Either::Left); |
|
|
|
let frame_fut = ws.read_frame().map(Either::Right); |
|
|
|
let event_stream = (event_fut, frame_fut).race(); |
|
|
|
let mut finished = false; |
|
|
|
match frame.opcode { |
|
|
|
OpCode::Close => break, |
|
|
|
OpCode::Text | OpCode::Binary => { |
|
|
|
let data: OpenAIRealtimeMessage = serde_json::from_slice(&frame.payload).unwrap(); |
|
|
|
|
|
|
|
match data { |
|
|
|
OpenAIRealtimeMessage::InputAudioBufferAppend { audio } => { |
|
|
|
// println!("Received audio data: {}", audio); |
|
|
|
let f32_data = audio; |
|
|
|
// Decode base64 encoded audio data |
|
|
|
let f32_data = f32_data.trim(); |
|
|
|
if f32_data.is_empty() { |
|
|
|
continue; |
|
|
|
} |
|
|
|
let frame = match event_stream.await { |
|
|
|
future::Either::Left(Some(ev)) => { |
|
|
|
let frame = match ev { |
|
|
|
dora_node_api::Event::Input { |
|
|
|
id, |
|
|
|
metadata: _, |
|
|
|
data, |
|
|
|
} => { |
|
|
|
if data.data_type() == &DataType::Utf8 { |
|
|
|
let data = data.as_string::<i32>(); |
|
|
|
let str = data.value(0); |
|
|
|
let serialized_data = |
|
|
|
OpenAIRealtimeResponse::ResponseAudioTranscriptDelta { |
|
|
|
response_id: "123".to_string(), |
|
|
|
item_id: "123".to_string(), |
|
|
|
output_index: 123, |
|
|
|
content_index: 123, |
|
|
|
delta: str.to_string(), |
|
|
|
}; |
|
|
|
|
|
|
|
if let Ok(f32_data) = general_purpose::STANDARD.decode(f32_data) { |
|
|
|
let f32_data = convert_pcm16_to_f32(&f32_data); |
|
|
|
// Downsample to 16 kHz from 24 kHz |
|
|
|
let f32_data = f32_data |
|
|
|
.into_iter() |
|
|
|
.enumerate() |
|
|
|
.filter(|(i, _)| i % 3 != 0) |
|
|
|
.map(|(_, v)| v) |
|
|
|
.collect::<Vec<f32>>(); |
|
|
|
let mut parameter = MetadataParameters::default(); |
|
|
|
parameter.insert( |
|
|
|
"sample_rate".to_string(), |
|
|
|
dora_node_api::Parameter::Integer(16000), |
|
|
|
); |
|
|
|
node.send_output( |
|
|
|
DataId::from("audio".to_string()), |
|
|
|
parameter, |
|
|
|
f32_data.into_arrow(), |
|
|
|
) |
|
|
|
.unwrap(); |
|
|
|
let ev = events.recv_async_timeout(Duration::from_millis(10)).await; |
|
|
|
|
|
|
|
// println!("Received event: {:?}", ev); |
|
|
|
let frame = match ev { |
|
|
|
Some(dora_node_api::Event::Input { |
|
|
|
id, |
|
|
|
metadata: _, |
|
|
|
data, |
|
|
|
}) => { |
|
|
|
if data.data_type() == &DataType::Utf8 { |
|
|
|
let data = data.as_string::<i32>(); |
|
|
|
let str = data.value(0); |
|
|
|
let serialized_data = |
|
|
|
OpenAIRealtimeResponse::ResponseAudioTranscriptDelta { |
|
|
|
response_id: "123".to_string(), |
|
|
|
item_id: "123".to_string(), |
|
|
|
output_index: 123, |
|
|
|
content_index: 123, |
|
|
|
delta: str.to_string(), |
|
|
|
}; |
|
|
|
|
|
|
|
frame.payload = Payload::Bytes( |
|
|
|
Bytes::from( |
|
|
|
serde_json::to_string(&serialized_data).unwrap(), |
|
|
|
) |
|
|
|
.into(), |
|
|
|
); |
|
|
|
frame.opcode = OpCode::Text; |
|
|
|
frame |
|
|
|
} else if id.contains("audio") { |
|
|
|
let data: Vec<f32> = into_vec(&data).unwrap(); |
|
|
|
let data = convert_f32_to_pcm16(&data); |
|
|
|
let serialized_data = |
|
|
|
OpenAIRealtimeResponse::ResponseAudioDelta { |
|
|
|
response_id: "123".to_string(), |
|
|
|
item_id: "123".to_string(), |
|
|
|
output_index: 123, |
|
|
|
content_index: 123, |
|
|
|
delta: general_purpose::STANDARD.encode(data), |
|
|
|
}; |
|
|
|
finished = true; |
|
|
|
|
|
|
|
frame.payload = Payload::Bytes( |
|
|
|
Bytes::from( |
|
|
|
serde_json::to_string(&serialized_data).unwrap(), |
|
|
|
) |
|
|
|
.into(), |
|
|
|
); |
|
|
|
frame.opcode = OpCode::Text; |
|
|
|
frame |
|
|
|
} else { |
|
|
|
unimplemented!() |
|
|
|
} |
|
|
|
} |
|
|
|
Some(dora_node_api::Event::Error(_)) => { |
|
|
|
// println!("Error in input: {}", s); |
|
|
|
continue; |
|
|
|
} |
|
|
|
_ => break, |
|
|
|
let frame = Frame::text(Payload::Bytes( |
|
|
|
Bytes::from(serde_json::to_string(&serialized_data).unwrap()) |
|
|
|
.into(), |
|
|
|
)); |
|
|
|
frame |
|
|
|
} else if id.contains("audio") { |
|
|
|
let data: Vec<f32> = into_vec(&data).unwrap(); |
|
|
|
let data = convert_f32_to_pcm16(&data); |
|
|
|
let serialized_data = OpenAIRealtimeResponse::ResponseAudioDelta { |
|
|
|
response_id: "123".to_string(), |
|
|
|
item_id: "123".to_string(), |
|
|
|
output_index: 123, |
|
|
|
content_index: 123, |
|
|
|
delta: general_purpose::STANDARD.encode(data), |
|
|
|
}; |
|
|
|
ws.write_frame(frame).await?; |
|
|
|
if finished { |
|
|
|
let serialized_data = OpenAIRealtimeResponse::ResponseDone { |
|
|
|
response: serde_json::Value::Null, |
|
|
|
finished = true; |
|
|
|
|
|
|
|
let frame = Frame::text(Payload::Bytes( |
|
|
|
Bytes::from(serde_json::to_string(&serialized_data).unwrap()) |
|
|
|
.into(), |
|
|
|
)); |
|
|
|
frame |
|
|
|
} else if id.contains("stop") { |
|
|
|
let serialized_data = |
|
|
|
OpenAIRealtimeResponse::InputAudioBufferSpeechStopped { |
|
|
|
audio_end_ms: 123, |
|
|
|
item_id: "123".to_string(), |
|
|
|
}; |
|
|
|
finished = true; |
|
|
|
|
|
|
|
let frame = Frame::text(Payload::Bytes( |
|
|
|
Bytes::from(serde_json::to_string(&serialized_data).unwrap()) |
|
|
|
.into(), |
|
|
|
)); |
|
|
|
frame |
|
|
|
} else { |
|
|
|
unimplemented!() |
|
|
|
} |
|
|
|
} |
|
|
|
dora_node_api::Event::Error(_) => { |
|
|
|
// println!("Error in input: {}", s); |
|
|
|
continue; |
|
|
|
} |
|
|
|
_ => break, |
|
|
|
}; |
|
|
|
Some(frame) |
|
|
|
} |
|
|
|
future::Either::Left(None) => break, |
|
|
|
future::Either::Right(Ok(frame)) => { |
|
|
|
match frame.opcode { |
|
|
|
OpCode::Close => break, |
|
|
|
OpCode::Text | OpCode::Binary => { |
|
|
|
let data: OpenAIRealtimeMessage = |
|
|
|
serde_json::from_slice(&frame.payload).unwrap(); |
|
|
|
|
|
|
|
match data { |
|
|
|
OpenAIRealtimeMessage::InputAudioBufferAppend { audio } => { |
|
|
|
// println!("Received audio data: {}", audio); |
|
|
|
let f32_data = audio; |
|
|
|
// Decode base64 encoded audio data |
|
|
|
let f32_data = f32_data.trim(); |
|
|
|
if f32_data.is_empty() { |
|
|
|
continue; |
|
|
|
} |
|
|
|
|
|
|
|
let payload = Payload::Bytes( |
|
|
|
Bytes::from(serde_json::to_string(&serialized_data).unwrap()) |
|
|
|
.into(), |
|
|
|
); |
|
|
|
println!("Sending response done: {:?}", serialized_data); |
|
|
|
let frame = Frame::text(payload); |
|
|
|
ws.write_frame(frame).await?; |
|
|
|
if let Ok(f32_data) = general_purpose::STANDARD.decode(f32_data) { |
|
|
|
let f32_data = convert_pcm16_to_f32(&f32_data); |
|
|
|
// Downsample to 16 kHz from 24 kHz |
|
|
|
let f32_data = f32_data |
|
|
|
.into_iter() |
|
|
|
.enumerate() |
|
|
|
.filter(|(i, _)| i % 3 != 0) |
|
|
|
.map(|(_, v)| v) |
|
|
|
.collect::<Vec<f32>>(); |
|
|
|
|
|
|
|
let mut parameter = MetadataParameters::default(); |
|
|
|
parameter.insert( |
|
|
|
"sample_rate".to_string(), |
|
|
|
dora_node_api::Parameter::Integer(16000), |
|
|
|
); |
|
|
|
node.send_output( |
|
|
|
DataId::from("audio".to_string()), |
|
|
|
parameter, |
|
|
|
f32_data.into_arrow(), |
|
|
|
) |
|
|
|
.unwrap(); |
|
|
|
} |
|
|
|
} |
|
|
|
OpenAIRealtimeMessage::InputAudioBufferCommit => break, |
|
|
|
OpenAIRealtimeMessage::ResponseCreate { response } => { |
|
|
|
if let Some(text) = response.instructions { |
|
|
|
node.send_output( |
|
|
|
DataId::from("text".to_string()), |
|
|
|
Default::default(), |
|
|
|
text.into_arrow(), |
|
|
|
) |
|
|
|
.unwrap(); |
|
|
|
} |
|
|
|
} |
|
|
|
_ => {} |
|
|
|
} |
|
|
|
} |
|
|
|
OpenAIRealtimeMessage::InputAudioBufferCommit => break, |
|
|
|
_ => {} |
|
|
|
_ => break, |
|
|
|
} |
|
|
|
None |
|
|
|
} |
|
|
|
_ => break, |
|
|
|
future::Either::Right(Err(_)) => break, |
|
|
|
}; |
|
|
|
if let Some(frame) = frame { |
|
|
|
ws.write_frame(frame).await?; |
|
|
|
} |
|
|
|
if finished { |
|
|
|
let serialized_data = OpenAIRealtimeResponse::ResponseDone { |
|
|
|
response: serde_json::Value::Null, |
|
|
|
}; |
|
|
|
|
|
|
|
let payload = Payload::Bytes( |
|
|
|
Bytes::from(serde_json::to_string(&serialized_data).unwrap()).into(), |
|
|
|
); |
|
|
|
println!("Sending response done: {:?}", serialized_data); |
|
|
|
let frame = Frame::text(payload); |
|
|
|
ws.write_frame(frame).await?; |
|
|
|
}; |
|
|
|
} |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
|