initial commit

This commit is contained in:
Franz Heinzmann (Frando) 2021-06-07 16:22:13 +02:00
commit 0024f44c80
34 changed files with 1991 additions and 0 deletions

103
backend/baresip.mjs Normal file
View file

@ -0,0 +1,103 @@
import p from 'path'
import split2 from 'split2'
import { Readable } from 'streamx'
import { spawn } from 'child_process'
import { fileURLToPath } from 'url';
import BaresipWrapper from './lib/baresip-wrapper.mjs'
const __dirname = p.dirname(fileURLToPath(import.meta.url))
import { action, STREAM_STATUS, LOG_MESSAGE } from './state.mjs'
export class Baresip {
constructor (opts = {}) {
this.command = opts.command || 'baresip'
this.config = opts.config || p.join(__dirname, '..', 'etc', 'baresip')
this.stream = new Readable()
this.log = []
this.stream.on('data', row => this.log.push(row))
this.id = 'baresip'
this.destinationNumber = opts.destination || 901
}
restart () {
if (!this.baresip) return this.start()
this.stop(() => {
process.nextTick(() => {
this.start()
})
})
}
stop (cb) {
// this.process.kill()
if (!this.baresip) return
this.baresip.kill(() => {
this.stream.push(action(STREAM_STATUS, { id: this.id, status: 'stopped' }))
if (cb) cb()
})
}
_onready () {
console.log('baresip onready')
}
_oncallestablished (number) {
console.log('call established to', number)
this.stream.push(action(STREAM_STATUS, { id: this.id, status: 'started' }))
}
_onhangup (number) {
console.log('remote hangup', number)
this.stream.push(action(STREAM_STATUS, { id: this.id, status: 'stopped' }))
}
_onserverconnected (...args) {
console.log('onserverconnected', args)
setTimeout(() => {
this.baresip.dial(this.destinationNumber)
}, 1000)
}
start () {
const args = ['-f', this.config]
this.baresip = new BaresipWrapper({
command: this.command,
args,
callbacks: {
ready: this._onready.bind(this),
hangUp: this._onhangup.bind(this),
callEstablished: this._oncallestablished.bind(this),
serverConnected: this._onserverconnected.bind(this)
}
})
this.baresip.stream.on('data', message => {
// console.log('BARESIP', message)
this.stream.push(action(LOG_MESSAGE, { id: this.id, type: 'stdout', message }))
})
this.baresip.connect()
// if (this.process) return
// const args = ['-c', this.config, '-v', this.logLevel]
// this.process = spawn(this.command, args)
// this.process.stdout.pipe(split2()).on('data', message => {
// this.stream.push(action(LOG_MESSAGE, { id: this.id, type: 'stdout', message }))
// // this.stream.push({ type: 'stdout', data })
// })
// this.process.stderr.pipe(split2()).on('data', message => {
// this.stream.push(action(LOG_MESSAGE, { id: this.id, type: 'stdout', message }))
// // this.stream.push({ type: 'stderr', data })
// })
// this.process.on('close', () => {
// this.stream.push(action(STREAM_STATUS, { id: this.id, status: 'stopped' }))
// this.process = null
// })
// this.stream.push(action(STREAM_STATUS, { id: this.id, status: 'started' }))
// return new Promise((resolve, reject) => {
// resolve()
// })
}
}

32
backend/bin.mjs Executable file
View file

@ -0,0 +1,32 @@
import minimist from 'minimist'
import { run } from './server.mjs'
import createDebug from 'debug'
const debug = createDebug('streamer')
main().catch(onerror)
async function main () {
const args = minimist(process.argv.slice(2), {
default: {
port: 3030,
host: '0.0.0.0',
input: 'alsa:default',
output: 'alsa:default'
},
alias: {
p: 'port',
h: 'host',
i: 'input',
o: 'output'
}
})
await run(args)
}
function onerror (err) {
if (err) console.error(err instanceof Error ? err.message : String(err))
debug(err)
process.exit(1)
}

60
backend/darkice.mjs Normal file
View file

@ -0,0 +1,60 @@
import p from 'path'
import split2 from 'split2'
import { Readable } from 'streamx'
import { spawn } from 'child_process'
import { fileURLToPath } from 'url';
const __dirname = p.dirname(fileURLToPath(import.meta.url))
import { action, STREAM_STATUS, LOG_MESSAGE } from './state.mjs'
export class Darkice {
constructor (opts = {}) {
this.command = opts.command || 'darkice'
this.config = opts.config || p.join(__dirname, 'etc', '..', 'darkice.cfg')
this.logLevel = 5
this.stream = new Readable()
this.log = []
this.stream.on('data', row => this.log.push(row))
this.id = 'darkice'
}
restart () {
if (!this.process) return this.start()
this.process.once('close', () => {
process.nextTick(() => {
this.start()
})
})
this.stop()
}
stop () {
this.process.kill()
}
start () {
if (this.process) return
const args = ['-c', this.config, '-v', this.logLevel]
this.process = spawn(this.command, args)
this.process.stdout.pipe(split2()).on('data', message => {
this.stream.push(action(LOG_MESSAGE, { id: this.id, type: 'stdout', message }))
// this.stream.push({ type: 'stdout', data })
})
this.process.stderr.pipe(split2()).on('data', message => {
this.stream.push(action(LOG_MESSAGE, { id: this.id, type: 'stdout', message }))
// this.stream.push({ type: 'stderr', data })
})
this.process.on('close', () => {
this.stream.push(action(STREAM_STATUS, { id: this.id, status: 'stopped' }))
this.process = null
})
this.stream.push(action(STREAM_STATUS, { id: this.id, status: 'started' }))
return new Promise((resolve, reject) => {
resolve()
})
}
}

View file

@ -0,0 +1,104 @@
import { spawn } from 'child_process'
import split2 from 'split2'
import { get } from 'http'
import { Readable } from 'streamx'
import { fixPath } from 'os-dependent-path-delimiter'
import kill from 'tree-kill'
const eventRegexps = {
callEstablished: /Call established: (.+)/,
callReceived: /Incoming call from: ([\+\w]+ )?(\S+) -/,
hangUp: /(.+): session closed/,
ready: /baresip is ready/,
serverConnected: /\[\d+ bindings?\]/
}
const options = { host: '127.0.0.1', port: '8000', agent: false }
const nop = () => {}
const executeCommand = (command) => {
options.path = `/?${command}`
get(options, nop)
}
export default class Baresip {
constructor (opts) {
const { command, args, callbacks } = opts
this.connected = false
this.processPath = fixPath(command)
this.args = args
this.callbacks = {}
this.stream = new Readable()
Object.keys(eventRegexps).forEach((event) => {
this.on(event, callbacks[event] === undefined ? () => {} : callbacks[event])
});
[
'on',
'connect',
'kill',
'reload'
].forEach((method) => {
this[method] = this[method].bind(this)
})
}
accept () {
executeCommand('a')
}
dial (phoneNumber) {
executeCommand(`d${phoneNumber}`)
}
hangUp () {
executeCommand('b')
}
toggleCallMuted () {
executeCommand('m')
}
on (event, callback) {
this.callbacks[event] = callback
}
kill (callback) {
kill(this.process.pid, 'SIGKILL', (err) => {
if (!err) {
this.connected = false
if (callback !== undefined) {
callback()
}
}
})
}
reload () {
this.kill(() => this.connect())
}
connect () {
this.connected = true
this.process = spawn(this.processPath, this.args)
this.process.stdout.pipe(split2()).on('data', (data) => {
const parsedData = `${data}`
Object.keys(eventRegexps).forEach((event) => {
const matches = parsedData.match(eventRegexps[event])
if ((matches !== null) && (matches.length > 0)) {
this.callbacks[event](matches[matches.length - 1])
}
})
this.stream.push(parsedData)
// console.log(parsedData)
})
// this.process.stderr.pipe(split2()).on('data', (data) => this.stream.push(data))
}
}

View file

@ -0,0 +1,100 @@
import { spawn } from 'child_process'
import createDebug from 'debug'
import { EventEmitter } from 'events'
import process from 'process'
const debug = createDebug('meter')
export class AlsaMeter extends EventEmitter {
constructor (opts = {}) {
super()
if (!opts.input) this.input = { host: 'alsa', device: 'default' }
else if (typeof opts.input === 'string') this.input = { host: 'alsa', input: opts.input }
else this.input = opts.input
// setInterval(() => console.log(this._lastMessage), 1000)
}
open () {
if (!this._opening) this._opening = this._open()
return this._opening
}
async _open () {
const cmd = 'ffmpeg'
let args = [
'-nostats',
'-f',
this.input.host,
'-i',
this.input.device,
'-filter_complex',
'ebur128=peak=true',
'-f',
'null',
'-'
]
debug('spawn: ' + `${cmd} ${args.map(a => `"${a}"`).join(' ')}`)
this.proc = spawn(cmd, args)
// const onclose = (ev) => {
// this._closed = true
// this.proc.kill()
// }
// process.on('exit', onclose.bind('exit'))
// process.on('SIGINT', onclose.bind('SIGINT'))
// process.on('SIGTERM', onclose.bind('SIGTERM'))
this.proc.stderr.on('data', msg => {
this.parseMessage(msg)
})
return new Promise((resolve, reject) => {
this.proc.stderr.once('data', msg => {
this._open = true
resolve()
})
})
}
parseMessage (msg) {
let str = msg.toString('utf-8')
let match = str.match(/\[.*\](.*)/)
if (!match || match.length !== 2) return null
str = match[1].trim()
let parts = str.split(/([a-zA-Z]+:)/).filter(f => f)
let last = null
let map = parts.reduce((agg, token, i) => {
token = token.trim()
if (last) {
agg[last] = token
last = null
} else {
last = token.replace(/[:\s]/g, '')
}
return agg
}, {})
let res = {}
if (map.LRA) res.LRA = parseFloat(map.LRA.replace('LU', ''))
if (map.t) res.t = parseFloat(map.t)
if (map.TARGET) res.TARGET = parseFloat(map.TARGET.replace('LUFS', ''))
if (map.I) res.I = parseFloat(map.I.replace('LUFS', ''))
if (map.M) res.M = parseFloat(map.M)
if (map.S) res.S = parseFloat(map.S)
for (let key of ['FTPK', 'TPK']) {
if (map[key]) {
let parts = map[key].split(/\s+/)
if (parts.length >= 2) {
res[key] = [parseFloat(parts[0]), parseFloat(parts[1])]
} else {
res[key] = null
}
}
}
if (Object.keys(res).length) {
// debug(Object.entries(res).map(([k, v]) => `${k}: ${v}`).join(' | '))
this._lastMessage = res
this.emit('meter', res)
}
}
}

34
backend/lib/is-online.mjs Normal file
View file

@ -0,0 +1,34 @@
import isOnline from 'is-online'
import { Readable } from 'streamx'
import { action, CONNECTIVITY } from '../state.mjs'
export class ConnectivityCheck extends Readable {
constructor (opts = {}) {
super()
this.opts = opts
this.retryInterval = opts.retryInterval || 2000
this._action({ internet: false })
this._update()
}
async _update () {
try {
const hasInternet = await isOnline(this.opts)
this._action({ internet: hasInternet })
} catch (err) {
this._action({ internet: false })
} finally {
this._timeout = setTimeout(() => this._update(), this.retryInterval)
}
}
_destroy () {
if (this._timeout) clearTimeout(this._timeout)
}
_action (data) {
if (this.destroyed) return
this.push(action(CONNECTIVITY, data))
}
}

37
backend/package.json Normal file
View file

@ -0,0 +1,37 @@
{
"name": "streamer",
"version": "1.0.0",
"main": "server.mjs",
"type": "module",
"license": "MIT",
"bin": "bin.mjs",
"scripts": {
"start": "node bin.mjs"
},
"dependencies": {
"baresip-wrapper": "^1.0.10",
"debug": "^4.3.1",
"fastify": "^3.15.1",
"fastify-cors": "^6.0.1",
"fastify-sse-v2": "^2.0.4",
"fastify-static": "^4.0.1",
"is-online": "^9.0.0",
"minimist": "^1.2.5",
"split2": "^3.2.2",
"streamx": "^2.10.3"
},
"pkg": {
"assets": [
"etc/**",
"frontend/dist/**"
],
"targets": [
"node14-linux-arm64",
"node14-linux-x64"
],
"outputPath": "dist"
},
"devDependencies": {
"caxa": "^1.0.0"
}
}

125
backend/server.mjs Normal file
View file

@ -0,0 +1,125 @@
import fastify from 'fastify'
import debug from 'debug'
import { EventEmitter } from 'events'
import p from 'path'
import { fileURLToPath } from 'url'
import { spawn } from 'child_process'
import minimist from 'minimist'
import { Transform, Readable } from 'streamx'
import fastifyStatic from 'fastify-static'
import fastifyCors from 'fastify-cors'
import fastifySSE from 'fastify-sse-v2'
import { Darkice } from './darkice.mjs'
import { Baresip } from './baresip.mjs'
import { AlsaMeter } from './lib/ffmpeg-meter.mjs'
import { ConnectivityCheck } from './lib/is-online.mjs'
import { reducer, action, CONNECTION_STATE, STATE_RESET, PING, METER, INITIAL_STATE } from './state.mjs'
const __dirname = p.dirname(fileURLToPath(import.meta.url))
function deviceToFFMPEG (device) {
const [host, ...rest] = device.split(':')
return { host, device: rest.join(':') }
}
export async function run (config = {}) {
const app = fastify()
const darkice = new Darkice()
const baresip = new Baresip()
const isOnline = new ConnectivityCheck()
const meter = new AlsaMeter({
input: deviceToFFMPEG(config.input)
})
meter.open().catch(console.error)
const actions = new Readable()
actions.setMaxListeners(1000)
const state = {
current: INITIAL_STATE,
dispatch: action => (state.current = reducer(state.current, action)),
}
actions.on('data', action => {
state.dispatch(action)
// console.log('action', action)
})
darkice.stream.on('data', action => actions.push(action))
baresip.stream.on('data', action => actions.push(action))
isOnline.on('data', action => actions.push(action))
meter.on('meter', data => actions.push(action(METER, data)))
const streams = { darkice, baresip }
// const state = new StateContainer(reducer, INITIAL_STATE)
// state.ingestActionStream(actions)
app.register(fastifyStatic, {
root: p.join(__dirname, '..', 'frontend', 'dist'),
prefix: '/'
})
app.register(fastifyCors, {
origin: '*'
})
app.register(fastifySSE)
app.get('/sse', async (req, reply) => {
console.log('sse req')
const stream = new Transform({
transform (action, cb) {
if (!this._id) this._id = 0
this.push({ data: JSON.stringify(action), id: ++this._id })
cb()
}
})
process.nextTick(() => {
stream.write(action(STATE_RESET, state.current))
stream.write(action(CONNECTION_STATE, { connected: true }))
actions.on('data', action => stream.write(action))
})
reply.sse(stream)
})
class InvalidIdError extends Error {
constructor (message, statusCode) {
super(message || 'Invalid Stream ID')
this.statusCode = statusCode || 400
}
}
app.post('/command', async (req, reply) => {
const { command, args } = req.body
const { id } = args
if (!streams[id]) throw new InvalidIdError()
const handler = streams[id]
switch (command) {
case 'start':
await handler.start()
break
case 'stop':
await handler.stop()
break
case 'restart':
await handler.restart()
break
}
return {
id,
error: false
}
})
app.setErrorHandler(function (error, request, reply) {
console.error('error', error)
if (!error || typeof error !== 'object') error = { message: error }
reply.code(error.statusCode || 500)
reply.send({ error: error.message || 'Internal server error' })
})
await app.listen(config.port, config.host)
console.log(`Server running at http://${config.host}:${config.port}/`)
}

1
backend/state.mjs Normal file
View file

@ -0,0 +1 @@
export * from '../common/state.mjs'