const connectSession = require('connect-redis') const session = require('express-session') const bodyParser = require('body-parser') const express = require('express') const request = require('request') const nunjucks = require('nunjucks') const sqlite = require('sqlite') const xml2js = require('xml2js') const path = require('path') const toml = require('toml') const http = require('http') const fs = require('fs') const WebSocket = require('ws') const uuid = require('uuid/v4') const redis = require('redis') const URL = require('url') require('express-async-errors') const SessionStore = connectSession(session) const util = require('util') const get = util.promisify(request.get) const post = util.promisify(request.post) const dev = process.env.NODE_ENV === 'development' // Load Configuration const filename = path.join(__dirname, 'config.toml') let config let cache = { _updated: 0, streamers: {}, viewers: {}, live: [] } try { config = toml.parse(fs.readFileSync(filename)) } catch (e) { console.error(e) process.exit(1) } config = Object.assign({ 'Streaming': { 'Port': '9322', 'Database': 'streaming.db', 'StreamServer': 'https://tv.icynet.eu/live/', 'ServerHost': 'icynet.eu', 'PublishAddress': 'rtmp://{host}:1935/hls-live/{streamer}', 'Secret': 'changeme' }, 'Auth': { 'Server': 'http://localhost:8282', 'Redirect': 'http://localhost:5000/auth/_callback/' }, 'OAuth2': { 'ClientID': '1', 'ClientSecret': 'changeme' } }, config) // Constants const oauthAuth = '{server}/oauth2/authorize?response_type=code&state={state}&redirect_uri={redirect}&client_id={client}&scope=image' const port = parseInt(config['Streaming']['Port']) const streamServer = config['Streaming']['StreamServer'] const streamServerHost = config['Streaming']['ServerHost'] const authServer = config['Auth']['Server'] const oauthRedirect = config['Auth']['Redirect'] const oauthId = config['OAuth2']['ClientID'].toString() const oauthSecret = config['OAuth2']['ClientSecret'] const streamAppName = streamServer.match(/\/([\w-_]+)\/$/)[1] function teval (str, obj) { let res = str + '' for (let key in obj) { if (res.indexOf('{' + key + '}') === -1) continue res = res.replace('{' + key + '}', obj[key]) } return res } // Database const dbPromise = Promise.resolve() .then(() => sqlite.open(path.join(process.cwd(), config['Streaming']['Database']), { Promise, cache: true })) .then(db => db.migrate()) // Setup server const app = express() const server = http.createServer(app) const wss = new WebSocket.Server({ clientTracking: false, noServer: true }) app.enable('trust proxy', 1) app.use(bodyParser.urlencoded({ extended: false })) app.use(bodyParser.json()) app.disable('x-powered-by') nunjucks.configure('templates', { autoescape: true, express: app }) const sessionParser = session({ key: 'Streamserver Session', secret: config['Streaming']['Secret'], resave: false, saveUninitialized: true, store: new SessionStore({ client: redis.createClient() }), cookie: { secure: !dev, maxAge: 2678400000 // 1 month } }) app.use(sessionParser) // Parse stream metrics from the stat.xml file async function pullMetrics (uuid) { let statPath = streamServer + 'stat' if (!cache.stats || cache._updated < Date.now() - 5000) { let { body } = await get(statPath) let rip = await xml2js.parseStringPromise(body) if (!rip.rtmp.server) throw new Error('Invalid response from server.') // Autofind the correct server let rtmpserver = rip.rtmp.server[0].application let rtmpapp for (let i in rtmpserver) { if (rtmpserver[i].name[0] !== streamAppName) continue rtmpapp = rtmpserver[i] } if (!rtmpapp) throw new Error('Invalid response from server.') cache.stats = rtmpapp.live cache._updated = Date.now() } // Extract applicable stream data let forUser for (let i in cache.stats) { if (!cache.stats[i].stream) continue if (cache.stats[i].stream[0].name[0] !== uuid) continue forUser = cache.stats[i].stream[0] } if (!forUser) return null // Generic data object let data = { time: forUser.time[0], bytes: forUser.bytes_in[0], video: null, audio: null } // Add video metadata, if applicable if (forUser.meta[0].video[0] !== '') { data['video'] = { width: forUser.meta[0].video[0].width[0], height: forUser.meta[0].video[0].height[0], frame_rate: forUser.meta[0].video[0].frame_rate[0], codec: forUser.meta[0].video[0].codec[0] } } // Add audio metadata, if applicable if (forUser.meta[0].audio[0] !== '') { data['audio'] = { sample_rate: forUser.meta[0].audio[0].sample_rate[0], channels: forUser.meta[0].audio[0].channels[0], codec: forUser.meta[0].audio[0].codec[0] } } return data } // Handle requests from nginx-rtmp-module app.post('/publish', async (req, res) => { if (!req.body.name) throw new Error('Invalid request.') let db = await dbPromise // Validate stream key let streamer = await db.get('SELECT * FROM channels WHERE key=?', req.body.name) if (!streamer) throw new Error('Invalid stream key.') console.log('=> Streamer %s has started streaming!', streamer.name) // Generate real publish address for the server let publishAddress = config['Streaming']['PublishAddress'] .replace('{streamer}', streamer.name) .replace('{host}', '127.0.0.1') // Set channel streaming status db.run('UPDATE channels SET live_at=? WHERE id=?', Date.now(), streamer.id) cache.live.push(streamer.name) // Redirect the streaming server to the target res.set('Location', publishAddress) res.status(302) res.end() }) app.post('/publish_done', async (req, res) => { if (!req.body.name) throw new Error('Invalid request.') let db = await dbPromise let chan = await db.get('SELECT * FROM channels WHERE key = ?', req.body.name) console.log('<= Streamer %s has stopped streaming!', chan.name) try { delete cache.viewers[chan.name] } catch (e) {} if (cache.live.indexOf(chan.name) !== -1) cache.live.splice(cache.live.indexOf(chan.name), 1) db.run('UPDATE channels SET live_at=NULL, last_stream=? WHERE key=?', Date.now(), chan.key) res.send('OK') }) // Front-end server // OAuth2 authenticator app.get('/login', async (req, res) => { if (req.session.user) return res.redirect('/') req.session.state = uuid() res.redirect(teval(oauthAuth, { state: req.session.state, redirect: oauthRedirect, client: oauthId, server: authServer })) }) app.get('/auth/_callback', async (req, res) => { let state = req.session.state if (!state) throw new Error('Something went wrong!') let code = req.query.code let provState = req.query.state if (!code || state !== provState) throw new Error('Something went wrong!') delete req.session.state // Aquire token let { body } = await post(authServer + '/oauth2/token', { form: { grant_type: 'authorization_code', code: code, redirect_uri: oauthRedirect, client_id: oauthId, client_secret: oauthSecret }, auth: { user: oauthId, pass: oauthSecret } }) if (!body) throw new Error('Could not obtain access token!') try { body = JSON.parse(body) } catch (e) { console.error(e, body) throw new Error('Authorization server gave us an invalid response!') } if (body['error']) { throw new Error(body['error'] + ': ' + body['error_description']) } let token = body.access_token // Get user information let { body: bodyNew } = await get(authServer + '/oauth2/user', { auth: { bearer: token } }) try { bodyNew = JSON.parse(bodyNew) } catch (e) { console.error(e, bodyNew) throw new Error('Authorization server gave us an invalid response for user!') } // Get user from database let db = await dbPromise let user = await db.get('SELECT * FROM signed_users WHERE uuid=?', bodyNew.uuid) if (!user) { await db.run('INSERT INTO signed_users (uuid,name) VALUES (?,?)', bodyNew.uuid, bodyNew.username) } req.session.login = bodyNew.uuid req.session.username = bodyNew.username // Lets see if this user is a streamer let streamer = await db.get('SELECT * FROM channels WHERE user_uuid = ?', bodyNew.uuid) if (streamer) cache.streamers[bodyNew.uuid] = streamer res.redirect('/') }) app.get('/logout', (req, res) => { req.session.destroy() res.redirect('/') }) // Views app.use('/dist', express.static(path.join(__dirname, 'dist'), { maxAge: dev ? 0 : 2678400000 })) app.use(async function (req, res, next) { req.isStreamer = false if (!req.session.login) return next() res.locals.session = { uuid: req.session.login, username: req.session.username } if (!cache.streamers[req.session.login]) { let db = await dbPromise let streamer = await db.get('SELECT * FROM channels WHERE user_uuid = ?', req.session.login) if (streamer) cache.streamers[req.session.login] = streamer } if (cache.streamers[req.session.login]) { req.isStreamer = true return next() } next() }) // Index app.get('/', (req, res) => { res.render('index.html', { streamer: req.isStreamer }) }) // Dashboard app.get('/dashboard', (req, res, next) => { if (!req.isStreamer) return next(new Error('Unauthorized')) let stream = cache.streamers[req.session.login] res.render('dashboard.html', { stream: stream.key, server: 'rtmp://' + streamServerHost + '/live/' }) }) // Stats app.get('/dashboard/stats', async (req, res) => { if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' }) let stream = cache.streamers[req.session.login] let data try { data = await pullMetrics(stream.key) } catch (e) { return res.jsonp({ error: e.message }) } if (!data) return res.jsonp({ error: 'No data was returned.' }) res.jsonp(data) }) // Data app.get('/dashboard/data', async (req, res) => { if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' }) let stream = cache.streamers[req.session.login] let data let db = await dbPromise try { data = await db.get('SELECT * FROM channels WHERE key=?', stream.key) } catch (e) { return res.jsonp({ error: 'Unauthorized' }) } if (!data) return res.jsonp({ error: 'Unauthorized' }) res.jsonp({ 'name': data.name, 'key': stream.key, 'uuid': req.session.login, 'live': data.live_at != null, 'live_at': new Date(parseInt(data.live_at)), 'last_stream': new Date(parseInt(data.last_stream)) }) }) // Get links app.get('/dashboard/link', async (req, res) => { if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' }) let user = req.session.login let db = await dbPromise let links = await db.all('SELECT * FROM link WHERE uuid = ?', user) res.jsonp(links) }) // Add link URL app.post('/dashboard/link', async (req, res) => { if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' }) let user = req.session.login let name = req.body.name let url = req.body.url if (name == null || url == null) return res.jsonp({ error: 'Missing parameters!' }) if (name.length > 120) return res.jsonp({ error: 'Only 120 characters are allowed in the name.' }) if (name.indexOf('<') !== -1 || name.indexOf('>') !== -1) return res.jsonp({ error: 'HTML tags are forbidden!' }) // Validate URL try { URL.parse(url) } catch (e) { return res.jsonp({ error: 'Invalid URL!' }) } // Checks let db = await dbPromise let links = await db.all('SELECT * FROM link WHERE uuid = ?', user) if (links.length > 10) return res.jsonp({ error: 'You can currently only add up to 10 links!' }) let link = await db.get('SELECT * FROM link WHERE url = ? AND uuid = ?', url, user) if (link) return res.jsonp({ error: 'This URL already exists!' }) // Add await db.run('INSERT INTO link (name,url,uuid) VALUES (?,?,?)', name, url, user) res.jsonp({ success: true }) }) // Remove link URL app.post('/dashboard/link/delete', async (req, res) => { if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' }) let user = req.session.login if (req.body.name == null && req.body.url == null) return res.jsonp({ error: 'Missing parameters!' }) // Check let db = await dbPromise let link = await db.get('SELECT * FROM link WHERE url = ? AND uuid = ?', req.body.url, user) if (!link) { link = await db.get('SELECT * FROM link WHERE name = ? AND uuid = ?', req.body.name, user) } if (!link) return res.jsonp({ error: 'Invalid link parameter!' }) // Delete await db.run('DELETE FROM link WHERE id = ?', link.id) res.jsonp({ success: true }) }) // Player app.get('/watch/:name', (req, res) => { res.render('player.html', { name: req.params.name, server: streamServer }) }) app.get('/player/:name', (req, res) => { res.redirect('/watch/' + req.params.name) }) // Public data app.get('/api/channel/:name', async (req, res) => { let name = req.params.name let db = await dbPromise let data = await db.get('SELECT user_uuid,name,live_at,last_stream FROM channels WHERE name=?', name) if (!data) return res.jsonp({ error: 'No such channel!' }) let links = await db.all('SELECT * FROM link WHERE uuid = ?', data.user_uuid) data.live = data.live_at != null data.live_at = new Date(parseInt(data.live_at)) data.last_stream = new Date(parseInt(data.last_stream)) data.links = links || [] data.viewers = Object.keys(cache.viewers[name] || {}).length for (let i in data.links) { delete data.links[i].id delete data.links[i].uuid } res.jsonp(data) }) // Error handler app.use((error, req, res, next) => { if (dev) console.error(error.stack) res.send(error.message) }) // Socket Server wss.on('connection', (ws, request, client) => { const userId = request.session.login || request.session.id const username = request.session.username dev && console.log(username || userId,'connected') ws.on('message', (msg) => { dev && console.log(userId,'said',msg) if (msg.indexOf('watch ') === 0) { let chan = msg.substring(6) dev && console.log('adding a watcher to channel',chan) if (cache.live.indexOf(chan) !== -1) { if (!cache.viewers[chan]) cache.viewers[chan] = {} cache.viewers[chan][userId] = username || 'A Friendly Guest' } } else if (msg.indexOf('stop ') === 0) { let chan = msg.substring(5) dev && console.log('removing a watcher from channel',chan) if (cache.live.indexOf(chan) !== -1) { if (cache.viewers[chan] && cache.viewers[chan][userId]) delete cache.viewers[chan][userId] } } else if (msg.indexOf('viewers ') === 0) { let chan = msg.substring(8) if (cache.viewers[chan] != null) ws.send('viewlist ' + Object.values(cache.viewers[chan]).join(',')) } }) ws.on('close', () => { dev && console.log(userId,'disconnected') for (let chan in cache.viewers) { let viewers = cache.viewers[chan] if (viewers[userId]) { delete cache.viewers[chan][userId] } } }) ws.on('error', (e) => { dev && console.error('Socket error:', e) }) }) // Handle upgrade, parse included session server.on('upgrade', (request, socket, head) => { sessionParser(request, {}, () => { wss.handleUpgrade(request, socket, head, function(ws) { wss.emit('connection', ws, request) }) }) }) // Start server const host = dev ? '0.0.0.0' : '127.0.0.1' server.listen(port, host, () => { // Get currently live channels, for example, when server restarted while someone was live (async function () { let db = await dbPromise let allLive = await db.all('SELECT name FROM channels WHERE live_at IS NOT NULL') for (let i in allLive) { cache.live.push(allLive[i].name) } console.log(`=> Found ${cache.live.length} channels still live`) })().catch(e => console.error(e.stack)) console.log('Listening on %s:%d', host, port) })