541 lines
16 KiB
JavaScript
541 lines
16 KiB
JavaScript
const connectSession = require('connect-redis')
|
|
const session = require('express-session')
|
|
const bodyParser = require('body-parser')
|
|
const express = require('express')
|
|
const request = require('request')
|
|
const nunjucks = require('nunjucks')
|
|
const sqlite = require('sqlite')
|
|
const xml2js = require('xml2js')
|
|
const path = require('path')
|
|
const toml = require('toml')
|
|
const http = require('http')
|
|
const fs = require('fs')
|
|
const WebSocket = require('ws')
|
|
const uuid = require('uuid/v4')
|
|
const redis = require('redis')
|
|
const URL = require('url')
|
|
|
|
require('express-async-errors')
|
|
|
|
const SessionStore = connectSession(session)
|
|
|
|
const util = require('util')
|
|
const get = util.promisify(request.get)
|
|
const post = util.promisify(request.post)
|
|
|
|
const dev = process.env.NODE_ENV === 'development'
|
|
|
|
// Load Configuration
|
|
const filename = path.join(__dirname, 'config.toml')
|
|
|
|
let config
|
|
let cache = { _updated: 0, streamers: {}, viewers: {}, live: [] }
|
|
|
|
try {
|
|
config = toml.parse(fs.readFileSync(filename))
|
|
} catch (e) {
|
|
console.error(e)
|
|
process.exit(1)
|
|
}
|
|
|
|
config = Object.assign({
|
|
'Streaming': {
|
|
'Port': '9322',
|
|
'Database': 'streaming.db',
|
|
'StreamServer': 'https://tv.icynet.eu/live/',
|
|
'ServerHost': 'icynet.eu',
|
|
'PublishAddress': 'rtmp://{host}:1935/hls-live/{streamer}',
|
|
'Secret': 'changeme'
|
|
},
|
|
'Auth': {
|
|
'Server': 'http://localhost:8282',
|
|
'Redirect': 'http://localhost:5000/auth/_callback/'
|
|
},
|
|
'OAuth2': {
|
|
'ClientID': '1',
|
|
'ClientSecret': 'changeme'
|
|
}
|
|
}, config)
|
|
|
|
// Constants
|
|
const oauthAuth = '{server}/oauth2/authorize?response_type=code&state={state}&redirect_uri={redirect}&client_id={client}&scope=image'
|
|
const port = parseInt(config['Streaming']['Port'])
|
|
const streamServer = config['Streaming']['StreamServer']
|
|
const streamServerHost = config['Streaming']['ServerHost']
|
|
const authServer = config['Auth']['Server']
|
|
const oauthRedirect = config['Auth']['Redirect']
|
|
const oauthId = config['OAuth2']['ClientID'].toString()
|
|
const oauthSecret = config['OAuth2']['ClientSecret']
|
|
const streamAppName = streamServer.match(/\/([\w-_]+)\/$/)[1]
|
|
|
|
function teval (str, obj) {
|
|
let res = str + ''
|
|
for (let key in obj) {
|
|
if (res.indexOf('{' + key + '}') === -1) continue
|
|
res = res.replace('{' + key + '}', obj[key])
|
|
}
|
|
return res
|
|
}
|
|
|
|
// Database
|
|
const dbPromise = Promise.resolve()
|
|
.then(() => sqlite.open(path.join(process.cwd(), config['Streaming']['Database']), { Promise, cache: true }))
|
|
.then(db => db.migrate())
|
|
|
|
// Setup server
|
|
const app = express()
|
|
const server = http.createServer(app)
|
|
const wss = new WebSocket.Server({ clientTracking: false, noServer: true })
|
|
|
|
app.enable('trust proxy', 1)
|
|
|
|
app.use(bodyParser.urlencoded({ extended: false }))
|
|
app.use(bodyParser.json())
|
|
|
|
app.disable('x-powered-by')
|
|
|
|
nunjucks.configure('templates', {
|
|
autoescape: true,
|
|
express: app
|
|
})
|
|
|
|
const sessionParser = session({
|
|
key: 'Streamserver Session',
|
|
secret: config['Streaming']['Secret'],
|
|
resave: false,
|
|
saveUninitialized: true,
|
|
store: new SessionStore({ client: redis.createClient() }),
|
|
cookie: {
|
|
secure: !dev,
|
|
maxAge: 2678400000 // 1 month
|
|
}
|
|
})
|
|
|
|
app.use(sessionParser)
|
|
|
|
// Parse stream metrics from the stat.xml file
|
|
async function pullMetrics (uuid) {
|
|
let statPath = streamServer + 'stat'
|
|
if (!cache.stats || cache._updated < Date.now() - 5000) {
|
|
let { body } = await get(statPath)
|
|
let rip = await xml2js.parseStringPromise(body)
|
|
if (!rip.rtmp.server) throw new Error('Invalid response from server.')
|
|
|
|
// Autofind the correct server
|
|
let rtmpserver = rip.rtmp.server[0].application
|
|
let rtmpapp
|
|
for (let i in rtmpserver) {
|
|
if (rtmpserver[i].name[0] !== streamAppName) continue
|
|
rtmpapp = rtmpserver[i]
|
|
}
|
|
|
|
if (!rtmpapp) throw new Error('Invalid response from server.')
|
|
|
|
cache.stats = rtmpapp.live
|
|
cache._updated = Date.now()
|
|
}
|
|
|
|
// Extract applicable stream data
|
|
let forUser
|
|
for (let i in cache.stats) {
|
|
if (!cache.stats[i].stream) continue
|
|
if (cache.stats[i].stream[0].name[0] !== uuid) continue
|
|
forUser = cache.stats[i].stream[0]
|
|
}
|
|
|
|
if (!forUser) return null
|
|
|
|
// Generic data object
|
|
let data = {
|
|
time: forUser.time[0],
|
|
bytes: forUser.bytes_in[0],
|
|
video: null,
|
|
audio: null
|
|
}
|
|
|
|
// Add video metadata, if applicable
|
|
if (forUser.meta[0].video[0] !== '') {
|
|
data['video'] = {
|
|
width: forUser.meta[0].video[0].width[0],
|
|
height: forUser.meta[0].video[0].height[0],
|
|
frame_rate: forUser.meta[0].video[0].frame_rate[0],
|
|
codec: forUser.meta[0].video[0].codec[0]
|
|
}
|
|
}
|
|
|
|
// Add audio metadata, if applicable
|
|
if (forUser.meta[0].audio[0] !== '') {
|
|
data['audio'] = {
|
|
sample_rate: forUser.meta[0].audio[0].sample_rate[0],
|
|
channels: forUser.meta[0].audio[0].channels[0],
|
|
codec: forUser.meta[0].audio[0].codec[0]
|
|
}
|
|
}
|
|
|
|
return data
|
|
}
|
|
|
|
// Handle requests from nginx-rtmp-module
|
|
|
|
app.post('/publish', async (req, res) => {
|
|
if (!req.body.name) throw new Error('Invalid request.')
|
|
let db = await dbPromise
|
|
|
|
// Validate stream key
|
|
let streamer = await db.get('SELECT * FROM channels WHERE key=?', req.body.name)
|
|
|
|
if (!streamer) throw new Error('Invalid stream key.')
|
|
console.log('=> Streamer %s has started streaming!', streamer.name)
|
|
|
|
// Generate real publish address for the server
|
|
let publishAddress = config['Streaming']['PublishAddress']
|
|
.replace('{streamer}', streamer.name)
|
|
.replace('{host}', '127.0.0.1')
|
|
|
|
// Set channel streaming status
|
|
db.run('UPDATE channels SET live_at=? WHERE id=?', Date.now(), streamer.id)
|
|
cache.live.push(streamer.name)
|
|
|
|
// Redirect the streaming server to the target
|
|
res.set('Location', publishAddress)
|
|
res.status(302)
|
|
res.end()
|
|
})
|
|
|
|
app.post('/publish_done', async (req, res) => {
|
|
if (!req.body.name) throw new Error('Invalid request.')
|
|
|
|
let db = await dbPromise
|
|
let chan = await db.get('SELECT * FROM channels WHERE key = ?', req.body.name)
|
|
console.log('<= Streamer %s has stopped streaming!', chan.name)
|
|
|
|
try { delete cache.viewers[chan.name] } catch (e) {}
|
|
if (cache.live.indexOf(chan.name) !== -1) cache.live.splice(cache.live.indexOf(chan.name), 1)
|
|
|
|
db.run('UPDATE channels SET live_at=NULL, last_stream=? WHERE key=?', Date.now(), chan.key)
|
|
|
|
res.send('OK')
|
|
})
|
|
|
|
// Front-end server
|
|
// OAuth2 authenticator
|
|
app.get('/login', async (req, res) => {
|
|
if (req.session.user) return res.redirect('/')
|
|
req.session.state = uuid()
|
|
res.redirect(teval(oauthAuth, { state: req.session.state, redirect: oauthRedirect, client: oauthId, server: authServer }))
|
|
})
|
|
|
|
app.get('/auth/_callback', async (req, res) => {
|
|
let state = req.session.state
|
|
if (!state) throw new Error('Something went wrong!')
|
|
|
|
let code = req.query.code
|
|
let provState = req.query.state
|
|
if (!code || state !== provState) throw new Error('Something went wrong!')
|
|
delete req.session.state
|
|
|
|
// Aquire token
|
|
let { body } = await post(authServer + '/oauth2/token', {
|
|
form: {
|
|
grant_type: 'authorization_code',
|
|
code: code,
|
|
redirect_uri: oauthRedirect,
|
|
client_id: oauthId,
|
|
client_secret: oauthSecret
|
|
},
|
|
auth: {
|
|
user: oauthId,
|
|
pass: oauthSecret
|
|
}
|
|
})
|
|
|
|
if (!body) throw new Error('Could not obtain access token!')
|
|
try {
|
|
body = JSON.parse(body)
|
|
} catch (e) {
|
|
console.error(e, body)
|
|
throw new Error('Authorization server gave us an invalid response!')
|
|
}
|
|
|
|
if (body['error']) {
|
|
throw new Error(body['error'] + ': ' + body['error_description'])
|
|
}
|
|
|
|
let token = body.access_token
|
|
|
|
// Get user information
|
|
let { body: bodyNew } = await get(authServer + '/oauth2/user', { auth: { bearer: token } })
|
|
try {
|
|
bodyNew = JSON.parse(bodyNew)
|
|
} catch (e) {
|
|
console.error(e, bodyNew)
|
|
throw new Error('Authorization server gave us an invalid response for user!')
|
|
}
|
|
|
|
// Get user from database
|
|
let db = await dbPromise
|
|
let user = await db.get('SELECT * FROM signed_users WHERE uuid=?', bodyNew.uuid)
|
|
if (!user) {
|
|
await db.run('INSERT INTO signed_users (uuid,name) VALUES (?,?)', bodyNew.uuid, bodyNew.username)
|
|
}
|
|
|
|
req.session.login = bodyNew.uuid
|
|
req.session.username = bodyNew.username
|
|
|
|
// Lets see if this user is a streamer
|
|
let streamer = await db.get('SELECT * FROM channels WHERE user_uuid = ?', bodyNew.uuid)
|
|
if (streamer) cache.streamers[bodyNew.uuid] = streamer
|
|
|
|
res.redirect('/')
|
|
})
|
|
|
|
app.get('/logout', (req, res) => {
|
|
req.session.destroy()
|
|
res.redirect('/')
|
|
})
|
|
|
|
// Views
|
|
|
|
app.use('/dist', express.static(path.join(__dirname, 'dist'), { maxAge: dev ? 0 : 2678400000 }))
|
|
app.use(async function (req, res, next) {
|
|
req.isStreamer = false
|
|
if (!req.session.login) return next()
|
|
|
|
res.locals.session = { uuid: req.session.login, username: req.session.username }
|
|
|
|
if (!cache.streamers[req.session.login]) {
|
|
let db = await dbPromise
|
|
let streamer = await db.get('SELECT * FROM channels WHERE user_uuid = ?', req.session.login)
|
|
if (streamer) cache.streamers[req.session.login] = streamer
|
|
}
|
|
|
|
if (cache.streamers[req.session.login]) {
|
|
req.isStreamer = true
|
|
return next()
|
|
}
|
|
|
|
next()
|
|
})
|
|
|
|
// Index
|
|
app.get('/', (req, res) => {
|
|
res.render('index.html', { streamer: req.isStreamer })
|
|
})
|
|
|
|
// Dashboard
|
|
app.get('/dashboard', (req, res, next) => {
|
|
if (!req.isStreamer) return next(new Error('Unauthorized'))
|
|
let stream = cache.streamers[req.session.login]
|
|
res.render('dashboard.html', { stream: stream.key, server: 'rtmp://' + streamServerHost + '/live/' })
|
|
})
|
|
|
|
// Stats
|
|
app.get('/dashboard/stats', async (req, res) => {
|
|
if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' })
|
|
let stream = cache.streamers[req.session.login]
|
|
let data
|
|
|
|
try {
|
|
data = await pullMetrics(stream.key)
|
|
} catch (e) {
|
|
return res.jsonp({ error: e.message })
|
|
}
|
|
|
|
if (!data) return res.jsonp({ error: 'No data was returned.' })
|
|
res.jsonp(data)
|
|
})
|
|
|
|
// Data
|
|
app.get('/dashboard/data', async (req, res) => {
|
|
if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' })
|
|
let stream = cache.streamers[req.session.login]
|
|
let data
|
|
|
|
let db = await dbPromise
|
|
|
|
try {
|
|
data = await db.get('SELECT * FROM channels WHERE key=?', stream.key)
|
|
} catch (e) {
|
|
return res.jsonp({ error: 'Unauthorized' })
|
|
}
|
|
|
|
if (!data) return res.jsonp({ error: 'Unauthorized' })
|
|
|
|
res.jsonp({
|
|
'name': data.name,
|
|
'key': stream.key,
|
|
'uuid': req.session.login,
|
|
'live': data.live_at != null,
|
|
'live_at': new Date(parseInt(data.live_at)),
|
|
'last_stream': new Date(parseInt(data.last_stream))
|
|
})
|
|
})
|
|
|
|
// Get links
|
|
app.get('/dashboard/link', async (req, res) => {
|
|
if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' })
|
|
let user = req.session.login
|
|
|
|
let db = await dbPromise
|
|
let links = await db.all('SELECT * FROM link WHERE uuid = ?', user)
|
|
|
|
res.jsonp(links)
|
|
})
|
|
|
|
// Add link URL
|
|
app.post('/dashboard/link', async (req, res) => {
|
|
if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' })
|
|
let user = req.session.login
|
|
let name = req.body.name
|
|
let url = req.body.url
|
|
|
|
if (name == null || url == null) return res.jsonp({ error: 'Missing parameters!' })
|
|
if (name.length > 120) return res.jsonp({ error: 'Only 120 characters are allowed in the name.' })
|
|
if (name.indexOf('<') !== -1 || name.indexOf('>') !== -1) return res.jsonp({ error: 'HTML tags are forbidden!' })
|
|
|
|
// Validate URL
|
|
try {
|
|
URL.parse(url)
|
|
} catch (e) {
|
|
return res.jsonp({ error: 'Invalid URL!' })
|
|
}
|
|
|
|
// Checks
|
|
let db = await dbPromise
|
|
let links = await db.all('SELECT * FROM link WHERE uuid = ?', user)
|
|
if (links.length > 10) return res.jsonp({ error: 'You can currently only add up to 10 links!' })
|
|
|
|
let link = await db.get('SELECT * FROM link WHERE url = ? AND uuid = ?', url, user)
|
|
if (link) return res.jsonp({ error: 'This URL already exists!' })
|
|
|
|
// Add
|
|
await db.run('INSERT INTO link (name,url,uuid) VALUES (?,?,?)', name, url, user)
|
|
res.jsonp({ success: true })
|
|
})
|
|
|
|
// Remove link URL
|
|
app.post('/dashboard/link/delete', async (req, res) => {
|
|
if (!req.isStreamer) return res.jsonp({ error: 'Unauthorized' })
|
|
let user = req.session.login
|
|
|
|
if (req.body.name == null && req.body.url == null) return res.jsonp({ error: 'Missing parameters!' })
|
|
|
|
// Check
|
|
let db = await dbPromise
|
|
let link = await db.get('SELECT * FROM link WHERE url = ? AND uuid = ?', req.body.url, user)
|
|
if (!link) {
|
|
link = await db.get('SELECT * FROM link WHERE name = ? AND uuid = ?', req.body.name, user)
|
|
}
|
|
|
|
if (!link) return res.jsonp({ error: 'Invalid link parameter!' })
|
|
|
|
// Delete
|
|
await db.run('DELETE FROM link WHERE id = ?', link.id)
|
|
res.jsonp({ success: true })
|
|
})
|
|
|
|
// Player
|
|
app.get('/watch/:name', (req, res) => {
|
|
res.render('player.html', { name: req.params.name, server: streamServer })
|
|
})
|
|
|
|
app.get('/player/:name', (req, res) => {
|
|
res.redirect('/watch/' + req.params.name)
|
|
})
|
|
|
|
// Public data
|
|
app.get('/api/channel/:name', async (req, res) => {
|
|
let name = req.params.name
|
|
let db = await dbPromise
|
|
let data = await db.get('SELECT user_uuid,name,live_at,last_stream FROM channels WHERE name=?', name)
|
|
if (!data) return res.jsonp({ error: 'No such channel!' })
|
|
let links = await db.all('SELECT * FROM link WHERE uuid = ?', data.user_uuid)
|
|
|
|
delete data.user_uuid
|
|
data.live = data.live_at != null
|
|
data.live_at = new Date(parseInt(data.live_at))
|
|
data.last_stream = new Date(parseInt(data.last_stream))
|
|
data.links = links || []
|
|
data.viewers = Object.keys(cache.viewers[name] || {}).length
|
|
|
|
for (let i in data.links) {
|
|
delete data.links[i].id
|
|
delete data.links[i].uuid
|
|
}
|
|
|
|
res.jsonp(data)
|
|
})
|
|
|
|
// Error handler
|
|
app.use((error, req, res, next) => {
|
|
if (dev) console.error(error.stack)
|
|
res.send(error.message)
|
|
})
|
|
|
|
// Socket Server
|
|
wss.on('connection', (ws, request, client) => {
|
|
const userId = request.session.login || request.session.id
|
|
const username = request.session.username
|
|
|
|
dev && console.log(username || userId,'connected')
|
|
ws.on('message', (msg) => {
|
|
dev && console.log(userId,'said',msg)
|
|
if (msg.indexOf('watch ') === 0) {
|
|
let chan = msg.substring(6)
|
|
dev && console.log('adding a watcher to channel',chan)
|
|
if (cache.live.indexOf(chan) !== -1) {
|
|
if (!cache.viewers[chan]) cache.viewers[chan] = {}
|
|
cache.viewers[chan][userId] = username || 'A Friendly Guest'
|
|
}
|
|
} else if (msg.indexOf('stop ') === 0) {
|
|
let chan = msg.substring(5)
|
|
dev && console.log('removing a watcher from channel',chan)
|
|
if (cache.live.indexOf(chan) !== -1) {
|
|
if (cache.viewers[chan] && cache.viewers[chan][userId]) delete cache.viewers[chan][userId]
|
|
}
|
|
} else if (msg.indexOf('viewers ') === 0) {
|
|
let chan = msg.substring(8)
|
|
if (cache.viewers[chan] != null) ws.send('viewlist ' + Object.values(cache.viewers[chan]).join(','))
|
|
}
|
|
})
|
|
|
|
ws.on('close', () => {
|
|
dev && console.log(userId,'disconnected')
|
|
for (let chan in cache.viewers) {
|
|
let viewers = cache.viewers[chan]
|
|
if (viewers[userId]) {
|
|
delete cache.viewers[chan][userId]
|
|
}
|
|
}
|
|
})
|
|
|
|
ws.on('error', (e) => {
|
|
dev && console.error('Socket error:', e)
|
|
})
|
|
})
|
|
|
|
// Handle upgrade, parse included session
|
|
server.on('upgrade', (request, socket, head) => {
|
|
sessionParser(request, {}, () => {
|
|
wss.handleUpgrade(request, socket, head, function(ws) {
|
|
wss.emit('connection', ws, request)
|
|
})
|
|
})
|
|
})
|
|
|
|
// Start server
|
|
const host = dev ? '0.0.0.0' : '127.0.0.1'
|
|
server.listen(port, host, () => {
|
|
// Get currently live channels, for example, when server restarted while someone was live
|
|
(async function () {
|
|
let db = await dbPromise
|
|
let allLive = await db.all('SELECT name FROM channels WHERE live_at IS NOT NULL')
|
|
for (let i in allLive) {
|
|
cache.live.push(allLive[i].name)
|
|
}
|
|
console.log(`=> Found ${cache.live.length} channels still live`)
|
|
})().catch(e => console.error(e.stack))
|
|
|
|
console.log('Listening on %s:%d', host, port)
|
|
})
|