use async_std::stream::StreamExt; use log::*; use rosc::{OscMessage, OscPacket, OscType}; use studiox::osc::OscUdpStream; use studiox::spawn::FaustHandle; use studiox::switcher::{SwitcherError, SwitcherEvent, SwitcherHandle}; pub enum Event { FaustRx(Result), Switcher(OscMessage), CommandRx(Result), } #[async_std::main] async fn main() -> anyhow::Result<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let faust_handle = FaustHandle::spawn().await; match faust_handle.as_ref() { Ok(handle) => { info!("FAUST mixer launched: {:?}", handle) } Err(e) => { error!("Launching FAUST mixer failed: {:?}", e); } } let mut faust_handle = faust_handle?; let res = main_loop(&faust_handle).await; match res.as_ref() { Ok(_) => {} Err(e) => error!("Error: {:?}", e), } faust_handle.child.kill()?; res } async fn main_loop(faust_handle: &FaustHandle) -> anyhow::Result<()> { // let port_faust_tx = 5510; // let port_faust_rx = 5511; let port_commands: u16 = std::env::var("PORT").unwrap_or("5590".into()).parse()?; let addr_faust_tx = format!("localhost:{}", faust_handle.port_tx); let addr_faust_rx = format!("localhost:{}", faust_handle.port_rx); let addr_commands = format!("localhost:{}", port_commands); let mut switcher = SwitcherHandle::run(); let switcher_rx = switcher.take_receiver().unwrap().map(Event::Switcher); let faust_rx = OscUdpStream::bind(addr_faust_rx).await?.map(Event::FaustRx); let command_osc_rx = OscUdpStream::bind(addr_commands).await?; let command_osc_tx = command_osc_rx.sender(); let command_osc_rx = command_osc_rx.map(Event::CommandRx); let mut events = faust_rx.merge(switcher_rx).merge(command_osc_rx); info!("Listening for OSC commands on localhost:{}", port_commands); while let Some(event) = events.next().await { match event { Event::Switcher(message) => { debug!("switcher tx: {:?}", message); let packet = OscPacket::Message(message); command_osc_tx.send_to(packet, &addr_faust_tx).await?; } Event::FaustRx(packet) => { debug!("faust rx: {:?}", packet); let _packet = packet?; } Event::CommandRx(packet) => { debug!("command rx: {:?}", packet); let packet = packet?; let event = parse_switcher_command(packet); if let Some(event) = event { switcher.send(event).await?; } } } } switcher.join().await?; // child_task.await?; Ok(()) } fn parse_switcher_command(packet: OscPacket) -> Option { let event = match packet { OscPacket::Message(message) => match (&message.addr as &str, &message.args[..]) { ("/switcher", [OscType::Int(channel_id), OscType::Int(on_value)]) => match on_value { 1 => Some(SwitcherEvent::Enable(*channel_id as usize)), 0 => Some(SwitcherEvent::Disable(*channel_id as usize)), _ => { error!( "Invalid value for {}: {} (valid is 0 and 1)", message.addr, on_value ); None } }, _ => { error!( "Invalid address or arguments: {} {:?}", message.addr, message.args ); None } }, _ => { error!("Bundles are not supported"); None } }; event } // match msg { // Some(Incoming::Osc(packet, addr)) => match rosc::decoder::decode(&packet) { // Ok(packet) => { // trace!("RECV OSC from {}: {:?}", &addr, &packet); // if let OscPacket::Message(message) = &packet { // if message.addr.starts_with("/player") { // async fn osc_stream( // addr: &str, // ) -> Result< // Map, SocketAddr)) -> Result + 'static>, // SwitcherError, // > // // where // // S: Stream>, // { // // Result> { // let socket = UdpSocket::bind(addr).await?; // let socket = Arc::new(socket); // let stream = UdpStream::new(socket); // let stream = stream.map(|packet| { // packet // .map_err(|e| SwitcherError::from(e)) // .map(|buf| rosc::decoder::decode(&buf.0[..])) // .map_err(|e| SwitcherError::from(e)) // }); // Ok(stream) // // Ok(stream) // } // let addr = message.addr; // let args = message.args; // let parts: Vec<&str> = message.addr.split("/").skip(1).collect();