From 091e6319cd1d9f2899300e3a98b6a2dc5dc12ef9 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 19 Feb 2021 14:40:58 +0100 Subject: [PATCH] initial commit --- .gitignore | 1 + Cargo.toml | 18 ++++ src/lib.rs | 8 ++ src/main.rs | 158 +++++++++++++++++++++++++++ src/mixer.rs | 9 ++ src/osc.rs | 92 ++++++++++++++++ src/spawn.rs | 56 ++++++++++ src/switcher.rs | 280 ++++++++++++++++++++++++++++++++++++++++++++++++ src/udp.rs | 78 ++++++++++++++ 9 files changed, 700 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/lib.rs create mode 100644 src/main.rs create mode 100644 src/mixer.rs create mode 100644 src/osc.rs create mode 100644 src/spawn.rs create mode 100644 src/switcher.rs create mode 100644 src/udp.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..ad5bac2 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "studiox" +version = "0.1.0" +authors = ["Franz Heinzmann (Frando) "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +thiserror = "1.0.23" +anyhow = "1.0.38" +async-std = { version = "1.9.0", features = ["unstable", "attributes"] } +async-channel = "1.6.1" +rosc = "0.4.2" +futures-lite = "1.11.3" +log = "0.4.14" +env_logger = "0.8.3" +regex = "1.4.3" diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..df773f5 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,8 @@ +pub mod osc; +pub mod spawn; +pub mod switcher; +pub mod udp; + +pub trait Actor { + fn send_command(command: C); +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..5ec354e --- /dev/null +++ b/src/main.rs @@ -0,0 +1,158 @@ + +use async_std::stream::{StreamExt}; + + + +use log::*; +use rosc::{OscMessage, OscPacket, OscType}; + + + + + + + +use studiox::switcher::{SwitcherError, SwitcherEvent, SwitcherHandle}; +// use studiox::udp::UdpStream; +use studiox::osc::OscStream; +use studiox::spawn::FaustHandle; + +pub enum Event { + FaustRx(Result), + Switcher(OscMessage), + CommandRx(Result), +} + +#[async_std::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // let child_task: JoinHandle> = task::spawn(async { + // let handle = FaustHandle::spawn().await; + // match handle { + // Ok(handle) => { + // info!("FAUST mixer launched: {:?}", handle) + // } + // Err(e) => error!("Launching FAUST mixer failed: {:?}", e), + // } + // eprintln!("spawn done"); + // Ok(()) + // // eprintln!("HANDLE: {:?}", handle); + // }); + + let handle = FaustHandle::spawn().await; + match handle { + Ok(handle) => { + info!("FAUST mixer launched: {:?}", handle) + } + Err(e) => error!("Launching FAUST mixer failed: {:?}", e), + } + + let port_faust_tx = 5510; + let port_faust_rx = 5511; + let port_commands = 5590; + let addr_faust_tx = format!("localhost:{}", port_faust_tx); + let addr_faust_rx = format!("localhost:{}", port_faust_rx); + let addr_commands = format!("localhost:{}", port_commands); + + let mut switcher = SwitcherHandle::run(); + let switcher_rx = switcher.take_receiver().unwrap().map(Event::Switcher); + + let faust_rx = OscStream::bind(addr_faust_rx).await?.map(Event::FaustRx); + let command_osc_rx = OscStream::bind(addr_commands).await?; + let command_osc_tx = command_osc_rx.sender(); + let command_osc_rx = command_osc_rx.map(Event::CommandRx); + + let mut events = faust_rx.merge(switcher_rx).merge(command_osc_rx); + + info!("start main loop"); + while let Some(event) = events.next().await { + match event { + Event::Switcher(message) => { + debug!("switcher tx: {:?}", message); + let packet = OscPacket::Message(message); + command_osc_tx.send_to(packet, &addr_faust_tx).await?; + } + Event::FaustRx(packet) => { + let packet = packet?; + debug!("faust rx: {:?}", packet); + } + Event::CommandRx(packet) => { + let packet = packet?; + debug!("command rx: {:?}", packet); + let event = parse_switcher_command(packet); + if let Some(event) = event { + switcher.send(event).await?; + } + } + } + } + + switcher.join().await?; + // child_task.await?; + Ok(()) +} + +fn parse_switcher_command(packet: OscPacket) -> Option { + let event = match packet { + OscPacket::Message(message) => match (&message.addr as &str, &message.args[..]) { + ("/switcher", [OscType::Int(channel_id), OscType::Int(on_value)]) => match on_value { + 1 => Some(SwitcherEvent::Enable(*channel_id as usize)), + 0 => Some(SwitcherEvent::Disable(*channel_id as usize)), + _ => { + error!( + "Invalid value for {}: {} (valid is 0 and 1)", + message.addr, on_value + ); + None + } + }, + _ => { + error!( + "Invalid address or arguments: {} {:?}", + message.addr, message.args + ); + None + } + }, + _ => { + error!("Bundles are not supported"); + None + } + }; + event +} + +// match msg { +// Some(Incoming::Osc(packet, addr)) => match rosc::decoder::decode(&packet) { +// Ok(packet) => { +// trace!("RECV OSC from {}: {:?}", &addr, &packet); +// if let OscPacket::Message(message) = &packet { +// if message.addr.starts_with("/player") { +// async fn osc_stream( +// addr: &str, +// ) -> Result< +// Map, SocketAddr)) -> Result + 'static>, +// SwitcherError, +// > +// // where +// // S: Stream>, +// { +// // Result> { +// let socket = UdpSocket::bind(addr).await?; +// let socket = Arc::new(socket); + +// let stream = UdpStream::new(socket); +// let stream = stream.map(|packet| { +// packet +// .map_err(|e| SwitcherError::from(e)) +// .map(|buf| rosc::decoder::decode(&buf.0[..])) +// .map_err(|e| SwitcherError::from(e)) +// }); +// Ok(stream) +// // Ok(stream) +// } + +// let addr = message.addr; +// let args = message.args; +// let parts: Vec<&str> = message.addr.split("/").skip(1).collect(); diff --git a/src/mixer.rs b/src/mixer.rs new file mode 100644 index 0000000..8b19384 --- /dev/null +++ b/src/mixer.rs @@ -0,0 +1,9 @@ +pub type ChannelId = usize; +pub type BusId = usize; +pub type OutputId = usize; + +pub enum MixerCommand { + Volume(ChannelId, f32), +} + +pub fn on_command() {} diff --git a/src/osc.rs b/src/osc.rs new file mode 100644 index 0000000..8dfc6b7 --- /dev/null +++ b/src/osc.rs @@ -0,0 +1,92 @@ +use async_std::net::{ToSocketAddrs, UdpSocket}; +use async_std::stream::{Map, Stream, StreamExt}; + + +use futures_lite::ready; + +use rosc::{OscError, OscMessage, OscPacket, OscType}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use std::{io}; + +use crate::switcher::SwitcherError; +use crate::udp::UdpStream; + +pub struct OscStream { + stream: UdpStream, +} + +impl OscStream { + pub fn new(socket: UdpSocket) -> Self { + let socket = Arc::new(socket); + let stream = UdpStream::new(socket); + + Self { stream } + } + + pub async fn bind(addr: A) -> Result { + let socket = UdpSocket::bind(addr).await?; + Ok(Self::new(socket)) + } + + pub async fn send_to( + &self, + packet: OscPacket, + addrs: A, + ) -> Result<(), SwitcherError> { + send_osc(&self.stream.get_ref(), packet, addrs).await + } + + pub fn sender(&self) -> OscSender { + OscSender::new(self.stream.clone_socket()) + } +} +impl Stream for OscStream { + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let packet = ready!(Pin::new(&mut self.stream).poll_next(cx)); + let message = match packet { + None => None, + Some(packet) => Some(match packet { + Err(err) => Err(err.into()), + Ok(buf) => rosc::decoder::decode(&buf.0[..]).map_err(|e| e.into()), + }), + }; + Poll::Ready(message) + } +} + +#[derive(Clone)] +pub struct OscSender { + socket: Arc, +} + +impl OscSender { + pub fn new(socket: Arc) -> Self { + Self { socket } + } + + pub async fn send_to( + &self, + packet: OscPacket, + addrs: A, + ) -> Result<(), SwitcherError> { + send_osc(&self.socket, packet, addrs).await + } +} + +async fn send_osc( + socket: &UdpSocket, + packet: OscPacket, + addrs: A, +) -> Result<(), SwitcherError> { + let buf = rosc::encoder::encode(&packet)?; + let n = socket.send_to(&buf[..], addrs).await?; + if n != buf.len() { + Err(io::Error::new(io::ErrorKind::Interrupted, "udp packet was not fully sent").into()) + } else { + Ok(()) + } +} diff --git a/src/spawn.rs b/src/spawn.rs new file mode 100644 index 0000000..703517b --- /dev/null +++ b/src/spawn.rs @@ -0,0 +1,56 @@ +use async_std::process::{Child, ChildStdout, Command, Stdio}; +use futures_lite::io::{BufReader, Lines}; +use futures_lite::prelude::*; +use log::*; +use regex::Regex; + +use std::io; + +#[derive(Debug)] +pub struct FaustHandle { + port_tx: u16, + port_rx: u16, + port_error: u16, + child: Child, +} + +impl FaustHandle { + pub async fn spawn() -> Result { + let exec = "/home/bit/Code/RDL/studiox/studiox-next/mixer-jackconsole"; + let mut child = Command::new(exec) + .arg(".") + .stdout(Stdio::piped()) + // .stderr(Stdio::piped()) + .spawn()?; + + let mut lines = BufReader::new(child.stdout.take().unwrap()).lines(); + + let (port_tx, port_rx, port_error) = wait_for_ports(&mut lines).await?; + + Ok(Self { + port_tx, + port_rx, + port_error, + child, + }) + } +} + +async fn wait_for_ports(lines: &mut Lines>) -> io::Result<(u16, u16, u16)> { + let re = Regex::new(r"is running on UDP ports (\d{4}), (\d{4}), (\d{4}),").unwrap(); + while let Some(line) = lines.next().await { + let line = line?; + debug!("{}", line); + let matches = re.captures(&line); + if let Some(captures) = matches { + let port_tx: u16 = captures.get(1).unwrap().as_str().parse().unwrap(); + let port_rx: u16 = captures.get(2).unwrap().as_str().parse().unwrap(); + let port_error: u16 = captures.get(3).unwrap().as_str().parse().unwrap(); + return Ok((port_tx, port_rx, port_error)); + } + } + Err(io::Error::new( + io::ErrorKind::Other, + "OSC ports not found in output", + )) +} diff --git a/src/switcher.rs b/src/switcher.rs new file mode 100644 index 0000000..c7a598e --- /dev/null +++ b/src/switcher.rs @@ -0,0 +1,280 @@ +use async_channel::{self, Receiver, Sender}; +use async_std::prelude::*; +use async_std::task::{self, JoinHandle}; +use rosc::{OscMessage, OscType}; +use std::collections::{VecDeque}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum SwitcherError { + #[error("Channel for switcher events full")] + CannotSendEvent(#[from] async_channel::SendError), + #[error("Channel for OSC messages full")] + CannotSendMessage(#[from] async_channel::SendError), + #[error("Decode OSC packet failed")] + Osc(rosc::OscError), + + #[error("IO error")] + Io(#[from] std::io::Error), +} + +impl From for SwitcherError { + fn from(error: rosc::OscError) -> Self { + Self::Osc(error) + } +} + +pub type Result = std::result::Result; + +pub type ChannelId = usize; + +#[derive(Debug)] +pub enum SwitcherEvent { + Enable(ChannelId), + Disable(ChannelId), +} + +#[derive(Default)] +pub struct SwitcherState { + // osc_tx: Sender, + channels: Vec, + active_channel: usize, + messages: Messages, +} + +#[derive(Default)] +pub struct ChannelState { + id: ChannelId, + pub name: String, + pub locked: bool, + pub enabled: bool, +} + +impl ChannelState { + pub fn new(id: ChannelId) -> Self { + Self { + id, + ..Default::default() + } + } +} + +pub struct SwitcherHandle { + events_tx: Sender, + osc_rx: Option>, + task: JoinHandle>, +} + +impl SwitcherHandle { + pub fn run() -> Self { + let (osc_tx, osc_rx) = async_channel::unbounded(); + let (events_tx, events_rx) = async_channel::unbounded(); + let state = SwitcherState::new(); + let task = task::spawn(run_loop(state, events_rx, osc_tx)); + SwitcherHandle { + events_tx, + osc_rx: Some(osc_rx), + task, + } + } + pub async fn join(self) -> Result<()> { + self.task.await + } + + pub async fn send(&self, event: SwitcherEvent) -> Result<()> { + log::debug!("switcher got event: {:?}", event); + self.events_tx.send(event).await?; + Ok(()) + } + + pub fn sender(&self) -> Sender { + self.events_tx.clone() + } + + pub fn take_receiver(&mut self) -> Option> { + self.osc_rx.take() + } +} + +pub async fn run_loop( + mut state: SwitcherState, + mut events_rx: Receiver, + osc_tx: Sender, +) -> Result<()> { + state.init(); + loop { + while let Some(message) = state.pop_message() { + osc_tx.send(message).await?; + } + if let Some(event) = events_rx.next().await { + state.on_event(event); + } + } +} + +impl SwitcherState { + pub fn new() -> Self { + Default::default() + } + + pub fn init(&mut self) { + let num_channels = 4; + for i in 0..num_channels { + let channel = ChannelState::new(i); + self.channels.push(channel); + } + // self.messages.push("/*", ("xmit", 1)); + self.messages.push("/mixer/out/0/bus/0/on", (1.0,)); + self.messages.push("/mixer/bus/0/ch/0/on", (1.0,)); + self.update_state(); + } + + pub fn on_event(&mut self, event: SwitcherEvent) { + match event { + SwitcherEvent::Enable(channel) => self.enable_channel(channel), + SwitcherEvent::Disable(channel) => self.disable_channel(channel), + } + self.update_state(); + } + pub fn enable_channel(&mut self, channel_id: ChannelId) { + if let Some(channel) = self.channels.iter_mut().find(|c| c.id == channel_id) { + if !channel.locked { + channel.enabled = true; + } + } + } + + pub fn disable_channel(&mut self, channel_id: ChannelId) { + if let Some(channel) = self.channels.iter_mut().find(|c| c.id == channel_id) { + if !channel.locked { + channel.enabled = false; + } + } + } + + fn update_state(&mut self) { + let next_active_channel = self + .channels + .iter() + .rev() + .find(|c| c.enabled) + .or(self.channels.get(0)) + .unwrap() + .id; + if next_active_channel != self.active_channel { + self.active_channel = next_active_channel; + self.on_channel_change(); + } + } + + fn on_channel_change(&mut self) { + for channel in self.channels.iter() { + if channel.id == self.active_channel { + self.messages + .push(format!("/mixer/bus/0/ch/{}/on", channel.id), (1.0,)); + } else { + self.messages + .push(format!("/mixer/bus/0/ch/{}/on", channel.id), (0.0,)); + } + } + } + + fn pop_message(&mut self) -> Option { + self.messages.pop() + } +} + +pub struct Messages { + inner: VecDeque, +} + +impl Default for Messages { + fn default() -> Self { + Self::new() + } +} + +impl Messages { + pub fn new() -> Self { + Self { + inner: VecDeque::new(), + } + } + + fn push(&mut self, addr: impl ToString, args: T) + where + T: IntoOscArgs, + { + self.inner.push_back(osc_message(addr, args)); + } + + fn pop(&mut self) -> Option { + self.inner.pop_front() + } +} + +fn osc_message(addr: impl ToString, args: T) -> OscMessage +where + T: IntoOscArgs, +{ + // let args: Vec = args.into_iter().map(|a| a.into()).collect(); + let args = args.into_osc_args(); + let addr = addr.to_string(); + OscMessage { addr, args } +} + +pub trait IntoOscArgs { + fn into_osc_args(self) -> Vec; +} + +impl IntoOscArgs for Vec +where + T: Into, +{ + fn into_osc_args(self) -> Vec { + let args: Vec = self.into_iter().map(|a| a.into()).collect(); + args + } +} + +impl IntoOscArgs for (T1,) +where + T1: Into, +{ + fn into_osc_args(self) -> Vec { + vec![self.0.into()] + } +} + +impl IntoOscArgs for (T1, T2) +where + T1: Into, + T2: Into, +{ + fn into_osc_args(self) -> Vec { + vec![self.0.into(), self.1.into()] + } +} + +impl IntoOscArgs for (T1, T2, T3) +where + T1: Into, + T2: Into, + T3: Into, +{ + fn into_osc_args(self) -> Vec { + vec![self.0.into(), self.1.into(), self.2.into()] + } +} + +impl IntoOscArgs for OscType { + fn into_osc_args(self) -> Vec { + vec![self] + } +} + +// impl IntoOscArgs for Vec { +// fn into_osc_args(self) -> Vec { +// self +// } +// } diff --git a/src/udp.rs b/src/udp.rs new file mode 100644 index 0000000..ffb2e37 --- /dev/null +++ b/src/udp.rs @@ -0,0 +1,78 @@ +use async_std::net::{ToSocketAddrs, UdpSocket}; +use async_std::stream::{Stream, StreamExt}; + +use futures_lite::future::Future; +use futures_lite::ready; +use std::io::{Result}; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + + +pub struct UdpStream { + socket: Arc, + fut: Option, usize, SocketAddr)>> + Send + Sync>>>, + buf: Option>, +} + +impl UdpStream { + pub fn new(socket: Arc) -> Self { + let buf = vec![0u8; 1024 * 64]; + Self { + socket, + fut: None, + buf: Some(buf), + } + } + + pub fn get_ref(&self) -> &UdpSocket { + &self.socket + } + + pub fn clone_socket(&self) -> Arc { + self.socket.clone() + } + + pub async fn send_to(&self, buf: &[u8], addrs: A) -> Result { + self.socket.send_to(buf, addrs).await + } +} + +impl Stream for UdpStream { + type Item = Result<(Vec, SocketAddr)>; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if self.fut.is_none() { + let buf = self.buf.take().unwrap(); + let fut = recv_next(self.socket.clone(), buf); + self.fut = Some(Box::pin(fut)); + } + + if let Some(f) = &mut self.fut { + let res = ready!(f.as_mut().poll(cx)); + self.fut = None; + return match res { + Err(e) => Poll::Ready(Some(Err(e))), + Ok((buf, n, addr)) => { + let res_buf = buf[..n].to_vec(); + self.buf = Some(buf); + Poll::Ready(Some(Ok((res_buf, addr)))) + } + }; + } + } + } +} + +async fn recv_next( + socket: Arc, + mut buf: Vec, +) -> Result<(Vec, usize, SocketAddr)> { + // let mut buf = vec![0u8; 1024]; + let res = socket.recv_from(&mut buf).await; + match res { + Err(e) => Err(e), + Ok((n, addr)) => Ok((buf, n, addr)), + } +}