improvements and cleanup

This commit is contained in:
Franz Heinzmann (Frando) 2021-02-19 18:51:09 +01:00
parent 091e6319cd
commit 7558fd3fe9
6 changed files with 107 additions and 127 deletions

View file

@ -2,7 +2,3 @@ pub mod osc;
pub mod spawn; pub mod spawn;
pub mod switcher; pub mod switcher;
pub mod udp; pub mod udp;
pub trait Actor<C, E> {
fn send_command(command: C);
}

View file

@ -1,21 +1,10 @@
use async_std::stream::StreamExt;
use async_std::stream::{StreamExt};
use log::*; use log::*;
use rosc::{OscMessage, OscPacket, OscType}; use rosc::{OscMessage, OscPacket, OscType};
use studiox::switcher::{SwitcherError, SwitcherEvent, SwitcherHandle};
// use studiox::udp::UdpStream;
use studiox::osc::OscStream; use studiox::osc::OscStream;
use studiox::spawn::FaustHandle; use studiox::spawn::FaustHandle;
use studiox::switcher::{SwitcherError, SwitcherEvent, SwitcherHandle};
pub enum Event { pub enum Event {
FaustRx(Result<OscPacket, SwitcherError>), FaustRx(Result<OscPacket, SwitcherError>),
@ -25,34 +14,32 @@ pub enum Event {
#[async_std::main] #[async_std::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
env_logger::init(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
// let child_task: JoinHandle<io::Result<()>> = task::spawn(async { let faust_handle = FaustHandle::spawn().await;
// let handle = FaustHandle::spawn().await; match faust_handle.as_ref() {
// match handle {
// Ok(handle) => {
// info!("FAUST mixer launched: {:?}", handle)
// }
// Err(e) => error!("Launching FAUST mixer failed: {:?}", e),
// }
// eprintln!("spawn done");
// Ok(())
// // eprintln!("HANDLE: {:?}", handle);
// });
let handle = FaustHandle::spawn().await;
match handle {
Ok(handle) => { Ok(handle) => {
info!("FAUST mixer launched: {:?}", handle) info!("FAUST mixer launched: {:?}", handle)
} }
Err(e) => error!("Launching FAUST mixer failed: {:?}", e), Err(e) => {
error!("Launching FAUST mixer failed: {:?}", e);
} }
}
let port_faust_tx = 5510; let mut faust_handle = faust_handle?;
let port_faust_rx = 5511; let res = main_loop(&faust_handle).await;
let port_commands = 5590; match res.as_ref() {
let addr_faust_tx = format!("localhost:{}", port_faust_tx); Ok(_) => {}
let addr_faust_rx = format!("localhost:{}", port_faust_rx); Err(e) => error!("Error: {:?}", e),
}
faust_handle.child.kill()?;
res
}
async fn main_loop(faust_handle: &FaustHandle) -> anyhow::Result<()> {
// let port_faust_tx = 5510;
// let port_faust_rx = 5511;
let port_commands: u16 = std::env::var("PORT").unwrap_or("5590".into()).parse()?;
let addr_faust_tx = format!("localhost:{}", faust_handle.port_tx);
let addr_faust_rx = format!("localhost:{}", faust_handle.port_rx);
let addr_commands = format!("localhost:{}", port_commands); let addr_commands = format!("localhost:{}", port_commands);
let mut switcher = SwitcherHandle::run(); let mut switcher = SwitcherHandle::run();
@ -65,7 +52,7 @@ async fn main() -> anyhow::Result<()> {
let mut events = faust_rx.merge(switcher_rx).merge(command_osc_rx); let mut events = faust_rx.merge(switcher_rx).merge(command_osc_rx);
info!("start main loop"); info!("Listening for OSC commands on localhost:{}", port_commands);
while let Some(event) = events.next().await { while let Some(event) = events.next().await {
match event { match event {
Event::Switcher(message) => { Event::Switcher(message) => {
@ -74,12 +61,12 @@ async fn main() -> anyhow::Result<()> {
command_osc_tx.send_to(packet, &addr_faust_tx).await?; command_osc_tx.send_to(packet, &addr_faust_tx).await?;
} }
Event::FaustRx(packet) => { Event::FaustRx(packet) => {
let packet = packet?;
debug!("faust rx: {:?}", packet); debug!("faust rx: {:?}", packet);
let _packet = packet?;
} }
Event::CommandRx(packet) => { Event::CommandRx(packet) => {
let packet = packet?;
debug!("command rx: {:?}", packet); debug!("command rx: {:?}", packet);
let packet = packet?;
let event = parse_switcher_command(packet); let event = parse_switcher_command(packet);
if let Some(event) = event { if let Some(event) = event {
switcher.send(event).await?; switcher.send(event).await?;

View file

@ -1,16 +1,12 @@
use async_std::net::{ToSocketAddrs, UdpSocket}; use async_std::net::{ToSocketAddrs, UdpSocket};
use async_std::stream::{Map, Stream, StreamExt}; use async_std::stream::Stream;
use futures_lite::ready; use futures_lite::ready;
use rosc::{OscMessage, OscPacket, OscType};
use rosc::{OscError, OscMessage, OscPacket, OscType}; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{io};
use crate::switcher::SwitcherError; use crate::switcher::SwitcherError;
use crate::udp::UdpStream; use crate::udp::UdpStream;
@ -90,3 +86,70 @@ async fn send_osc<A: ToSocketAddrs>(
Ok(()) Ok(())
} }
} }
pub trait OscMessageExt {
fn new<T>(addr: impl ToString, args: T) -> OscMessage
where
T: IntoOscArgs;
}
impl OscMessageExt for OscMessage {
fn new<T>(addr: impl ToString, args: T) -> OscMessage
where
T: IntoOscArgs,
{
let args = args.into_osc_args();
let addr = addr.to_string();
OscMessage { addr, args }
}
}
pub trait IntoOscArgs {
fn into_osc_args(self) -> Vec<OscType>;
}
impl<T> IntoOscArgs for Vec<T>
where
T: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
let args: Vec<OscType> = self.into_iter().map(|a| a.into()).collect();
args
}
}
impl<T1> IntoOscArgs for (T1,)
where
T1: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into()]
}
}
impl<T1, T2> IntoOscArgs for (T1, T2)
where
T1: Into<OscType>,
T2: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into(), self.1.into()]
}
}
impl<T1, T2, T3> IntoOscArgs for (T1, T2, T3)
where
T1: Into<OscType>,
T2: Into<OscType>,
T3: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into(), self.1.into(), self.2.into()]
}
}
impl IntoOscArgs for OscType {
fn into_osc_args(self) -> Vec<OscType> {
vec![self]
}
}

View file

@ -8,10 +8,10 @@ use std::io;
#[derive(Debug)] #[derive(Debug)]
pub struct FaustHandle { pub struct FaustHandle {
port_tx: u16, pub port_tx: u16,
port_rx: u16, pub port_rx: u16,
port_error: u16, pub port_error: u16,
child: Child, pub child: Child,
} }
impl FaustHandle { impl FaustHandle {

View file

@ -1,10 +1,12 @@
use async_channel::{self, Receiver, Sender}; use async_channel::{self, Receiver, Sender};
use async_std::prelude::*; use async_std::prelude::*;
use async_std::task::{self, JoinHandle}; use async_std::task::{self, JoinHandle};
use rosc::{OscMessage, OscType}; use rosc::OscMessage;
use std::collections::{VecDeque}; use std::collections::VecDeque;
use thiserror::Error; use thiserror::Error;
use crate::osc::{IntoOscArgs, OscMessageExt};
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum SwitcherError { pub enum SwitcherError {
#[error("Channel for switcher events full")] #[error("Channel for switcher events full")]
@ -205,76 +207,10 @@ impl Messages {
where where
T: IntoOscArgs, T: IntoOscArgs,
{ {
self.inner.push_back(osc_message(addr, args)); self.inner.push_back(OscMessage::new(addr, args));
} }
fn pop(&mut self) -> Option<OscMessage> { fn pop(&mut self) -> Option<OscMessage> {
self.inner.pop_front() self.inner.pop_front()
} }
} }
fn osc_message<T>(addr: impl ToString, args: T) -> OscMessage
where
T: IntoOscArgs,
{
// let args: Vec<OscType> = args.into_iter().map(|a| a.into()).collect();
let args = args.into_osc_args();
let addr = addr.to_string();
OscMessage { addr, args }
}
pub trait IntoOscArgs {
fn into_osc_args(self) -> Vec<OscType>;
}
impl<T> IntoOscArgs for Vec<T>
where
T: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
let args: Vec<OscType> = self.into_iter().map(|a| a.into()).collect();
args
}
}
impl<T1> IntoOscArgs for (T1,)
where
T1: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into()]
}
}
impl<T1, T2> IntoOscArgs for (T1, T2)
where
T1: Into<OscType>,
T2: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into(), self.1.into()]
}
}
impl<T1, T2, T3> IntoOscArgs for (T1, T2, T3)
where
T1: Into<OscType>,
T2: Into<OscType>,
T3: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into(), self.1.into(), self.2.into()]
}
}
impl IntoOscArgs for OscType {
fn into_osc_args(self) -> Vec<OscType> {
vec![self]
}
}
// impl IntoOscArgs for Vec<OscType> {
// fn into_osc_args(self) -> Vec<OscType> {
// self
// }
// }

View file

@ -1,15 +1,13 @@
use async_std::net::{ToSocketAddrs, UdpSocket}; use async_std::net::{ToSocketAddrs, UdpSocket};
use async_std::stream::{Stream, StreamExt}; use async_std::stream::Stream;
use futures_lite::future::Future; use futures_lite::future::Future;
use futures_lite::ready; use futures_lite::ready;
use std::io::{Result}; use std::io::Result;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub struct UdpStream { pub struct UdpStream {
socket: Arc<UdpSocket>, socket: Arc<UdpSocket>,
fut: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize, SocketAddr)>> + Send + Sync>>>, fut: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize, SocketAddr)>> + Send + Sync>>>,