initial commit

This commit is contained in:
Franz Heinzmann (Frando) 2021-02-19 14:40:58 +01:00
commit 091e6319cd
9 changed files with 700 additions and 0 deletions

8
src/lib.rs Normal file
View file

@ -0,0 +1,8 @@
pub mod osc;
pub mod spawn;
pub mod switcher;
pub mod udp;
pub trait Actor<C, E> {
fn send_command(command: C);
}

158
src/main.rs Normal file
View file

@ -0,0 +1,158 @@
use async_std::stream::{StreamExt};
use log::*;
use rosc::{OscMessage, OscPacket, OscType};
use studiox::switcher::{SwitcherError, SwitcherEvent, SwitcherHandle};
// use studiox::udp::UdpStream;
use studiox::osc::OscStream;
use studiox::spawn::FaustHandle;
pub enum Event {
FaustRx(Result<OscPacket, SwitcherError>),
Switcher(OscMessage),
CommandRx(Result<OscPacket, SwitcherError>),
}
#[async_std::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
// let child_task: JoinHandle<io::Result<()>> = task::spawn(async {
// let handle = FaustHandle::spawn().await;
// match handle {
// Ok(handle) => {
// info!("FAUST mixer launched: {:?}", handle)
// }
// Err(e) => error!("Launching FAUST mixer failed: {:?}", e),
// }
// eprintln!("spawn done");
// Ok(())
// // eprintln!("HANDLE: {:?}", handle);
// });
let handle = FaustHandle::spawn().await;
match handle {
Ok(handle) => {
info!("FAUST mixer launched: {:?}", handle)
}
Err(e) => error!("Launching FAUST mixer failed: {:?}", e),
}
let port_faust_tx = 5510;
let port_faust_rx = 5511;
let port_commands = 5590;
let addr_faust_tx = format!("localhost:{}", port_faust_tx);
let addr_faust_rx = format!("localhost:{}", port_faust_rx);
let addr_commands = format!("localhost:{}", port_commands);
let mut switcher = SwitcherHandle::run();
let switcher_rx = switcher.take_receiver().unwrap().map(Event::Switcher);
let faust_rx = OscStream::bind(addr_faust_rx).await?.map(Event::FaustRx);
let command_osc_rx = OscStream::bind(addr_commands).await?;
let command_osc_tx = command_osc_rx.sender();
let command_osc_rx = command_osc_rx.map(Event::CommandRx);
let mut events = faust_rx.merge(switcher_rx).merge(command_osc_rx);
info!("start main loop");
while let Some(event) = events.next().await {
match event {
Event::Switcher(message) => {
debug!("switcher tx: {:?}", message);
let packet = OscPacket::Message(message);
command_osc_tx.send_to(packet, &addr_faust_tx).await?;
}
Event::FaustRx(packet) => {
let packet = packet?;
debug!("faust rx: {:?}", packet);
}
Event::CommandRx(packet) => {
let packet = packet?;
debug!("command rx: {:?}", packet);
let event = parse_switcher_command(packet);
if let Some(event) = event {
switcher.send(event).await?;
}
}
}
}
switcher.join().await?;
// child_task.await?;
Ok(())
}
fn parse_switcher_command(packet: OscPacket) -> Option<SwitcherEvent> {
let event = match packet {
OscPacket::Message(message) => match (&message.addr as &str, &message.args[..]) {
("/switcher", [OscType::Int(channel_id), OscType::Int(on_value)]) => match on_value {
1 => Some(SwitcherEvent::Enable(*channel_id as usize)),
0 => Some(SwitcherEvent::Disable(*channel_id as usize)),
_ => {
error!(
"Invalid value for {}: {} (valid is 0 and 1)",
message.addr, on_value
);
None
}
},
_ => {
error!(
"Invalid address or arguments: {} {:?}",
message.addr, message.args
);
None
}
},
_ => {
error!("Bundles are not supported");
None
}
};
event
}
// match msg {
// Some(Incoming::Osc(packet, addr)) => match rosc::decoder::decode(&packet) {
// Ok(packet) => {
// trace!("RECV OSC from {}: {:?}", &addr, &packet);
// if let OscPacket::Message(message) = &packet {
// if message.addr.starts_with("/player") {
// async fn osc_stream(
// addr: &str,
// ) -> Result<
// Map<UdpStream, dyn FnMut((Vec<u8>, SocketAddr)) -> Result<OscMessage, SwitcherError> + 'static>,
// SwitcherError,
// >
// // where
// // S: Stream<Item = Result<OscPacket, rosc::OscError>>,
// {
// // Result<impl Stream<Result<OscPacket, OscError>> {
// let socket = UdpSocket::bind(addr).await?;
// let socket = Arc::new(socket);
// let stream = UdpStream::new(socket);
// let stream = stream.map(|packet| {
// packet
// .map_err(|e| SwitcherError::from(e))
// .map(|buf| rosc::decoder::decode(&buf.0[..]))
// .map_err(|e| SwitcherError::from(e))
// });
// Ok(stream)
// // Ok(stream)
// }
// let addr = message.addr;
// let args = message.args;
// let parts: Vec<&str> = message.addr.split("/").skip(1).collect();

9
src/mixer.rs Normal file
View file

@ -0,0 +1,9 @@
pub type ChannelId = usize;
pub type BusId = usize;
pub type OutputId = usize;
pub enum MixerCommand {
Volume(ChannelId, f32),
}
pub fn on_command() {}

92
src/osc.rs Normal file
View file

@ -0,0 +1,92 @@
use async_std::net::{ToSocketAddrs, UdpSocket};
use async_std::stream::{Map, Stream, StreamExt};
use futures_lite::ready;
use rosc::{OscError, OscMessage, OscPacket, OscType};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{io};
use crate::switcher::SwitcherError;
use crate::udp::UdpStream;
pub struct OscStream {
stream: UdpStream,
}
impl OscStream {
pub fn new(socket: UdpSocket) -> Self {
let socket = Arc::new(socket);
let stream = UdpStream::new(socket);
Self { stream }
}
pub async fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self, SwitcherError> {
let socket = UdpSocket::bind(addr).await?;
Ok(Self::new(socket))
}
pub async fn send_to<A: ToSocketAddrs>(
&self,
packet: OscPacket,
addrs: A,
) -> Result<(), SwitcherError> {
send_osc(&self.stream.get_ref(), packet, addrs).await
}
pub fn sender(&self) -> OscSender {
OscSender::new(self.stream.clone_socket())
}
}
impl Stream for OscStream {
type Item = Result<OscPacket, SwitcherError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let packet = ready!(Pin::new(&mut self.stream).poll_next(cx));
let message = match packet {
None => None,
Some(packet) => Some(match packet {
Err(err) => Err(err.into()),
Ok(buf) => rosc::decoder::decode(&buf.0[..]).map_err(|e| e.into()),
}),
};
Poll::Ready(message)
}
}
#[derive(Clone)]
pub struct OscSender {
socket: Arc<UdpSocket>,
}
impl OscSender {
pub fn new(socket: Arc<UdpSocket>) -> Self {
Self { socket }
}
pub async fn send_to<A: ToSocketAddrs>(
&self,
packet: OscPacket,
addrs: A,
) -> Result<(), SwitcherError> {
send_osc(&self.socket, packet, addrs).await
}
}
async fn send_osc<A: ToSocketAddrs>(
socket: &UdpSocket,
packet: OscPacket,
addrs: A,
) -> Result<(), SwitcherError> {
let buf = rosc::encoder::encode(&packet)?;
let n = socket.send_to(&buf[..], addrs).await?;
if n != buf.len() {
Err(io::Error::new(io::ErrorKind::Interrupted, "udp packet was not fully sent").into())
} else {
Ok(())
}
}

56
src/spawn.rs Normal file
View file

@ -0,0 +1,56 @@
use async_std::process::{Child, ChildStdout, Command, Stdio};
use futures_lite::io::{BufReader, Lines};
use futures_lite::prelude::*;
use log::*;
use regex::Regex;
use std::io;
#[derive(Debug)]
pub struct FaustHandle {
port_tx: u16,
port_rx: u16,
port_error: u16,
child: Child,
}
impl FaustHandle {
pub async fn spawn() -> Result<Self, std::io::Error> {
let exec = "/home/bit/Code/RDL/studiox/studiox-next/mixer-jackconsole";
let mut child = Command::new(exec)
.arg(".")
.stdout(Stdio::piped())
// .stderr(Stdio::piped())
.spawn()?;
let mut lines = BufReader::new(child.stdout.take().unwrap()).lines();
let (port_tx, port_rx, port_error) = wait_for_ports(&mut lines).await?;
Ok(Self {
port_tx,
port_rx,
port_error,
child,
})
}
}
async fn wait_for_ports(lines: &mut Lines<BufReader<ChildStdout>>) -> io::Result<(u16, u16, u16)> {
let re = Regex::new(r"is running on UDP ports (\d{4}), (\d{4}), (\d{4}),").unwrap();
while let Some(line) = lines.next().await {
let line = line?;
debug!("{}", line);
let matches = re.captures(&line);
if let Some(captures) = matches {
let port_tx: u16 = captures.get(1).unwrap().as_str().parse().unwrap();
let port_rx: u16 = captures.get(2).unwrap().as_str().parse().unwrap();
let port_error: u16 = captures.get(3).unwrap().as_str().parse().unwrap();
return Ok((port_tx, port_rx, port_error));
}
}
Err(io::Error::new(
io::ErrorKind::Other,
"OSC ports not found in output",
))
}

280
src/switcher.rs Normal file
View file

@ -0,0 +1,280 @@
use async_channel::{self, Receiver, Sender};
use async_std::prelude::*;
use async_std::task::{self, JoinHandle};
use rosc::{OscMessage, OscType};
use std::collections::{VecDeque};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SwitcherError {
#[error("Channel for switcher events full")]
CannotSendEvent(#[from] async_channel::SendError<SwitcherEvent>),
#[error("Channel for OSC messages full")]
CannotSendMessage(#[from] async_channel::SendError<OscMessage>),
#[error("Decode OSC packet failed")]
Osc(rosc::OscError),
#[error("IO error")]
Io(#[from] std::io::Error),
}
impl From<rosc::OscError> for SwitcherError {
fn from(error: rosc::OscError) -> Self {
Self::Osc(error)
}
}
pub type Result<T> = std::result::Result<T, SwitcherError>;
pub type ChannelId = usize;
#[derive(Debug)]
pub enum SwitcherEvent {
Enable(ChannelId),
Disable(ChannelId),
}
#[derive(Default)]
pub struct SwitcherState {
// osc_tx: Sender<OscMessage>,
channels: Vec<ChannelState>,
active_channel: usize,
messages: Messages,
}
#[derive(Default)]
pub struct ChannelState {
id: ChannelId,
pub name: String,
pub locked: bool,
pub enabled: bool,
}
impl ChannelState {
pub fn new(id: ChannelId) -> Self {
Self {
id,
..Default::default()
}
}
}
pub struct SwitcherHandle {
events_tx: Sender<SwitcherEvent>,
osc_rx: Option<Receiver<OscMessage>>,
task: JoinHandle<Result<()>>,
}
impl SwitcherHandle {
pub fn run() -> Self {
let (osc_tx, osc_rx) = async_channel::unbounded();
let (events_tx, events_rx) = async_channel::unbounded();
let state = SwitcherState::new();
let task = task::spawn(run_loop(state, events_rx, osc_tx));
SwitcherHandle {
events_tx,
osc_rx: Some(osc_rx),
task,
}
}
pub async fn join(self) -> Result<()> {
self.task.await
}
pub async fn send(&self, event: SwitcherEvent) -> Result<()> {
log::debug!("switcher got event: {:?}", event);
self.events_tx.send(event).await?;
Ok(())
}
pub fn sender(&self) -> Sender<SwitcherEvent> {
self.events_tx.clone()
}
pub fn take_receiver(&mut self) -> Option<Receiver<OscMessage>> {
self.osc_rx.take()
}
}
pub async fn run_loop(
mut state: SwitcherState,
mut events_rx: Receiver<SwitcherEvent>,
osc_tx: Sender<OscMessage>,
) -> Result<()> {
state.init();
loop {
while let Some(message) = state.pop_message() {
osc_tx.send(message).await?;
}
if let Some(event) = events_rx.next().await {
state.on_event(event);
}
}
}
impl SwitcherState {
pub fn new() -> Self {
Default::default()
}
pub fn init(&mut self) {
let num_channels = 4;
for i in 0..num_channels {
let channel = ChannelState::new(i);
self.channels.push(channel);
}
// self.messages.push("/*", ("xmit", 1));
self.messages.push("/mixer/out/0/bus/0/on", (1.0,));
self.messages.push("/mixer/bus/0/ch/0/on", (1.0,));
self.update_state();
}
pub fn on_event(&mut self, event: SwitcherEvent) {
match event {
SwitcherEvent::Enable(channel) => self.enable_channel(channel),
SwitcherEvent::Disable(channel) => self.disable_channel(channel),
}
self.update_state();
}
pub fn enable_channel(&mut self, channel_id: ChannelId) {
if let Some(channel) = self.channels.iter_mut().find(|c| c.id == channel_id) {
if !channel.locked {
channel.enabled = true;
}
}
}
pub fn disable_channel(&mut self, channel_id: ChannelId) {
if let Some(channel) = self.channels.iter_mut().find(|c| c.id == channel_id) {
if !channel.locked {
channel.enabled = false;
}
}
}
fn update_state(&mut self) {
let next_active_channel = self
.channels
.iter()
.rev()
.find(|c| c.enabled)
.or(self.channels.get(0))
.unwrap()
.id;
if next_active_channel != self.active_channel {
self.active_channel = next_active_channel;
self.on_channel_change();
}
}
fn on_channel_change(&mut self) {
for channel in self.channels.iter() {
if channel.id == self.active_channel {
self.messages
.push(format!("/mixer/bus/0/ch/{}/on", channel.id), (1.0,));
} else {
self.messages
.push(format!("/mixer/bus/0/ch/{}/on", channel.id), (0.0,));
}
}
}
fn pop_message(&mut self) -> Option<OscMessage> {
self.messages.pop()
}
}
pub struct Messages {
inner: VecDeque<OscMessage>,
}
impl Default for Messages {
fn default() -> Self {
Self::new()
}
}
impl Messages {
pub fn new() -> Self {
Self {
inner: VecDeque::new(),
}
}
fn push<T>(&mut self, addr: impl ToString, args: T)
where
T: IntoOscArgs,
{
self.inner.push_back(osc_message(addr, args));
}
fn pop(&mut self) -> Option<OscMessage> {
self.inner.pop_front()
}
}
fn osc_message<T>(addr: impl ToString, args: T) -> OscMessage
where
T: IntoOscArgs,
{
// let args: Vec<OscType> = args.into_iter().map(|a| a.into()).collect();
let args = args.into_osc_args();
let addr = addr.to_string();
OscMessage { addr, args }
}
pub trait IntoOscArgs {
fn into_osc_args(self) -> Vec<OscType>;
}
impl<T> IntoOscArgs for Vec<T>
where
T: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
let args: Vec<OscType> = self.into_iter().map(|a| a.into()).collect();
args
}
}
impl<T1> IntoOscArgs for (T1,)
where
T1: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into()]
}
}
impl<T1, T2> IntoOscArgs for (T1, T2)
where
T1: Into<OscType>,
T2: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into(), self.1.into()]
}
}
impl<T1, T2, T3> IntoOscArgs for (T1, T2, T3)
where
T1: Into<OscType>,
T2: Into<OscType>,
T3: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into(), self.1.into(), self.2.into()]
}
}
impl IntoOscArgs for OscType {
fn into_osc_args(self) -> Vec<OscType> {
vec![self]
}
}
// impl IntoOscArgs for Vec<OscType> {
// fn into_osc_args(self) -> Vec<OscType> {
// self
// }
// }

78
src/udp.rs Normal file
View file

@ -0,0 +1,78 @@
use async_std::net::{ToSocketAddrs, UdpSocket};
use async_std::stream::{Stream, StreamExt};
use futures_lite::future::Future;
use futures_lite::ready;
use std::io::{Result};
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
pub struct UdpStream {
socket: Arc<UdpSocket>,
fut: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize, SocketAddr)>> + Send + Sync>>>,
buf: Option<Vec<u8>>,
}
impl UdpStream {
pub fn new(socket: Arc<UdpSocket>) -> Self {
let buf = vec![0u8; 1024 * 64];
Self {
socket,
fut: None,
buf: Some(buf),
}
}
pub fn get_ref(&self) -> &UdpSocket {
&self.socket
}
pub fn clone_socket(&self) -> Arc<UdpSocket> {
self.socket.clone()
}
pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], addrs: A) -> Result<usize> {
self.socket.send_to(buf, addrs).await
}
}
impl Stream for UdpStream {
type Item = Result<(Vec<u8>, SocketAddr)>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if self.fut.is_none() {
let buf = self.buf.take().unwrap();
let fut = recv_next(self.socket.clone(), buf);
self.fut = Some(Box::pin(fut));
}
if let Some(f) = &mut self.fut {
let res = ready!(f.as_mut().poll(cx));
self.fut = None;
return match res {
Err(e) => Poll::Ready(Some(Err(e))),
Ok((buf, n, addr)) => {
let res_buf = buf[..n].to_vec();
self.buf = Some(buf);
Poll::Ready(Some(Ok((res_buf, addr))))
}
};
}
}
}
}
async fn recv_next(
socket: Arc<UdpSocket>,
mut buf: Vec<u8>,
) -> Result<(Vec<u8>, usize, SocketAddr)> {
// let mut buf = vec![0u8; 1024];
let res = socket.recv_from(&mut buf).await;
match res {
Err(e) => Err(e),
Ok((n, addr)) => Ok((buf, n, addr)),
}
}