This commit is contained in:
Franz Heinzmann (Frando) 2021-10-18 21:49:41 +02:00
parent 1ad243f043
commit 392f8f8a2c
10 changed files with 405 additions and 265 deletions

View file

@ -16,3 +16,6 @@ futures-lite = "1.11.3"
log = "0.4.14"
env_logger = "0.8.3"
regex = "1.4.3"
async-osc = "0.2.0"
tide = "0.16.0"
tide-websockets = "0.2.0"

View file

@ -1,4 +1,4 @@
pub mod osc;
pub mod spawn;
pub mod switcher;
pub mod udp;
pub mod types;
pub mod ws;

View file

@ -1,15 +1,19 @@
use async_std::channel::{self, Receiver, Sender};
use async_std::stream::StreamExt;
use async_std::sync::{Arc, Mutex, RwLock};
use async_std::task::{self, JoinHandle};
use log::*;
use rosc::{OscMessage, OscPacket, OscType};
use studiox::osc::OscUdpStream;
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::SocketAddr;
use studiox::spawn::FaustHandle;
use studiox::switcher::{SwitcherError, SwitcherEvent, SwitcherHandle};
use studiox::switcher::{SwitcherEvent, SwitcherHandle};
use async_osc::{prelude::*, OscMessage, OscPacket, OscSender, OscSocket, OscType as Ty};
pub enum Event {
FaustRx(Result<OscPacket, SwitcherError>),
FaustRx(Result<(OscPacket, SocketAddr), async_osc::Error>),
Switcher(OscMessage),
CommandRx(Result<OscPacket, SwitcherError>),
CommandRx(Result<(OscPacket, SocketAddr), async_osc::Error>),
}
#[async_std::main]
@ -18,11 +22,11 @@ async fn main() -> anyhow::Result<()> {
let faust_handle = FaustHandle::spawn().await;
match faust_handle.as_ref() {
Ok(handle) => {
info!("FAUST mixer launched: {:?}", handle)
Ok(_handle) => {
info!("FAUST mixer launched successfully.")
}
Err(e) => {
error!("Launching FAUST mixer failed: {:?}", e);
error!("Launching FAUST mixer failed: {}", e);
}
}
let mut faust_handle = faust_handle?;
@ -43,14 +47,23 @@ async fn main_loop(faust_handle: &FaustHandle) -> anyhow::Result<()> {
let addr_commands = format!("localhost:{}", port_commands);
let mut switcher = SwitcherHandle::run();
let switcher_rx = switcher.take_receiver().unwrap().map(Event::Switcher);
let switcher_events = switcher.take_receiver().unwrap().map(Event::Switcher);
let faust_rx = OscUdpStream::bind(addr_faust_rx).await?.map(Event::FaustRx);
let command_osc_rx = OscUdpStream::bind(addr_commands).await?;
let command_osc_tx = command_osc_rx.sender();
let command_osc_rx = command_osc_rx.map(Event::CommandRx);
let (faust_recv_tx, faust_recv_rx) = channel::unbounded();
let mut events = faust_rx.merge(switcher_rx).merge(command_osc_rx);
let faust_osc_socket = OscSocket::bind(addr_faust_rx).await?;
let faust_osc_events = faust_osc_socket.map(Event::FaustRx);
let command_osc_socket = OscSocket::bind(addr_commands).await?;
let command_osc_sender = command_osc_socket.sender();
let command_osc_events = command_osc_socket.map(Event::CommandRx);
let faust_sender = OscSocket::bind("localhost:0").await?;
faust_sender.connect(&addr_faust_tx).await?;
let ws_to_faust_task = task::spawn(ws_to_faust_loop(faust_sender.sender(), faust_recv_rx));
let mut events = faust_osc_events
.merge(switcher_events)
.merge(command_osc_events);
info!("Listening for OSC commands on localhost:{}", port_commands);
while let Some(event) = events.next().await {
@ -58,15 +71,21 @@ async fn main_loop(faust_handle: &FaustHandle) -> anyhow::Result<()> {
Event::Switcher(message) => {
debug!("switcher tx: {:?}", message);
let packet = OscPacket::Message(message);
command_osc_tx.send_to(packet, &addr_faust_tx).await?;
command_osc_sender.send_to(packet, &addr_faust_tx).await?;
}
Event::FaustRx(packet) => {
debug!("faust rx: {:?}", packet);
let _packet = packet?;
let (packet, _peer_addr) = packet?;
match packet {
OscPacket::Message(message) => {
faust_recv_tx.send(message).await?;
}
_ => {}
}
}
Event::CommandRx(packet) => {
debug!("command rx: {:?}", packet);
let packet = packet?;
let (packet, _peer_addr) = packet?;
let event = parse_switcher_command(packet);
if let Some(event) = event {
switcher.send(event).await?;
@ -76,14 +95,173 @@ async fn main_loop(faust_handle: &FaustHandle) -> anyhow::Result<()> {
}
switcher.join().await?;
ws_to_faust_task.await?;
// child_task.await?;
Ok(())
}
struct PeakThrottler {
peaks: HashMap<u16, u16>,
}
impl PeakThrottler {
fn new() -> Self {
Self {
peaks: HashMap::new(),
}
}
fn encode(&self) -> Vec<u8> {
let mut buf = vec![0u8; 48 * 2];
for offset in 0..47 {
let val = self.peaks.get(&offset).unwrap_or(&0);
let bytes = val.to_be_bytes();
let start = offset as usize * 2;
let end = start + 2;
// let slice = &mut buf[start..end];
buf[start..end].copy_from_slice(&bytes[..]);
}
buf
}
fn send(&mut self, message: &OscMessage) -> bool {
let num_in = 8;
let num_out = 8;
let num_bus = 4;
// peak map:
// 8 ins, 8 out, 4 * 8 bus ins
// = 48 slots
// each slot: 1 byte uint8 (0-256)
// total: 48 bytes
let parts: Vec<&str> = message.addr.split("/").skip(2).collect();
let indexes = match &parts[..] {
&["in", idx, "level"] => {
let idx: u32 = idx.parse().unwrap();
Some((0, idx))
}
&["out", idx, "level"] => {
let idx: u32 = idx.parse().unwrap();
Some((1, idx))
}
&["bus", bus, "ch", idx, "level"] => {
let bus: u32 = bus.parse().unwrap();
let idx: u32 = idx.parse().unwrap();
Some((2 + bus, idx))
}
_ => None,
};
if let Some(indexes) = indexes {
if let Some(Ty::Float(peak)) = message.args.get(0) {
let val: f32 = (*peak + 70.) * 800.;
let val: u16 = val.round() as u16;
let offset: u16 = indexes.0 as u16 * 8u16 + indexes.1 as u16;
self.peaks.insert(offset, val);
// eprintln!("off {} val {}", offset, val);
return true;
}
}
false
}
}
async fn ws_to_faust_loop(
faust_sender: OscSender,
mut faust_recv_rx: Receiver<OscMessage>,
) -> Result<(), async_osc::Error> {
use studiox::ws::WsEvent;
use tide_websockets::Message as WsMessage;
let mut ws_handle = studiox::ws::start();
let ws_sender = ws_handle.sender();
let peers = Arc::new(RwLock::new(HashSet::new()));
let peaks = Arc::new(Mutex::new(PeakThrottler::new()));
let ws_rx_task = task::spawn({
let peers = peers.clone();
async move {
while let Some(event) = ws_handle.next().await {
// let (message, peer_addr) = event;
match event {
WsEvent::Connect(peer_addr) => {
peers.write().await.insert(peer_addr);
}
WsEvent::Disconnect(peer_addr) => {
peers.write().await.remove(&peer_addr);
}
WsEvent::Message(WsMessage::Binary(buf), peer_addr) => {
let osc_message = rosc::decoder::decode(&buf)?;
eprintln!("recv: {:?}", osc_message);
faust_sender.send(osc_message).await?;
eprintln!("did forward.");
}
_ => {}
};
// eprintln!("recv: {:?}", event);
}
Ok::<(), async_osc::Error>(())
}
});
let send_peaks_task: JoinHandle<Result<(), async_osc::Error>> = task::spawn({
let sleep_time = 200;
let peaks = peaks.clone();
let peers = peers.clone();
let ws_sender = ws_sender.clone();
async move {
loop {
timeout(sleep_time).await;
let buf = peaks.lock().await.encode();
let message = OscMessage {
addr: "/peaks".into(),
args: vec![Ty::Blob(buf)],
};
let message = rosc::encoder::encode(&OscPacket::Message(message))?;
let message = WsMessage::Binary(message);
let peers = peers.read().await;
for peer in peers.iter() {
ws_sender
.send((message.clone(), peer.clone()))
.await
.unwrap();
}
}
}
});
let ws_tx_task = task::spawn(async move {
while let Some(message) = faust_recv_rx.next().await {
let is_peak = peaks.lock().await.send(&message);
if !is_peak {
let buf = rosc::encoder::encode(&OscPacket::Message(message))?;
let ws_message = WsMessage::Binary(buf);
let peers = peers.read().await;
for peer in peers.iter() {
ws_sender
.send((ws_message.clone(), peer.clone()))
.await
.unwrap();
}
}
}
Ok::<(), async_osc::Error>(())
});
ws_rx_task.await?;
ws_tx_task.await?;
send_peaks_task.await?;
Ok(())
}
async fn timeout(ms: u64) {
let _ = async_std::future::timeout(
std::time::Duration::from_millis(ms),
futures_lite::future::pending::<()>(),
)
.await;
}
fn parse_switcher_command(packet: OscPacket) -> Option<SwitcherEvent> {
let event = match packet {
OscPacket::Message(message) => match (&message.addr as &str, &message.args[..]) {
("/switcher", [OscType::Int(channel_id), OscType::Int(on_value)]) => match on_value {
OscPacket::Message(message) => match message.as_tuple() {
("/switcher", [Ty::Int(channel_id), Ty::Int(on_value)]) => match on_value {
1 => Some(SwitcherEvent::Enable(*channel_id as usize)),
0 => Some(SwitcherEvent::Disable(*channel_id as usize)),
_ => {

View file

@ -1,9 +0,0 @@
pub type ChannelId = usize;
pub type BusId = usize;
pub type OutputId = usize;
pub enum MixerCommand {
Volume(ChannelId, f32),
}
pub fn on_command() {}

View file

@ -1,155 +0,0 @@
use async_std::net::{ToSocketAddrs, UdpSocket};
use async_std::stream::Stream;
use futures_lite::ready;
use rosc::{OscMessage, OscPacket, OscType};
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::switcher::SwitcherError;
use crate::udp::UdpStream;
pub struct OscUdpStream {
stream: UdpStream,
}
impl OscUdpStream {
pub fn new(socket: UdpSocket) -> Self {
let socket = Arc::new(socket);
let stream = UdpStream::new(socket);
Self { stream }
}
pub async fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self, SwitcherError> {
let socket = UdpSocket::bind(addr).await?;
Ok(Self::new(socket))
}
pub async fn send_to<A: ToSocketAddrs>(
&self,
packet: OscPacket,
addrs: A,
) -> Result<(), SwitcherError> {
send_osc(&self.stream.get_ref(), packet, addrs).await
}
pub fn sender(&self) -> OscSender {
OscSender::new(self.stream.clone_socket())
}
}
impl Stream for OscUdpStream {
type Item = Result<OscPacket, SwitcherError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let packet = ready!(Pin::new(&mut self.stream).poll_next(cx));
let message = match packet {
None => None,
Some(packet) => Some(match packet {
Err(err) => Err(err.into()),
Ok(buf) => rosc::decoder::decode(&buf.0[..]).map_err(|e| e.into()),
}),
};
Poll::Ready(message)
}
}
#[derive(Clone)]
pub struct OscSender {
socket: Arc<UdpSocket>,
}
impl OscSender {
pub fn new(socket: Arc<UdpSocket>) -> Self {
Self { socket }
}
pub async fn send_to<A: ToSocketAddrs>(
&self,
packet: OscPacket,
addrs: A,
) -> Result<(), SwitcherError> {
send_osc(&self.socket, packet, addrs).await
}
}
async fn send_osc<A: ToSocketAddrs>(
socket: &UdpSocket,
packet: OscPacket,
addrs: A,
) -> Result<(), SwitcherError> {
let buf = rosc::encoder::encode(&packet)?;
let n = socket.send_to(&buf[..], addrs).await?;
if n != buf.len() {
Err(io::Error::new(io::ErrorKind::Interrupted, "udp packet was not fully sent").into())
} else {
Ok(())
}
}
pub trait OscMessageExt {
fn new<T>(addr: impl ToString, args: T) -> OscMessage
where
T: IntoOscArgs;
}
impl OscMessageExt for OscMessage {
fn new<T>(addr: impl ToString, args: T) -> OscMessage
where
T: IntoOscArgs,
{
let args = args.into_osc_args();
let addr = addr.to_string();
OscMessage { addr, args }
}
}
pub trait IntoOscArgs {
fn into_osc_args(self) -> Vec<OscType>;
}
impl<T> IntoOscArgs for Vec<T>
where
T: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
let args: Vec<OscType> = self.into_iter().map(|a| a.into()).collect();
args
}
}
impl<T1> IntoOscArgs for (T1,)
where
T1: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into()]
}
}
impl<T1, T2> IntoOscArgs for (T1, T2)
where
T1: Into<OscType>,
T2: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into(), self.1.into()]
}
}
impl<T1, T2, T3> IntoOscArgs for (T1, T2, T3)
where
T1: Into<OscType>,
T2: Into<OscType>,
T3: Into<OscType>,
{
fn into_osc_args(self) -> Vec<OscType> {
vec![self.0.into(), self.1.into(), self.2.into()]
}
}
impl IntoOscArgs for OscType {
fn into_osc_args(self) -> Vec<OscType> {
vec![self]
}
}

View file

@ -16,7 +16,7 @@ pub struct FaustHandle {
impl FaustHandle {
pub async fn spawn() -> Result<Self, std::io::Error> {
let exec = "/home/bit/Code/RDL/studiox/studiox-next/mixer-jackconsole";
let exec = "/home/bit/Code/studiox/studiox-mixer/mixer-jackconsole";
let mut child = Command::new(exec)
.arg(".")
.stdout(Stdio::piped())

View file

@ -5,7 +5,7 @@ use rosc::OscMessage;
use std::collections::VecDeque;
use thiserror::Error;
use crate::osc::{IntoOscArgs, OscMessageExt};
use async_osc::prelude::*;
#[derive(Error, Debug)]
pub enum SwitcherError {
@ -125,7 +125,7 @@ impl SwitcherState {
let channel = ChannelState::new(i);
self.channels.push(channel);
}
// self.messages.push("/*", ("xmit", 1));
self.messages.push("/*", ("xmit", 1));
self.messages.push("/mixer/out/0/bus/0/on", (1.0,));
self.messages.push("/mixer/bus/0/ch/0/on", (1.0,));
self.update_state();

75
src/types.rs Normal file
View file

@ -0,0 +1,75 @@
use async_osc::{prelude::OscMessageExt, OscMessage, OscType};
pub type Id = u32;
pub type ChannelId = u32;
pub type InputId = u32;
pub type OutputId = u32;
pub type BusId = u32;
pub enum Command {
ChannelLabel(u32, String),
BusChannelVolume(u32, u32, f32),
BusChannelActive(u32, u32, bool),
}
pub enum Event {
Peaks(Vec<u8>),
ChannelLabel(u32, String),
}
impl Event {
pub fn into_osc_message(self, prefix: String) -> Option<OscMessage> {
match self {
Self::ChannelLabel(id, label) => {
let path = format!("{}/channel/{}/label", prefix, id);
Some(OscMessage::new(path, vec![OscType::String(label)]))
}
_ => None,
}
}
pub fn from_osc_message(prefix: String, message: OscMessage) -> Option<Self> {
None
}
}
pub struct State {}
pub struct Bus {
id: BusId,
channels: Vec<u32>,
}
pub struct Channel {
id: ChannelId,
input: InputId,
bus: BusId,
label: String,
active: bool,
peak: f32,
volume: f32,
}
pub struct Mixer {
buses: Vec<Bus>,
channels: Vec<Channel>,
}
impl Mixer {
pub fn emit(&mut self, event: Event) {}
pub fn save_state(&mut self) {}
pub fn command(&mut self, command: Command) {
match command {
Command::ChannelLabel(id, label) => {
if let Some(mut channel) = self.channels.get_mut(id as usize) {
channel.label = label.clone();
self.emit(Event::ChannelLabel(id, label));
self.save_state();
}
}
_ => {}
}
}
}
pub struct Output {}

View file

@ -1,76 +0,0 @@
use async_std::net::{ToSocketAddrs, UdpSocket};
use async_std::stream::Stream;
use futures_lite::future::Future;
use futures_lite::ready;
use std::io::Result;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
pub struct UdpStream {
socket: Arc<UdpSocket>,
fut: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize, SocketAddr)>> + Send + Sync>>>,
buf: Option<Vec<u8>>,
}
impl UdpStream {
pub fn new(socket: Arc<UdpSocket>) -> Self {
let buf = vec![0u8; 1024 * 64];
Self {
socket,
fut: None,
buf: Some(buf),
}
}
pub fn get_ref(&self) -> &UdpSocket {
&self.socket
}
pub fn clone_socket(&self) -> Arc<UdpSocket> {
self.socket.clone()
}
pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], addrs: A) -> Result<usize> {
self.socket.send_to(buf, addrs).await
}
}
impl Stream for UdpStream {
type Item = Result<(Vec<u8>, SocketAddr)>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if self.fut.is_none() {
let buf = self.buf.take().unwrap();
let fut = recv_next(self.socket.clone(), buf);
self.fut = Some(Box::pin(fut));
}
if let Some(f) = &mut self.fut {
let res = ready!(f.as_mut().poll(cx));
self.fut = None;
return match res {
Err(e) => Poll::Ready(Some(Err(e))),
Ok((buf, n, addr)) => {
let res_buf = buf[..n].to_vec();
self.buf = Some(buf);
Poll::Ready(Some(Ok((res_buf, addr))))
}
};
}
}
}
}
async fn recv_next(
socket: Arc<UdpSocket>,
mut buf: Vec<u8>,
) -> Result<(Vec<u8>, usize, SocketAddr)> {
// let mut buf = vec![0u8; 1024];
let res = socket.recv_from(&mut buf).await;
match res {
Err(e) => Err(e),
Ok((n, addr)) => Ok((buf, n, addr)),
}
}

124
src/ws.rs Normal file
View file

@ -0,0 +1,124 @@
use async_std::channel::{self, Receiver, Sender};
use async_std::stream::{Stream, StreamExt};
use async_std::sync::{Arc, Mutex};
use async_std::task::{self, JoinHandle};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use tide::prelude::*;
use tide_websockets::{Message, WebSocket, WebSocketConnection};
#[derive(Debug, Clone)]
pub enum WsEvent {
Message(Message, SocketAddr),
Connect(SocketAddr),
Disconnect(SocketAddr),
}
pub struct WsHandle {
task: JoinHandle<Result<(), tide::Error>>,
outgoing_tx: Sender<(Message, SocketAddr)>,
incoming_rx: Option<Receiver<WsEvent>>,
}
impl WsHandle {
pub async fn send_to(&self, message: Message, peer_addr: SocketAddr) {
self.outgoing_tx.send((message, peer_addr)).await.unwrap()
}
pub fn sender(&self) -> Sender<(Message, SocketAddr)> {
self.outgoing_tx.clone()
}
pub fn take_receiver(&mut self) -> Option<Receiver<WsEvent>> {
self.incoming_rx.take()
}
}
impl Stream for WsHandle {
type Item = WsEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(receiver) = self.incoming_rx.as_mut() {
Pin::new(receiver).poll_next(cx)
} else {
Poll::Ready(None)
}
}
}
pub fn start() -> WsHandle {
let (outgoing_tx, outgoing_rx) = channel::unbounded();
let (incoming_tx, incoming_rx) = channel::unbounded();
let task = run(outgoing_rx, incoming_tx);
WsHandle {
task,
outgoing_tx,
incoming_rx: Some(incoming_rx),
}
}
pub fn run(
mut outgoing_rx: Receiver<(Message, SocketAddr)>,
incoming_tx: Sender<WsEvent>,
) -> JoinHandle<Result<(), tide::Error>> {
let peers: HashMap<SocketAddr, WebSocketConnection> = HashMap::new();
let peers = Arc::new(Mutex::new(peers));
let mut app = tide::new();
let send_task = task::spawn({
let peers = peers.clone();
async move {
while let Some((message, peer_addr)) = outgoing_rx.next().await {
let peers = peers.lock().await;
if let Some(peer) = peers.get(&peer_addr) {
match message {
Message::Text(message) => peer.send_string(message).await?,
Message::Binary(message) => peer.send_bytes(message).await?,
_ => {}
}
}
}
Ok::<(), tide_websockets::Error>(())
}
});
app.at("/")
.serve_dir("/home/bit/Code/studiox/studiox/frontend/build")
.unwrap();
app.at("/ws")
.get(WebSocket::new(move |request, mut stream| {
let incoming_tx = incoming_tx.clone();
let peers = peers.clone();
let peer_addr: SocketAddr = request.peer_addr().unwrap().parse().unwrap();
async move {
incoming_tx
.send(WsEvent::Connect(peer_addr.clone()))
.await
.unwrap();
peers.lock().await.insert(peer_addr, stream.clone());
while let Some(Ok(message)) = stream.next().await {
incoming_tx
.send(WsEvent::Message(message, peer_addr.clone()))
.await?;
}
peers.lock().await.remove(&peer_addr);
incoming_tx
.send(WsEvent::Disconnect(peer_addr))
.await
.unwrap();
Ok(())
}
}));
let main_task: JoinHandle<Result<(), tide::Error>> = task::spawn(async move {
let addr = "127.0.0.1:8080";
app.listen(addr).await?;
log::info!("ws listening on {}", addr);
send_task.await?;
Ok(())
});
main_task
}