diff --git a/Cargo.toml b/Cargo.toml index ad5bac2..267e999 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,6 @@ futures-lite = "1.11.3" log = "0.4.14" env_logger = "0.8.3" regex = "1.4.3" +async-osc = "0.2.0" +tide = "0.16.0" +tide-websockets = "0.2.0" diff --git a/src/lib.rs b/src/lib.rs index e340705..716d026 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -pub mod osc; pub mod spawn; pub mod switcher; -pub mod udp; +pub mod types; +pub mod ws; diff --git a/src/main.rs b/src/main.rs index f59ccc2..ec49ae6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,19 @@ +use async_std::channel::{self, Receiver, Sender}; use async_std::stream::StreamExt; +use async_std::sync::{Arc, Mutex, RwLock}; +use async_std::task::{self, JoinHandle}; use log::*; -use rosc::{OscMessage, OscPacket, OscType}; - -use studiox::osc::OscUdpStream; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::net::SocketAddr; use studiox::spawn::FaustHandle; -use studiox::switcher::{SwitcherError, SwitcherEvent, SwitcherHandle}; +use studiox::switcher::{SwitcherEvent, SwitcherHandle}; + +use async_osc::{prelude::*, OscMessage, OscPacket, OscSender, OscSocket, OscType as Ty}; pub enum Event { - FaustRx(Result), + FaustRx(Result<(OscPacket, SocketAddr), async_osc::Error>), Switcher(OscMessage), - CommandRx(Result), + CommandRx(Result<(OscPacket, SocketAddr), async_osc::Error>), } #[async_std::main] @@ -18,11 +22,11 @@ async fn main() -> anyhow::Result<()> { let faust_handle = FaustHandle::spawn().await; match faust_handle.as_ref() { - Ok(handle) => { - info!("FAUST mixer launched: {:?}", handle) + Ok(_handle) => { + info!("FAUST mixer launched successfully.") } Err(e) => { - error!("Launching FAUST mixer failed: {:?}", e); + error!("Launching FAUST mixer failed: {}", e); } } let mut faust_handle = faust_handle?; @@ -43,14 +47,23 @@ async fn main_loop(faust_handle: &FaustHandle) -> anyhow::Result<()> { let addr_commands = format!("localhost:{}", port_commands); let mut switcher = SwitcherHandle::run(); - let switcher_rx = switcher.take_receiver().unwrap().map(Event::Switcher); + let switcher_events = switcher.take_receiver().unwrap().map(Event::Switcher); - let faust_rx = OscUdpStream::bind(addr_faust_rx).await?.map(Event::FaustRx); - let command_osc_rx = OscUdpStream::bind(addr_commands).await?; - let command_osc_tx = command_osc_rx.sender(); - let command_osc_rx = command_osc_rx.map(Event::CommandRx); + let (faust_recv_tx, faust_recv_rx) = channel::unbounded(); - let mut events = faust_rx.merge(switcher_rx).merge(command_osc_rx); + let faust_osc_socket = OscSocket::bind(addr_faust_rx).await?; + let faust_osc_events = faust_osc_socket.map(Event::FaustRx); + let command_osc_socket = OscSocket::bind(addr_commands).await?; + let command_osc_sender = command_osc_socket.sender(); + let command_osc_events = command_osc_socket.map(Event::CommandRx); + + let faust_sender = OscSocket::bind("localhost:0").await?; + faust_sender.connect(&addr_faust_tx).await?; + let ws_to_faust_task = task::spawn(ws_to_faust_loop(faust_sender.sender(), faust_recv_rx)); + + let mut events = faust_osc_events + .merge(switcher_events) + .merge(command_osc_events); info!("Listening for OSC commands on localhost:{}", port_commands); while let Some(event) = events.next().await { @@ -58,15 +71,21 @@ async fn main_loop(faust_handle: &FaustHandle) -> anyhow::Result<()> { Event::Switcher(message) => { debug!("switcher tx: {:?}", message); let packet = OscPacket::Message(message); - command_osc_tx.send_to(packet, &addr_faust_tx).await?; + command_osc_sender.send_to(packet, &addr_faust_tx).await?; } Event::FaustRx(packet) => { debug!("faust rx: {:?}", packet); - let _packet = packet?; + let (packet, _peer_addr) = packet?; + match packet { + OscPacket::Message(message) => { + faust_recv_tx.send(message).await?; + } + _ => {} + } } Event::CommandRx(packet) => { debug!("command rx: {:?}", packet); - let packet = packet?; + let (packet, _peer_addr) = packet?; let event = parse_switcher_command(packet); if let Some(event) = event { switcher.send(event).await?; @@ -76,14 +95,173 @@ async fn main_loop(faust_handle: &FaustHandle) -> anyhow::Result<()> { } switcher.join().await?; + ws_to_faust_task.await?; // child_task.await?; Ok(()) } +struct PeakThrottler { + peaks: HashMap, +} + +impl PeakThrottler { + fn new() -> Self { + Self { + peaks: HashMap::new(), + } + } + + fn encode(&self) -> Vec { + let mut buf = vec![0u8; 48 * 2]; + for offset in 0..47 { + let val = self.peaks.get(&offset).unwrap_or(&0); + let bytes = val.to_be_bytes(); + let start = offset as usize * 2; + let end = start + 2; + // let slice = &mut buf[start..end]; + buf[start..end].copy_from_slice(&bytes[..]); + } + buf + } + + fn send(&mut self, message: &OscMessage) -> bool { + let num_in = 8; + let num_out = 8; + let num_bus = 4; + // peak map: + // 8 ins, 8 out, 4 * 8 bus ins + // = 48 slots + // each slot: 1 byte uint8 (0-256) + // total: 48 bytes + + let parts: Vec<&str> = message.addr.split("/").skip(2).collect(); + let indexes = match &parts[..] { + &["in", idx, "level"] => { + let idx: u32 = idx.parse().unwrap(); + Some((0, idx)) + } + &["out", idx, "level"] => { + let idx: u32 = idx.parse().unwrap(); + Some((1, idx)) + } + &["bus", bus, "ch", idx, "level"] => { + let bus: u32 = bus.parse().unwrap(); + let idx: u32 = idx.parse().unwrap(); + Some((2 + bus, idx)) + } + _ => None, + }; + if let Some(indexes) = indexes { + if let Some(Ty::Float(peak)) = message.args.get(0) { + let val: f32 = (*peak + 70.) * 800.; + let val: u16 = val.round() as u16; + let offset: u16 = indexes.0 as u16 * 8u16 + indexes.1 as u16; + self.peaks.insert(offset, val); + // eprintln!("off {} val {}", offset, val); + return true; + } + } + false + } +} + +async fn ws_to_faust_loop( + faust_sender: OscSender, + mut faust_recv_rx: Receiver, +) -> Result<(), async_osc::Error> { + use studiox::ws::WsEvent; + use tide_websockets::Message as WsMessage; + let mut ws_handle = studiox::ws::start(); + let ws_sender = ws_handle.sender(); + let peers = Arc::new(RwLock::new(HashSet::new())); + let peaks = Arc::new(Mutex::new(PeakThrottler::new())); + let ws_rx_task = task::spawn({ + let peers = peers.clone(); + async move { + while let Some(event) = ws_handle.next().await { + // let (message, peer_addr) = event; + match event { + WsEvent::Connect(peer_addr) => { + peers.write().await.insert(peer_addr); + } + WsEvent::Disconnect(peer_addr) => { + peers.write().await.remove(&peer_addr); + } + WsEvent::Message(WsMessage::Binary(buf), peer_addr) => { + let osc_message = rosc::decoder::decode(&buf)?; + eprintln!("recv: {:?}", osc_message); + faust_sender.send(osc_message).await?; + eprintln!("did forward."); + } + _ => {} + }; + // eprintln!("recv: {:?}", event); + } + Ok::<(), async_osc::Error>(()) + } + }); + + let send_peaks_task: JoinHandle> = task::spawn({ + let sleep_time = 200; + let peaks = peaks.clone(); + let peers = peers.clone(); + let ws_sender = ws_sender.clone(); + async move { + loop { + timeout(sleep_time).await; + let buf = peaks.lock().await.encode(); + let message = OscMessage { + addr: "/peaks".into(), + args: vec![Ty::Blob(buf)], + }; + let message = rosc::encoder::encode(&OscPacket::Message(message))?; + let message = WsMessage::Binary(message); + let peers = peers.read().await; + for peer in peers.iter() { + ws_sender + .send((message.clone(), peer.clone())) + .await + .unwrap(); + } + } + } + }); + + let ws_tx_task = task::spawn(async move { + while let Some(message) = faust_recv_rx.next().await { + let is_peak = peaks.lock().await.send(&message); + if !is_peak { + let buf = rosc::encoder::encode(&OscPacket::Message(message))?; + let ws_message = WsMessage::Binary(buf); + let peers = peers.read().await; + for peer in peers.iter() { + ws_sender + .send((ws_message.clone(), peer.clone())) + .await + .unwrap(); + } + } + } + Ok::<(), async_osc::Error>(()) + }); + ws_rx_task.await?; + ws_tx_task.await?; + send_peaks_task.await?; + Ok(()) +} + +async fn timeout(ms: u64) { + let _ = async_std::future::timeout( + std::time::Duration::from_millis(ms), + futures_lite::future::pending::<()>(), + ) + .await; +} + fn parse_switcher_command(packet: OscPacket) -> Option { let event = match packet { - OscPacket::Message(message) => match (&message.addr as &str, &message.args[..]) { - ("/switcher", [OscType::Int(channel_id), OscType::Int(on_value)]) => match on_value { + OscPacket::Message(message) => match message.as_tuple() { + ("/switcher", [Ty::Int(channel_id), Ty::Int(on_value)]) => match on_value { 1 => Some(SwitcherEvent::Enable(*channel_id as usize)), 0 => Some(SwitcherEvent::Disable(*channel_id as usize)), _ => { diff --git a/src/mixer.rs b/src/mixer.rs deleted file mode 100644 index 8b19384..0000000 --- a/src/mixer.rs +++ /dev/null @@ -1,9 +0,0 @@ -pub type ChannelId = usize; -pub type BusId = usize; -pub type OutputId = usize; - -pub enum MixerCommand { - Volume(ChannelId, f32), -} - -pub fn on_command() {} diff --git a/src/osc.rs b/src/osc.rs deleted file mode 100644 index 11688e5..0000000 --- a/src/osc.rs +++ /dev/null @@ -1,155 +0,0 @@ -use async_std::net::{ToSocketAddrs, UdpSocket}; -use async_std::stream::Stream; -use futures_lite::ready; -use rosc::{OscMessage, OscPacket, OscType}; -use std::io; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use crate::switcher::SwitcherError; -use crate::udp::UdpStream; - -pub struct OscUdpStream { - stream: UdpStream, -} - -impl OscUdpStream { - pub fn new(socket: UdpSocket) -> Self { - let socket = Arc::new(socket); - let stream = UdpStream::new(socket); - - Self { stream } - } - - pub async fn bind(addr: A) -> Result { - let socket = UdpSocket::bind(addr).await?; - Ok(Self::new(socket)) - } - - pub async fn send_to( - &self, - packet: OscPacket, - addrs: A, - ) -> Result<(), SwitcherError> { - send_osc(&self.stream.get_ref(), packet, addrs).await - } - - pub fn sender(&self) -> OscSender { - OscSender::new(self.stream.clone_socket()) - } -} -impl Stream for OscUdpStream { - type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let packet = ready!(Pin::new(&mut self.stream).poll_next(cx)); - let message = match packet { - None => None, - Some(packet) => Some(match packet { - Err(err) => Err(err.into()), - Ok(buf) => rosc::decoder::decode(&buf.0[..]).map_err(|e| e.into()), - }), - }; - Poll::Ready(message) - } -} - -#[derive(Clone)] -pub struct OscSender { - socket: Arc, -} - -impl OscSender { - pub fn new(socket: Arc) -> Self { - Self { socket } - } - - pub async fn send_to( - &self, - packet: OscPacket, - addrs: A, - ) -> Result<(), SwitcherError> { - send_osc(&self.socket, packet, addrs).await - } -} - -async fn send_osc( - socket: &UdpSocket, - packet: OscPacket, - addrs: A, -) -> Result<(), SwitcherError> { - let buf = rosc::encoder::encode(&packet)?; - let n = socket.send_to(&buf[..], addrs).await?; - if n != buf.len() { - Err(io::Error::new(io::ErrorKind::Interrupted, "udp packet was not fully sent").into()) - } else { - Ok(()) - } -} - -pub trait OscMessageExt { - fn new(addr: impl ToString, args: T) -> OscMessage - where - T: IntoOscArgs; -} - -impl OscMessageExt for OscMessage { - fn new(addr: impl ToString, args: T) -> OscMessage - where - T: IntoOscArgs, - { - let args = args.into_osc_args(); - let addr = addr.to_string(); - OscMessage { addr, args } - } -} - -pub trait IntoOscArgs { - fn into_osc_args(self) -> Vec; -} - -impl IntoOscArgs for Vec -where - T: Into, -{ - fn into_osc_args(self) -> Vec { - let args: Vec = self.into_iter().map(|a| a.into()).collect(); - args - } -} - -impl IntoOscArgs for (T1,) -where - T1: Into, -{ - fn into_osc_args(self) -> Vec { - vec![self.0.into()] - } -} - -impl IntoOscArgs for (T1, T2) -where - T1: Into, - T2: Into, -{ - fn into_osc_args(self) -> Vec { - vec![self.0.into(), self.1.into()] - } -} - -impl IntoOscArgs for (T1, T2, T3) -where - T1: Into, - T2: Into, - T3: Into, -{ - fn into_osc_args(self) -> Vec { - vec![self.0.into(), self.1.into(), self.2.into()] - } -} - -impl IntoOscArgs for OscType { - fn into_osc_args(self) -> Vec { - vec![self] - } -} diff --git a/src/spawn.rs b/src/spawn.rs index ba6393d..35ed0b7 100644 --- a/src/spawn.rs +++ b/src/spawn.rs @@ -16,7 +16,7 @@ pub struct FaustHandle { impl FaustHandle { pub async fn spawn() -> Result { - let exec = "/home/bit/Code/RDL/studiox/studiox-next/mixer-jackconsole"; + let exec = "/home/bit/Code/studiox/studiox-mixer/mixer-jackconsole"; let mut child = Command::new(exec) .arg(".") .stdout(Stdio::piped()) diff --git a/src/switcher.rs b/src/switcher.rs index dbd259c..dbbb1fe 100644 --- a/src/switcher.rs +++ b/src/switcher.rs @@ -5,7 +5,7 @@ use rosc::OscMessage; use std::collections::VecDeque; use thiserror::Error; -use crate::osc::{IntoOscArgs, OscMessageExt}; +use async_osc::prelude::*; #[derive(Error, Debug)] pub enum SwitcherError { @@ -125,7 +125,7 @@ impl SwitcherState { let channel = ChannelState::new(i); self.channels.push(channel); } - // self.messages.push("/*", ("xmit", 1)); + self.messages.push("/*", ("xmit", 1)); self.messages.push("/mixer/out/0/bus/0/on", (1.0,)); self.messages.push("/mixer/bus/0/ch/0/on", (1.0,)); self.update_state(); diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..0374b74 --- /dev/null +++ b/src/types.rs @@ -0,0 +1,75 @@ +use async_osc::{prelude::OscMessageExt, OscMessage, OscType}; + +pub type Id = u32; +pub type ChannelId = u32; +pub type InputId = u32; +pub type OutputId = u32; +pub type BusId = u32; + +pub enum Command { + ChannelLabel(u32, String), + BusChannelVolume(u32, u32, f32), + BusChannelActive(u32, u32, bool), +} + +pub enum Event { + Peaks(Vec), + ChannelLabel(u32, String), +} + +impl Event { + pub fn into_osc_message(self, prefix: String) -> Option { + match self { + Self::ChannelLabel(id, label) => { + let path = format!("{}/channel/{}/label", prefix, id); + Some(OscMessage::new(path, vec![OscType::String(label)])) + } + _ => None, + } + } + + pub fn from_osc_message(prefix: String, message: OscMessage) -> Option { + None + } +} + +pub struct State {} + +pub struct Bus { + id: BusId, + channels: Vec, +} + +pub struct Channel { + id: ChannelId, + input: InputId, + bus: BusId, + label: String, + active: bool, + peak: f32, + volume: f32, +} + +pub struct Mixer { + buses: Vec, + channels: Vec, +} + +impl Mixer { + pub fn emit(&mut self, event: Event) {} + pub fn save_state(&mut self) {} + pub fn command(&mut self, command: Command) { + match command { + Command::ChannelLabel(id, label) => { + if let Some(mut channel) = self.channels.get_mut(id as usize) { + channel.label = label.clone(); + self.emit(Event::ChannelLabel(id, label)); + self.save_state(); + } + } + _ => {} + } + } +} + +pub struct Output {} diff --git a/src/udp.rs b/src/udp.rs deleted file mode 100644 index bc71e30..0000000 --- a/src/udp.rs +++ /dev/null @@ -1,76 +0,0 @@ -use async_std::net::{ToSocketAddrs, UdpSocket}; -use async_std::stream::Stream; -use futures_lite::future::Future; -use futures_lite::ready; -use std::io::Result; -use std::net::SocketAddr; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -pub struct UdpStream { - socket: Arc, - fut: Option, usize, SocketAddr)>> + Send + Sync>>>, - buf: Option>, -} - -impl UdpStream { - pub fn new(socket: Arc) -> Self { - let buf = vec![0u8; 1024 * 64]; - Self { - socket, - fut: None, - buf: Some(buf), - } - } - - pub fn get_ref(&self) -> &UdpSocket { - &self.socket - } - - pub fn clone_socket(&self) -> Arc { - self.socket.clone() - } - - pub async fn send_to(&self, buf: &[u8], addrs: A) -> Result { - self.socket.send_to(buf, addrs).await - } -} - -impl Stream for UdpStream { - type Item = Result<(Vec, SocketAddr)>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - if self.fut.is_none() { - let buf = self.buf.take().unwrap(); - let fut = recv_next(self.socket.clone(), buf); - self.fut = Some(Box::pin(fut)); - } - - if let Some(f) = &mut self.fut { - let res = ready!(f.as_mut().poll(cx)); - self.fut = None; - return match res { - Err(e) => Poll::Ready(Some(Err(e))), - Ok((buf, n, addr)) => { - let res_buf = buf[..n].to_vec(); - self.buf = Some(buf); - Poll::Ready(Some(Ok((res_buf, addr)))) - } - }; - } - } - } -} - -async fn recv_next( - socket: Arc, - mut buf: Vec, -) -> Result<(Vec, usize, SocketAddr)> { - // let mut buf = vec![0u8; 1024]; - let res = socket.recv_from(&mut buf).await; - match res { - Err(e) => Err(e), - Ok((n, addr)) => Ok((buf, n, addr)), - } -} diff --git a/src/ws.rs b/src/ws.rs new file mode 100644 index 0000000..6a1db5f --- /dev/null +++ b/src/ws.rs @@ -0,0 +1,124 @@ +use async_std::channel::{self, Receiver, Sender}; +use async_std::stream::{Stream, StreamExt}; +use async_std::sync::{Arc, Mutex}; +use async_std::task::{self, JoinHandle}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tide::prelude::*; +use tide_websockets::{Message, WebSocket, WebSocketConnection}; + +#[derive(Debug, Clone)] +pub enum WsEvent { + Message(Message, SocketAddr), + Connect(SocketAddr), + Disconnect(SocketAddr), +} + +pub struct WsHandle { + task: JoinHandle>, + outgoing_tx: Sender<(Message, SocketAddr)>, + incoming_rx: Option>, +} + +impl WsHandle { + pub async fn send_to(&self, message: Message, peer_addr: SocketAddr) { + self.outgoing_tx.send((message, peer_addr)).await.unwrap() + } + pub fn sender(&self) -> Sender<(Message, SocketAddr)> { + self.outgoing_tx.clone() + } + + pub fn take_receiver(&mut self) -> Option> { + self.incoming_rx.take() + } +} + +impl Stream for WsHandle { + type Item = WsEvent; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(receiver) = self.incoming_rx.as_mut() { + Pin::new(receiver).poll_next(cx) + } else { + Poll::Ready(None) + } + } +} + +pub fn start() -> WsHandle { + let (outgoing_tx, outgoing_rx) = channel::unbounded(); + let (incoming_tx, incoming_rx) = channel::unbounded(); + let task = run(outgoing_rx, incoming_tx); + WsHandle { + task, + outgoing_tx, + incoming_rx: Some(incoming_rx), + } +} + +pub fn run( + mut outgoing_rx: Receiver<(Message, SocketAddr)>, + incoming_tx: Sender, +) -> JoinHandle> { + let peers: HashMap = HashMap::new(); + let peers = Arc::new(Mutex::new(peers)); + + let mut app = tide::new(); + + let send_task = task::spawn({ + let peers = peers.clone(); + async move { + while let Some((message, peer_addr)) = outgoing_rx.next().await { + let peers = peers.lock().await; + if let Some(peer) = peers.get(&peer_addr) { + match message { + Message::Text(message) => peer.send_string(message).await?, + Message::Binary(message) => peer.send_bytes(message).await?, + _ => {} + } + } + } + Ok::<(), tide_websockets::Error>(()) + } + }); + + app.at("/") + .serve_dir("/home/bit/Code/studiox/studiox/frontend/build") + .unwrap(); + + app.at("/ws") + .get(WebSocket::new(move |request, mut stream| { + let incoming_tx = incoming_tx.clone(); + let peers = peers.clone(); + let peer_addr: SocketAddr = request.peer_addr().unwrap().parse().unwrap(); + async move { + incoming_tx + .send(WsEvent::Connect(peer_addr.clone())) + .await + .unwrap(); + peers.lock().await.insert(peer_addr, stream.clone()); + while let Some(Ok(message)) = stream.next().await { + incoming_tx + .send(WsEvent::Message(message, peer_addr.clone())) + .await?; + } + peers.lock().await.remove(&peer_addr); + incoming_tx + .send(WsEvent::Disconnect(peer_addr)) + .await + .unwrap(); + Ok(()) + } + })); + + let main_task: JoinHandle> = task::spawn(async move { + let addr = "127.0.0.1:8080"; + app.listen(addr).await?; + log::info!("ws listening on {}", addr); + send_task.await?; + Ok(()) + }); + + main_task +}