const connectSession = require('connect-redis')
const rateLimit = require('express-rate-limit')
const validemail = require('email-validator')
const session = require('express-session')
const bodyParser = require('body-parser')
const nodemailer = require('nodemailer')
const nunjucks = require('nunjucks')
const passport = require('passport')
const express = require('express')
const request = require('request')
const sqlite3 = require('sqlite3')
const sqlite = require('sqlite')
const xml2js = require('xml2js')
const crypto = require('crypto')
const WebSocket = require('ws')
const redis = require('redis')
const path = require('path')
const toml = require('toml')
const http = require('http')
const URL = require('url').URL
const fs = require('fs')

require('express-async-errors')

const SessionStore = connectSession(session)
const util = require('util')
const get = util.promisify(request.get)

const dev = process.env.NODE_ENV === 'development'

/**
 * Tell whether a value is a plain `{}`-style object (not an array, Date, null, ...).
 *
 * @param {*} value - Anything.
 * @returns {boolean} True only for plain objects.
 */
function isPlainObject (value) {
  return Object.prototype.toString.call(value) === '[object Object]'
}

/**
 * Lay `overrides` over `defaults`, recursing into nested plain objects.
 *
 * A plain `Object.assign` would replace a whole section (e.g. `Streaming`)
 * as soon as the config file sets a single key in it, dropping every other
 * default of that section. Neither argument is mutated.
 *
 * @param {object} defaults - Built-in default values.
 * @param {object} [overrides] - Values parsed from the configuration file.
 * @returns {object} A new object holding the merged settings.
 */
function mergeDefaults (defaults, overrides) {
  const merged = Object.assign({}, defaults)
  for (const [name, value] of Object.entries(overrides || {})) {
    const fallback = merged[name]
    merged[name] = isPlainObject(fallback) && isPlainObject(value)
      ? mergeDefaults(fallback, value)
      : value
  }
  return merged
}

// Load Configuration
const filename = path.join(__dirname, 'config.toml')
let config

// In-memory state shared by the HTTP routes and the socket server
const cache = {
  _updated: 0,
  streamers: {},
  viewers: {},
  live: []
}

try {
  config = toml.parse(fs.readFileSync(filename, 'utf8'))
} catch (e) {
  console.error(e)
  process.exit(1)
}

config = mergeDefaults({
  Streaming: {
    port: '9322',
    database: 'streaming.db',
    streamServer: 'https://tv.icynet.eu/live/',
    serverHost: 'icynet.eu',
    publishAddress: 'rtmp://{host}:1935/hls-live/{streamer}',
    secret: 'changeme'
  },
  Auth: {
    strategy: 'passport-oauth2',
    callbackURL: 'http://localhost:5000/auth/_callback/',
    clientID: '1',
    clientSecret: 'changeme'
  },
  Email: {
    enabled: false,
    from: 'no-reply@icynet.eu',
    host: '',
    port: 587,
    secure: false,
    baseURL: 'http://localhost:9321/',
    auth: {
      user: '',
      pass: ''
    },
    tls: {
      rejectUnauthorized: false
    }
  }
}, config)

// Constants
const port = parseInt(config.Streaming.port, 10)
const streamServer = config.Streaming.streamServer
const streamServerHost = config.Streaming.serverHost

// The application name is the last path segment of the stream server URL,
// which therefore has to end in "/<app>/".
const streamAppMatch = String(streamServer).match(/\/([\w-_]+)\/$/)
if (!streamAppMatch) {
  console.error('Streaming.streamServer must end with "/<application>/", got: %s', streamServer)
  process.exit(1)
}
const streamAppName = streamAppMatch[1]
// Database
const dbPromise = sqlite.open({
  filename: path.join(process.cwd(), config.Streaming.database),
  driver: sqlite3.cached.Database
})

// Setup server
const app = express()
const server = http.createServer(app)
const wss = new WebSocket.Server({ clientTracking: false, noServer: true })

// Rate limits
const emlLimiter = rateLimit({
  windowMs: 1000 * 60 * 60,
  max: 16,
  message: 'Too many subscription attempts from this IP address. Try again in an hour.'
})

// Authentication
const Strategy = require(config.Auth.strategy)
const strategyConfig = Object.assign({}, config.Auth)
if (!strategyConfig.provider) strategyConfig.provider = strategyConfig.strategy.replace('passport-', '')

passport.use(new Strategy(strategyConfig, function (accessToken, refreshToken, profile, done) {
  process.nextTick(function () {
    return done(null, profile)
  })
}))

// Email
let emailTransport
if (config.Email.enabled) {
  emailTransport = nodemailer.createTransport({
    ...config.Email,
    pool: true
  })
}

// Channel ids waiting for a "gone live" mail, and the time each was last notified
const notifQueue = []
const notifHistory = {}

/**
 * Current UNIX time.
 *
 * @returns {number} Whole seconds since the epoch.
 */
function now () {
  return Math.floor(Date.now() / 1000)
}

/**
 * Generate a random token used for CSRF, activation and unsubscribe keys.
 *
 * @returns {string} 32 hexadecimal characters (16 random bytes).
 */
function key () {
  return crypto.randomBytes(16).toString('hex')
}

/**
 * Escape a value for interpolation into HTML text or a quoted attribute.
 *
 * @param {*} value - Value to escape; it is converted to a string first.
 * @returns {string} The escaped string.
 */
function escapeHTML (value) {
  return String(value)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;')
}

/**
 * Mail every active subscriber of a channel that it has gone live.
 *
 * Does nothing when email is disabled, when the channel was already notified
 * within the last hour, or when the channel id is unknown. Individual send
 * failures are logged and do not stop the remaining mails.
 *
 * @param {number|string} channel - Channel id (`channels.id`).
 * @returns {Promise<void>}
 */
async function sendEmailPush (channel) {
  if (!emailTransport) return

  // Don't re-send notifications within an hour if a channel happens to go live again
  if (notifHistory[channel] && notifHistory[channel] > now() - 3600) return
  notifHistory[channel] = now()

  const db = await dbPromise
  const data = await db.get('SELECT name FROM channels WHERE id = ?', channel)
  if (!data) return

  const subs = await db.all('SELECT email,unsubkey FROM emailsub WHERE uuid = ? AND active = 1', channel)
  const watchURL = config.Email.baseURL + 'watch/' + data.name
  const safeName = escapeHTML(data.name)
  const safeWatchURL = escapeHTML(watchURL)

  for (const sub of subs) {
    const unsubURL = config.Email.baseURL + 'unsubscribe/' + sub.unsubkey
    const safeUnsubURL = escapeHTML(unsubURL)

    emailTransport.sendMail({
      from: config.Email.from,
      to: sub.email,
      subject: `🔴 ${data.name} has gone LIVE on IcyTV!`,
      text: `${data.name} has gone LIVE on IcyTV!\nWatch now: ${watchURL}` +
        `\n\nUnsubscribe from ${data.name}: ${unsubURL}`,
      html: `<h1>${safeName} has gone LIVE on IcyTV!</h1>` +
        `<p>Watch now: <a href="${safeWatchURL}">${safeWatchURL}</a></p>` +
        `<p>Unsubscribe from ${safeName}: <a href="${safeUnsubURL}">${safeUnsubURL}</a></p>`
    }).catch(e => console.error(e))
  }
}

/**
 * Register an inactive subscription and mail its confirmation link.
 *
 * Does nothing when email is disabled. If the confirmation mail cannot be
 * sent, the freshly inserted row is removed again so the address can retry.
 *
 * @param {string} channel - Channel name (`channels.name`).
 * @param {string} email - Address to subscribe.
 * @returns {Promise<void>}
 * @throws {Error} If the channel is unknown, the address is already
 *   subscribed to it, or the confirmation mail could not be sent.
 */
async function subscribeToChannel (channel, email) {
  if (!emailTransport) return

  const db = await dbPromise
  const data = await db.get('SELECT id FROM channels WHERE name = ?', channel)
  if (!data) throw new Error('Invalid channel!')

  const exists = await db.get('SELECT * FROM emailsub WHERE email = ? AND uuid = ?', [email, data.id])
  if (exists) throw new Error('A subscription already exists for this email address.')

  // New verification email
  const activateKey = key()
  const unsubKey = key()
  const activateURL = config.Email.baseURL + 'email/' + activateKey
  const safeChannel = escapeHTML(channel)
  const safeActivateURL = escapeHTML(activateURL)

  await db.run('INSERT INTO emailsub (unsubkey, activatekey, email, uuid, active, created_at) VALUES ' +
    '(?, ?, ?, ?, 0, ?)', [unsubKey, activateKey, email, data.id, now()])

  try {
    await emailTransport.sendMail({
      from: config.Email.from,
      to: email,
      subject: `Confirm IcyTV subscription to channel ${channel}`,
      text: `Confirm your subscription\n\nClick here to subscribe to ${channel}: ${activateURL} ` +
        `\n\nIf you did not subscribe to ${channel} on IcyTV, please ignore this email ` +
        `\nand no further action is required on your part. If these emails persist, please ` +
        `\ncontact us via abuse@icynet.eu and we'll be sure to help you.`,
      html: '<h1>Confirm your subscription</h1>' +
        `<p>Click here to subscribe to ${safeChannel}: ` +
        `<a href="${safeActivateURL}">${safeActivateURL}</a></p>` +
        `<p>If you did not subscribe to ${safeChannel} on IcyTV, please ignore this email ` +
        'and no further action is required on your part. If these emails persist, please contact us via ' +
        '<a href="mailto:abuse@icynet.eu">abuse@icynet.eu</a> and we\'ll be sure to help you.</p>'
    })
  } catch (e) {
    // Without this the "already exists" check above would block every retry
    await db.run('DELETE FROM emailsub WHERE activatekey = ?', activateKey)
    throw e
  }
}

/**
 * Delete the subscription identified by its unsubscribe key.
 *
 * @param {string} key - Value of `emailsub.unsubkey`.
 * @returns {Promise<void>}
 */
async function unsubscribe (key) {
  const db = await dbPromise
  await db.run('DELETE FROM emailsub WHERE unsubkey = ?', key)
}

/**
 * Mark the subscription identified by its activation key as active.
 *
 * @param {string} key - Value of `emailsub.activatekey`.
 * @returns {Promise<void>}
 */
async function activateSubscription (key) {
  const db = await dbPromise
  await db.run('UPDATE emailsub SET active = 1 WHERE activatekey = ?', key)
}

// The whole profile object is stored in the session as-is
passport.serializeUser(function (user, done) {
  done(null, user)
})

passport.deserializeUser(function (user, done) {
  done(null, user)
})

// app.enable() takes no value and would set this to `true` (trust every hop),
// so set the intended hop count explicitly.
app.set('trust proxy', 1)
app.use(bodyParser.urlencoded({ extended: false }))
app.use(bodyParser.json())
app.disable('x-powered-by')

nunjucks.configure('templates', {
  autoescape: true,
  express: app
})

const sessionParser = session({
  key: 'Streamserver Session',
  secret: config.Streaming.secret,
  resave: false,
  saveUninitialized: true,
  store: new SessionStore({ client: redis.createClient() }),
  cookie: {
    secure: !dev,
    maxAge: 2678400000 // 1 month
  }
})

app.use(sessionParser)
app.use(passport.initialize())
app.use(passport.session())

// Give every session a CSRF token on first contact
app.use((req, res, next) => {
  if (!req.session.csrf) req.session.csrf = key()
  next()
})

/**
 * Parse stream metrics from the stream server's stat.xml document.
 *
 * The parsed document is cached for five seconds.
 *
 * @param {string} uuid - Stream name to look up in the stats (callers pass the stream key).
 * @returns {Promise<?object>} `{ time, bytes, video, audio }`, or null when the
 *   stream is not listed. `video` / `audio` are null when no metadata is reported.
 * @throws {Error} If the stat document does not contain the expected application.
 */
async function pullMetrics (uuid) {
  const statPath = streamServer + 'stat'

  if (!cache.stats || cache._updated < Date.now() - 5000) {
    const { body } = await get(statPath)
    const rip = await xml2js.parseStringPromise(body)
    if (!rip || !rip.rtmp || !rip.rtmp.server) throw new Error('Invalid response from server.')

    // Autofind the correct application
    const applications = rip.rtmp.server[0].application || []
    const rtmpapp = applications.find(candidate => candidate.name && candidate.name[0] === streamAppName)
    if (!rtmpapp) throw new Error('Invalid response from server.')

    cache.stats = rtmpapp.live
    cache._updated = Date.now()
  }

  // Extract applicable stream data. Every <stream> of every <live> element is
  // checked, not just the first one, so all concurrent streamers are found.
  let forUser
  for (const live of cache.stats || []) {
    for (const stream of live.stream || []) {
      if (stream.name && stream.name[0] === uuid) forUser = stream
    }
  }
  if (!forUser) return null

  // Generic data object
  const data = {
    time: forUser.time[0],
    bytes: forUser.bytes_in[0],
    video: null,
    audio: null
  }

  // An element without children is parsed to '', so only objects carry metadata
  const meta = (forUser.meta && forUser.meta[0]) || {}
  const video = meta.video && meta.video[0]
  const audio = meta.audio && meta.audio[0]

  // Add video metadata, if applicable
  if (video && typeof video === 'object') {
    data.video = {
      width: video.width[0],
      height: video.height[0],
      frame_rate: video.frame_rate[0],
      codec: video.codec[0]
    }
  }

  // Add audio metadata, if applicable
  if (audio && typeof audio === 'object') {
    data.audio = {
      sample_rate: audio.sample_rate[0],
      channels: audio.channels[0],
      codec: audio.codec[0]
    }
  }

  return data
}

// Handle requests from nginx-rtmp-module
app.post('/publish', async (req, res) => {
  if (!req.body.name) throw new Error('Invalid request.')
  const db = await dbPromise

  // Validate stream key
  const streamer = await db.get('SELECT * FROM channels WHERE key=?', req.body.name)
  if (!streamer) throw new Error('Invalid stream key.')
  console.log('=> Streamer %s has started streaming!', streamer.name)

  // Generate real publish address for the server
  const publishAddress = config.Streaming.publishAddress
    .replace('{streamer}', streamer.name)
    .replace('{host}', '127.0.0.1')

  // Set channel streaming status
  await db.run('UPDATE channels SET live_at=? WHERE id=?', Date.now(), streamer.id)
  if (!cache.live.includes(streamer.name)) cache.live.push(streamer.name)

  // Send notifications
  if (!notifQueue.includes(streamer.id)) notifQueue.push(streamer.id)

  // Redirect the streaming server to the target
  res.set('Location', publishAddress)
  res.status(302)
  res.end()
})

app.post('/publish_done', async (req, res) => {
  if (!req.body.name) throw new Error('Invalid request.')
  const db = await dbPromise

  const chan = await db.get('SELECT * FROM channels WHERE key = ?', req.body.name)
  if (!chan) throw new Error('Invalid stream key.')
  console.log('<= Streamer %s has stopped streaming!', chan.name)

  delete cache.viewers[chan.name]

  const liveIndex = cache.live.indexOf(chan.name)
  if (liveIndex !== -1) cache.live.splice(liveIndex, 1)

  await db.run('UPDATE channels SET live_at=NULL, last_stream=? WHERE key=?', Date.now(), chan.key)
  res.send('OK')
})

// Front-end server
// OAuth2 authenticator
app.get('/login', passport.authenticate(strategyConfig.provider, Object.assign({}, strategyConfig.authOptions || {})))

app.get('/auth/_callback', passport.authenticate(strategyConfig.provider, { failureRedirect: '/' }), async (req, res) => {
  if (dev) console.log(req.user.username, 'logged in')

  // Get user from database
  const db = await dbPromise
  const user = await db.get('SELECT * FROM signed_users WHERE uuid=?', req.user.uuid)
  if (!user) {
    await db.run('INSERT INTO signed_users (uuid,name) VALUES (?,?)', req.user.uuid, req.user.username)
  }

  // Lets see if this user is a streamer
  const streamer = await db.get('SELECT * FROM channels WHERE user_uuid = ?', req.user.uuid)
  if (streamer) cache.streamers[req.user.uuid] = streamer

  res.redirect('/')
})

app.get('/logout', (req, res) => {
  req.logout()
  res.redirect('/')
})

/**
 * Route guard: continue only for logged-in users that own a channel.
 * Everyone else receives a JSON(P) "Unauthorized" error.
 */
function authed (req, res, next) {
  if (req.isAuthenticated() && req.isStreamer) return next()
  res.jsonp({ error: 'Unauthorized' })
}

// Views
app.use('/dist', express.static(path.join(__dirname, 'dist'), { maxAge: dev ? 0 : 2678400000 }))

// Resolve whether the logged-in user is a streamer, caching the channel row
app.use(async function (req, res, next) {
  req.isStreamer = false
  if (!req.isAuthenticated()) return next()
  res.locals.user = req.user

  if (!cache.streamers[req.user.uuid]) {
    const db = await dbPromise
    const streamer = await db.get('SELECT * FROM channels WHERE user_uuid = ?', req.user.uuid)
    if (streamer) cache.streamers[req.user.uuid] = streamer
  }

  req.isStreamer = Boolean(cache.streamers[req.user.uuid])
  next()
})

// Index
app.get('/', (req, res) => {
  res.render('index.html', { streamer: req.isStreamer })
})

// Dashboard
app.get('/dashboard', authed, (req, res) => {
  res.render('dashboard.html', { server: 'rtmp://' + streamServerHost + '/live/' })
})

// Stats
app.get('/dashboard/stats', authed, async (req, res) => {
  const stream = cache.streamers[req.user.uuid]
  let data

  try {
    data = await pullMetrics(stream.key)
  } catch (e) {
    return res.jsonp({ error: e.message })
  }

  if (!data) return res.jsonp({ error: 'No data was returned.' })
  res.jsonp(data)
})

// Data
app.get('/dashboard/data', authed, async (req, res) => {
  const stream = cache.streamers[req.user.uuid]
  const db = await dbPromise
  let data

  try {
    data = await db.get('SELECT * FROM channels WHERE key=?', stream.key)
  } catch (e) {
    return res.jsonp({ error: 'Unauthorized' })
  }

  if (!data) return res.jsonp({ error: 'Unauthorized' })

  res.jsonp({
    name: data.name,
    key: stream.key,
    uuid: req.user.uuid,
    live: data.live_at != null,
    live_at: new Date(parseInt(data.live_at, 10)),
    last_stream: new Date(parseInt(data.last_stream, 10))
  })
})

// Get links
app.get('/dashboard/link', authed, async (req, res) => {
  const user = req.user.uuid
  const db = await dbPromise
  const links = await db.all('SELECT * FROM link WHERE uuid = ?', user)
  res.jsonp(links)
})

// Add link URL
app.post('/dashboard/link', authed, async (req, res) => {
  const user = req.user.uuid
  const name = req.body.name
  const url = req.body.url

  if (name == null || url == null) return res.jsonp({ error: 'Missing parameters!' })
  // A JSON body may carry non-string values; the checks below assume strings
  if (typeof name !== 'string' || typeof url !== 'string') return res.jsonp({ error: 'Invalid parameters!' })
  if (name.length > 120) return res.jsonp({ error: 'Only 120 characters are allowed in the name.' })
  if (name.length < 3) return res.jsonp({ error: 'Minimum name length is 3 characters.' })
  if (/[<>]/.test(name) || /[<>]/.test(url)) return res.jsonp({ error: 'HTML tags are forbidden!' })

  // Validate URL (the constructor throws on unparsable input)
  let parsed
  try {
    parsed = new URL(url)
  } catch (e) {
    return res.jsonp({ error: 'Invalid URL!' })
  }
  if (parsed.protocol === '' || parsed.host === '') return res.jsonp({ error: 'Invalid URL!' })

  // Checks
  const db = await dbPromise
  const links = await db.all('SELECT * FROM link WHERE uuid = ?', user)
  if (links.length >= 10) return res.jsonp({ error: 'You can currently only add up to 10 links!' })

  const link = await db.get('SELECT * FROM link WHERE url = ? AND uuid = ?', url, user)
  if (link) return res.jsonp({ error: 'This URL already exists!' })

  // Add
  await db.run('INSERT INTO link (name,url,uuid) VALUES (?,?,?)', name, url, user)
  res.jsonp({ success: true })
})

// Remove link URL
app.post('/dashboard/link/delete', authed, async (req, res) => {
  const user = req.user.uuid
  if (req.body.name == null && req.body.url == null) return res.jsonp({ error: 'Missing parameters!' })

  // Check: look the link up by URL first, then by name
  const db = await dbPromise
  let link = await db.get('SELECT * FROM link WHERE url = ? AND uuid = ?', req.body.url, user)
  if (!link) {
    link = await db.get('SELECT * FROM link WHERE name = ? AND uuid = ?', req.body.name, user)
  }
  if (!link) return res.jsonp({ error: 'Invalid link parameter!' })

  // Delete
  await db.run('DELETE FROM link WHERE id = ?', link.id)
  res.jsonp({ success: true })
})

// Player
app.get('/watch/:name', (req, res) => {
  res.render('player.html', {
    name: req.params.name,
    server: streamServer,
    csrf: req.session.csrf,
    email: emailTransport != null
  })
})

app.get('/player/:name', (req, res) => {
  res.redirect('/watch/' + req.params.name)
})

// Public data
app.get('/api/channel/:name', async (req, res) => {
  const name = req.params.name
  const db = await dbPromise

  const data = await db.get('SELECT user_uuid,name,live_at,last_stream,chat_channel FROM channels WHERE name=?', name)
  if (!data) return res.jsonp({ error: 'No such channel!' })

  const links = await db.all('SELECT name,url FROM link WHERE uuid = ?', data.user_uuid)

  // The owner's uuid is only needed for the lookup above and is not published
  delete data.user_uuid
  data.live = data.live_at != null
  data.live_at = new Date(parseInt(data.live_at, 10))
  data.last_stream = new Date(parseInt(data.last_stream, 10))
  data.links = links || []
  data.viewers = Object.keys(cache.viewers[name] || {}).length
  data.source = streamServer + name + '.m3u8'

  res.jsonp(data)
})

app.post('/api/email/:channel', emlLimiter, async (req, res) => {
  if (!emailTransport) return res.json({ message: 'Email transport is disabled.' })

  const csrf = req.body.csrf
  if (!csrf || !req.session.csrf || req.session.csrf !== csrf) {
    return res.status(400).json({ error: true, message: 'Illegal request!' })
  }

  const email = req.body.email
  if (!email || !validemail.validate(email)) {
    return res.status(400).json({ error: true, message: 'Invalid email address!' })
  }

  try {
    await subscribeToChannel(req.params.channel, email)
  } catch (e) {
    return res.status(400).json({ error: true, message: e.message })
  }

  res.json({ message: 'Confirmation email has been sent!' })
})

app.get('/email/:key', async (req, res) => {
  if (!emailTransport) return res.redirect('/?activated=false')
  await activateSubscription(req.params.key)
  res.redirect('/?activated=true')
})

app.get('/unsubscribe/:key', async (req, res) => {
  if (!emailTransport) return res.redirect('/?unsubscribe=false')
  await unsubscribe(req.params.key)
  res.redirect('/?unsubscribe=true')
})

// Error handler
app.use((error, req, res, next) => {
  if (dev) console.error(error.stack)
  if (res.headersSent) return next(error)

  // Never answer an error with a 2xx status: nginx-rtmp treats a 2xx reply to
  // its publish callback as permission to continue the session.
  if (res.statusCode < 400) res.status(500)
  res.send(error.message)
})

// Socket Server
// Text protocol: "watch <channel>", "stop <channel>", "viewers <channel>"
wss.on('connection', (ws, req) => {
  let userId = req.session.id
  let username = 'A Friendly Guest'
  const myChannels = []

  if (req.user) {
    userId = req.user.uuid
    username = req.user.username
  }

  if (dev) console.log(userId, 'connected')

  ws.on('message', (msg) => {
    if (dev) console.log(userId, 'said', msg)
    const is = msg.toString().trim().split(' ')
    const chan = is[1]
    if (!chan) return

    switch (is[0]) {
      case 'watch':
        if (dev) console.log('adding watcher', userId, 'to channel', chan)
        if (cache.live.includes(chan)) {
          if (!cache.viewers[chan]) cache.viewers[chan] = {}
          cache.viewers[chan][userId] = username
          if (!myChannels.includes(chan)) myChannels.push(chan)
        }
        break
      case 'stop':
        if (dev) console.log('removing watcher', userId, 'from channel', chan)
        if (cache.live.includes(chan)) {
          if (cache.viewers[chan] && cache.viewers[chan][userId]) delete cache.viewers[chan][userId]
          const watchedIndex = myChannels.indexOf(chan)
          if (watchedIndex !== -1) myChannels.splice(watchedIndex, 1)
        }
        break
      case 'viewers':
        if (cache.viewers[chan] != null) ws.send('viewlist ' + Object.values(cache.viewers[chan]).join(','))
        break
    }
  })

  ws.on('close', () => {
    if (dev) console.log(userId, 'disconnected')
    // Drop this connection from every channel it was still watching
    for (const chan of myChannels) {
      const viewers = cache.viewers[chan]
      if (viewers && viewers[userId]) delete viewers[userId]
    }
  })

  ws.on('error', (e) => {
    if (dev) console.error('Socket error:', e)
  })
})

// Handle upgrade, parse included session
server.on('upgrade', (req, socket, head) => {
  sessionParser(req, {}, () => {
    if (!req.session || !req.session.id) return socket.destroy()
    if (req.session.passport) req.user = req.session.passport.user

    wss.handleUpgrade(req, socket, head, function (ws) {
      wss.emit('connection', ws, req)
    })
  })
})

/**
 * Stream start notifications pump.
 *
 * Takes the oldest queued channel, starts its mails, then re-arms itself:
 * after one second while work remains, after five seconds when idle.
 */
function notify () {
  const channel = notifQueue.shift()
  if (channel != null) {
    sendEmailPush(channel).catch(e => console.error(e))
  }
  setTimeout(notify, notifQueue.length ? 1000 : 5000)
}

/**
 * Run database migrations, then reload the channels that are marked live,
 * for example, when the server restarted while someone was live.
 *
 * @returns {Promise<void>}
 */
async function restoreLiveChannels () {
  const db = await dbPromise
  await db.migrate()

  const allLive = await db.all('SELECT name FROM channels WHERE live_at IS NOT NULL')
  for (const { name } of allLive) {
    if (!cache.live.includes(name)) cache.live.push(name)
  }
  console.log(`=> Found ${cache.live.length} channels still live`)
}

// Start server
const host = dev ? '0.0.0.0' : '127.0.0.1'
server.listen(port, host, () => {
  restoreLiveChannels().catch(e => console.error(e.stack))

  console.log('Listening on %s:%d', host, port)
  console.log('Authentication module: %s (%s)', strategyConfig.strategy, strategyConfig.provider)

  notify()
})