import fastify from 'fastify' import debug from 'debug' import { EventEmitter } from 'events' import p from 'path' import { fileURLToPath } from 'url' import { spawn } from 'child_process' import minimist from 'minimist' import { Transform, Readable } from 'streamx' import fastifyStatic from 'fastify-static' import fastifyCors from 'fastify-cors' import fastifySSE from 'fastify-sse-v2' import fastifyProxy from 'fastify-http-proxy' //import { Darkice } from './darkice.mjs' import { Darkice } from './ices2.mjs' import { Baresip } from './baresip.mjs' import { AlsaMeter } from './lib/ffmpeg-meter.mjs' import { ConnectivityCheck } from './lib/is-online.mjs' import { reducer, action, CONNECTION_STATE, STATE_RESET, PING, METER, INITIAL_STATE } from './state.mjs' const __dirname = p.dirname(fileURLToPath(import.meta.url)) function deviceToFFMPEG (device) { const [host, ...rest] = device.split(':') return { host, device: rest.join(':') } } export async function run (config = {}) { const app = fastify() const darkice = new Darkice() const baresip = new Baresip({ destination: 901 }) const isOnline = new ConnectivityCheck() const meter = new AlsaMeter({ input: deviceToFFMPEG(config.input) }) meter.open().catch(console.error) const actions = new Readable() actions.setMaxListeners(1000) const state = { current: INITIAL_STATE, dispatch: action => (state.current = reducer(state.current, action)), } actions.on('data', action => { state.dispatch(action) // console.log('action', action) }) darkice.stream.on('data', action => actions.push(action)) baresip.stream.on('data', action => actions.push(action)) isOnline.on('data', action => actions.push(action)) meter.on('meter', data => actions.push(action(METER, data))) const streams = { darkice, baresip } // const state = new StateContainer(reducer, INITIAL_STATE) // state.ingestActionStream(actions) if (config.dev) { app.register(fastifyProxy, { upstream: 'http://localhost:3000', prefix: '/' }) } else { app.register(fastifyStatic, { root: p.join(__dirname, '..', 'frontend', 'dist'), prefix: '/' }) } app.register(fastifyCors, { origin: '*' }) app.register(fastifySSE) app.get('/sse', async (req, reply) => { console.log('sse req') const stream = new Transform({ transform (action, cb) { if (!this._id) this._id = 0 this.push({ data: JSON.stringify(action), id: ++this._id }) cb() } }) process.nextTick(() => { stream.write(action(STATE_RESET, state.current)) stream.write(action(CONNECTION_STATE, { connected: true })) actions.on('data', action => stream.write(action)) }) reply.sse(stream) }) class InvalidIdError extends Error { constructor (message, statusCode) { super(message || 'Invalid Stream ID') this.statusCode = statusCode || 400 } } app.post('/command', async (req, reply) => { const { command, args } = req.body const { id } = args if (!streams[id]) throw new InvalidIdError() const handler = streams[id] switch (command) { case 'start': await handler.start() break case 'stop': await handler.stop() break case 'restart': await handler.restart() break } return { id, error: false } }) app.setErrorHandler(function (error, request, reply) { console.error('error', error) if (!error || typeof error !== 'object') error = { message: error } reply.code(error.statusCode || 500) reply.send({ error: error.message || 'Internal server error' }) }) await app.listen(config.port, config.host) console.log(`Server running at http://${config.host}:${config.port}/`) }