707 lines
20 KiB
JavaScript
707 lines
20 KiB
JavaScript
const connectSession = require('connect-redis')
|
|
const rateLimit = require("express-rate-limit")
|
|
const validemail = require('email-validator')
|
|
const session = require('express-session')
|
|
const bodyParser = require('body-parser')
|
|
const nodemailer = require('nodemailer')
|
|
const nunjucks = require('nunjucks')
|
|
const passport = require('passport')
|
|
const express = require('express')
|
|
const request = require('request')
|
|
const sqlite3 = require('sqlite3')
|
|
const sqlite = require('sqlite')
|
|
const xml2js = require('xml2js')
|
|
const crypto = require('crypto')
|
|
const WebSocket = require('ws')
|
|
const redis = require('redis')
|
|
const path = require('path')
|
|
const toml = require('toml')
|
|
const http = require('http')
|
|
const URL = require('url').URL
|
|
const fs = require('fs')
|
|
|
|
require('express-async-errors')
|
|
|
|
const SessionStore = connectSession(session)
|
|
|
|
const util = require('util')
|
|
const get = util.promisify(request.get)
|
|
|
|
// True only when NODE_ENV is exactly "development"; read further down for
// logging, cookie security and the listen address.
const dev = process.env.NODE_ENV === 'development'

// Load Configuration
const filename = path.join(__dirname, 'config.toml')

// Process-wide in-memory state shared by the HTTP routes and the socket server.
const cache = { _updated: 0, streamers: {}, viewers: {}, live: [] }

// A missing or unparsable config.toml is fatal: log the error and exit.
let config
try {
  const rawConfig = fs.readFileSync(filename)
  config = toml.parse(rawConfig)
} catch (err) {
  console.error(err)
  process.exit(1)
}
|
|
|
|
// Layer the parsed file over the built-in defaults. The merge is shallow: a
// section that appears in config.toml replaces the default section wholesale.
config = {
  Streaming: {
    port: '9322',
    database: 'streaming.db',
    streamServer: 'https://tv.icynet.eu/live/',
    serverHost: 'icynet.eu',
    publishAddress: 'rtmp://{host}:1935/hls-live/{streamer}',
    secret: 'changeme'
  },
  Auth: {
    strategy: 'passport-oauth2',
    callbackURL: 'http://localhost:5000/auth/_callback/',
    clientID: '1',
    clientSecret: 'changeme'
  },
  Email: {
    enabled: false,
    from: 'no-reply@icynet.eu',
    host: '',
    port: 587,
    secure: false,
    baseURL: 'http://localhost:9321/',
    auth: {
      user: '',
      pass: ''
    },
    tls: {
      rejectUnauthorized: false
    }
  },
  ...config
}
|
|
|
|
// Constants
|
|
const port = parseInt(config.Streaming.port, 10)
const streamServer = config.Streaming.streamServer
const streamServerHost = config.Streaming.serverHost

// The application name is the last path segment of the stream server URL
// (e.g. ".../live/" -> "live"). Fail with a readable message instead of a
// TypeError on `null[1]` when the URL has no such trailing segment.
const streamAppMatch = streamServer.match(/\/([\w-_]+)\/$/)
if (!streamAppMatch) {
  throw new Error('Streaming.streamServer must end with "/<application>/": ' + streamServer)
}
const streamAppName = streamAppMatch[1]
|
|
|
|
// Database
|
|
// The database path is resolved against the working directory, not __dirname.
const databaseFile = path.join(process.cwd(), config.Streaming.database)
const dbPromise = sqlite.open({
  driver: sqlite3.cached.Database,
  filename: databaseFile
})

// Setup server
const app = express()
const server = http.createServer(app)
// noServer: upgrades are handed over manually from the HTTP server's
// 'upgrade' event.
const wss = new WebSocket.Server({ noServer: true, clientTracking: false })

// Rate limits
const ONE_HOUR_MS = 60 * 60 * 1000
const emlLimiter = rateLimit({
  max: 16,
  windowMs: ONE_HOUR_MS,
  message: 'Too many subscription attempts from this IP address. Try again in an hour.'
})
|
|
|
|
// Authentication
|
|
// The passport strategy module is chosen by configuration.
const Strategy = require(config.Auth.strategy)
const strategyConfig = { ...config.Auth }

// Default the provider name to the strategy name without its "passport-" prefix.
if (!strategyConfig.provider) {
  strategyConfig.provider = strategyConfig.strategy.replace('passport-', '')
}

// Verify callback: accept the profile exactly as the strategy delivered it.
const acceptProfile = (accessToken, refreshToken, profile, done) => {
  process.nextTick(() => done(null, profile))
}

passport.use(new Strategy(strategyConfig, acceptProfile))
|
|
|
|
// Email
|
|
// Stays undefined when email is disabled; the rest of the file checks for that.
let emailTransport
if (config.Email.enabled) {
  const transportOptions = { ...config.Email, pool: true }
  emailTransport = nodemailer.createTransport(transportOptions)
}

// Channel ids waiting for a "went live" mail-out.
const notifQueue = []
// Channel id -> Unix time (seconds) of the most recent mail-out.
const notifHistory = {}
|
|
|
|
/**
 * Current Unix time.
 * @returns {number} whole seconds since the epoch
 */
function now () {
  const millis = Date.now()
  return Math.floor(millis / 1000)
}
|
|
|
|
/**
 * Generate a random token.
 *
 * 16 random bytes hex-encode to exactly 32 characters, so nothing has to be
 * generated and then sliced away.
 *
 * @returns {string} 32 lowercase hexadecimal characters
 */
function key () {
  return crypto.randomBytes(16).toString('hex')
}
|
|
|
|
/**
 * Email every active subscriber of a channel that it has gone live.
 *
 * Does nothing when the email transport is disabled, when the channel was
 * already notified within the last hour, or when the channel id is unknown.
 * Individual send failures are logged and do not stop the remaining sends.
 *
 * @param {*} channel channel id (matched against channels.id and emailsub.uuid)
 * @returns {Promise<void>}
 */
async function sendEmailPush (channel) {
  if (!emailTransport) {
    return
  }

  // Don't re-send notifications within an hour if a channel happens to go live again
  if (notifHistory[channel] && notifHistory[channel] > now() - 3600) {
    return
  }
  notifHistory[channel] = now()

  const db = await dbPromise
  const data = await db.get('SELECT name FROM channels WHERE id = ?', channel)
  if (!data) {
    return
  }

  // The channel name comes from the database and ends up in HTML markup and
  // in a URL path, so it is escaped / percent-encoded for each context.
  const escapeHtml = (value) => String(value)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;')

  const safeName = escapeHtml(data.name)
  const watchURL = config.Email.baseURL + 'watch/' + encodeURIComponent(data.name)
  const safeWatchURL = escapeHtml(watchURL)

  const subs = await db.all('SELECT email,unsubkey FROM emailsub WHERE uuid = ? AND active = 1', channel)
  for (const sub of subs) {
    const unsubURL = config.Email.baseURL + 'unsubscribe/' + sub.unsubkey
    const safeUnsubURL = escapeHtml(unsubURL)

    // Deliberately not awaited: sends run concurrently on the transport.
    emailTransport.sendMail({
      from: config.Email.from,
      to: sub.email,
      subject: `🔴 ${data.name} has gone LIVE on IcyTV!`,
      text: `${data.name} has gone LIVE on IcyTV!\nWatch now: ${watchURL}` +
        `\n\nUnsubscribe from ${data.name}: ${unsubURL}`,
      html: `<h1>${safeName} has gone LIVE on IcyTV!</h1><p>Watch now: ` +
        `<a href="${safeWatchURL}" target="_blank" rel="nofollow">${safeWatchURL}</a>` +
        `</p><br/><p>Unsubscribe from ${safeName}: ` +
        `<a href="${safeUnsubURL}" target="_blank" rel="nofollow">${safeUnsubURL}</a></p>`
    }).catch(e => console.error(e))
  }
}
|
|
|
|
/**
 * Create a pending email subscription and send its confirmation mail.
 *
 * Returns without doing anything when the email transport is disabled.
 * If the confirmation mail cannot be sent, the pending row is removed again
 * so the address is not locked out by the "already exists" check on retry.
 *
 * @param {string} channel channel name
 * @param {string} email   subscriber address
 * @returns {Promise<void>}
 * @throws {Error} unknown channel, existing subscription, or send failure
 */
async function subscribeToChannel (channel, email) {
  if (!emailTransport) {
    return
  }

  const db = await dbPromise
  const data = await db.get('SELECT id FROM channels WHERE name = ?', channel)
  if (!data) {
    throw new Error('Invalid channel!')
  }

  const exists = await db.get('SELECT * FROM emailsub WHERE email = ? AND uuid = ?', [email, data.id])
  if (exists) {
    throw new Error('A subscription already exists for this email address.')
  }

  // New verification email
  const activateKey = key()
  const unsubKey = key()
  const activateURL = config.Email.baseURL + 'email/' + activateKey
  await db.run('INSERT INTO emailsub (unsubkey, activatekey, email, uuid, active, created_at) VALUES ' +
    '(?, ?, ?, ?, 0, ?)', [unsubKey, activateKey, email, data.id, now()])

  // The channel name is placed into HTML markup, so escape it there.
  const safeChannel = String(channel)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;')

  try {
    await emailTransport.sendMail({
      from: config.Email.from,
      to: email,
      subject: `Confirm IcyTV subscription to channel ${channel}`,
      text: `Confirm your subscription\n\nClick here to subscribe to ${channel}: ${activateURL} ` +
        `\n\nIf you did not subscribe to ${channel} on IcyTV, please ignore this email ` +
        `\nand no further action is required on your part. If these emails persist, please ` +
        `\ncontact us via abuse@icynet.eu and we'll be sure to help you.`,
      html: `<h1>Confirm your subscription</h1><p>Click here to subscribe to ${safeChannel}: ` +
        `<a href="${activateURL}" target="_blank" rel="nofollow">${activateURL}</a>` +
        `</p><br/><p>If you did not subscribe to ${safeChannel} on IcyTV, please ignore this email ` +
        `and no further action is required on your part. If these emails persist, please contact us via ` +
        `<a href="mailto:abuse@icynet.eu">abuse@icynet.eu</a> and we'll be sure to help you.</p>`
    })
  } catch (err) {
    // Roll back the pending row; a cleanup failure is logged so that the
    // original send error is the one the caller sees.
    try {
      await db.run('DELETE FROM emailsub WHERE activatekey = ?', activateKey)
    } catch (cleanupErr) {
      console.error(cleanupErr)
    }
    throw err
  }
}
|
|
|
|
/**
 * Delete the subscription row(s) carrying the given unsubscribe key.
 * @param {string} key unsubscribe key
 * @returns {Promise<void>}
 */
async function unsubscribe (key) {
  const database = await dbPromise
  const sql = 'DELETE FROM emailsub WHERE unsubkey = ?'
  await database.run(sql, key)
}
|
|
|
|
/**
 * Mark the subscription row(s) carrying the given activation key as active.
 * @param {string} key activation key
 * @returns {Promise<void>}
 */
async function activateSubscription (key) {
  const database = await dbPromise
  const sql = 'UPDATE emailsub SET active = 1 WHERE activatekey = ?'
  await database.run(sql, key)
}
|
|
|
|
// The user object is handed to passport unchanged in both directions, so the
// same pass-through serves as serializer and deserializer.
const passUserThrough = (user, done) => {
  done(null, user)
}

passport.serializeUser(passUserThrough)
passport.deserializeUser(passUserThrough)
|
|
|
|
// Trust exactly one proxy hop. `app.enable(name)` takes no value and would
// set the option to `true`, which trusts every X-Forwarded-For entry and lets
// clients spoof req.ip (and with it the per-IP rate limiter).
app.set('trust proxy', 1)

// Accept both form-encoded and JSON request bodies.
app.use(bodyParser.urlencoded({ extended: false }))
app.use(bodyParser.json())

app.disable('x-powered-by')

// Templates live in ./templates and are auto-escaped.
nunjucks.configure('templates', {
  autoescape: true,
  express: app
})
|
|
|
|
// 31 days in milliseconds (2678400000).
const SESSION_MAX_AGE_MS = 31 * 24 * 60 * 60 * 1000

// Kept in a variable because the WebSocket upgrade handler reuses it to load
// the session outside of the Express middleware chain.
const sessionParser = session({
  key: 'Streamserver Session',
  secret: config.Streaming.secret,
  store: new SessionStore({ client: redis.createClient() }),
  resave: false,
  saveUninitialized: true,
  cookie: {
    maxAge: SESSION_MAX_AGE_MS, // 1 month
    secure: !dev
  }
})
|
|
|
|
app.use(sessionParser)
app.use(passport.initialize())
app.use(passport.session())

// Give every session a CSRF token the first time it is seen.
app.use((req, res, next) => {
  const hasToken = Boolean(req.session.csrf)
  if (!hasToken) req.session.csrf = key()
  next()
})
|
|
|
|
// Parse stream metrics from the stat.xml file
|
|
/**
 * Fetch stream metrics for one stream from the stream server's stat document.
 *
 * The parsed `live` section is cached for five seconds. Every stream in every
 * `live` element is searched (not only the first stream of each), so metrics
 * are found regardless of where the stream sits in the document.
 *
 * @param {string} uuid stream name as it appears in the stat document
 * @returns {Promise<?{time: *, bytes: *, video: ?object, audio: ?object}>}
 *   null when no matching stream is listed
 * @throws {Error} when the stat document lacks the expected server/application
 */
async function pullMetrics (uuid) {
  if (!cache.stats || cache._updated < Date.now() - 5000) {
    const { body } = await get(streamServer + 'stat')
    const rip = await xml2js.parseStringPromise(body)
    if (!rip || !rip.rtmp || !rip.rtmp.server) throw new Error('Invalid response from server.')

    // Autofind the correct server
    let rtmpapp
    for (const application of rip.rtmp.server[0].application || []) {
      if (application.name[0] === streamAppName) rtmpapp = application
    }

    if (!rtmpapp) throw new Error('Invalid response from server.')

    cache.stats = rtmpapp.live
    cache._updated = Date.now()
  }

  // Extract applicable stream data
  let forUser
  for (const live of cache.stats || []) {
    for (const stream of live.stream || []) {
      if (stream.name[0] === uuid) forUser = stream
    }
  }

  if (!forUser) return null

  // Generic data object
  const data = {
    time: forUser.time[0],
    bytes: forUser.bytes_in[0],
    video: null,
    audio: null
  }

  // An empty element parses to '', a populated one to an object; a missing
  // meta section is treated the same as empty ones.
  const meta = (forUser.meta && forUser.meta[0]) || {}
  const video = meta.video && meta.video[0]
  const audio = meta.audio && meta.audio[0]

  // Add video metadata, if applicable
  if (video && video !== '') {
    data.video = {
      width: video.width[0],
      height: video.height[0],
      frame_rate: video.frame_rate[0],
      codec: video.codec[0]
    }
  }

  // Add audio metadata, if applicable
  if (audio && audio !== '') {
    data.audio = {
      sample_rate: audio.sample_rate[0],
      channels: audio.channels[0],
      codec: audio.codec[0]
    }
  }

  return data
}
|
|
|
|
// Handle requests from nginx-rtmp-module
|
|
|
|
// Publish callback: validates the stream key sent as `name` and redirects the
// streaming server to the real publish address for that channel.
// Rejections are sent with explicit 4xx codes; throwing would hand the error
// to this file's error handler, which replies without setting a status.
app.post('/publish', async (req, res) => {
  if (!req.body.name) return res.status(400).send('Invalid request.')
  const db = await dbPromise

  // Validate stream key
  const streamer = await db.get('SELECT * FROM channels WHERE key=?', req.body.name)
  if (!streamer) return res.status(403).send('Invalid stream key.')
  console.log('=> Streamer %s has started streaming!', streamer.name)

  // Generate real publish address for the server
  const publishAddress = config.Streaming.publishAddress
    .replace('{streamer}', streamer.name)
    .replace('{host}', '127.0.0.1')

  // Set channel streaming status. Awaited so a failed write is reported
  // instead of becoming an unhandled rejection.
  await db.run('UPDATE channels SET live_at=? WHERE id=?', Date.now(), streamer.id)
  // A repeated publish must not list the channel twice; publish_done removes
  // only one entry.
  if (!cache.live.includes(streamer.name)) cache.live.push(streamer.name)

  // Send notifications
  if (!notifQueue.includes(streamer.id)) {
    notifQueue.push(streamer.id)
  }

  // Redirect the streaming server to the target
  res.set('Location', publishAddress)
  res.status(302)
  res.end()
})
|
|
|
|
// Publish-finished callback: clears the live state of the channel whose
// stream key was sent as `name`.
app.post('/publish_done', async (req, res) => {
  if (!req.body.name) throw new Error('Invalid request.')

  const db = await dbPromise
  const chan = await db.get('SELECT * FROM channels WHERE key = ?', req.body.name)
  // An unknown key has nothing to clean up; answer instead of dereferencing null.
  if (!chan) return res.status(404).send('Invalid stream key.')
  console.log('<= Streamer %s has stopped streaming!', chan.name)

  delete cache.viewers[chan.name]
  const liveIndex = cache.live.indexOf(chan.name)
  if (liveIndex !== -1) cache.live.splice(liveIndex, 1)

  // Awaited so a failed write is reported instead of becoming an unhandled rejection.
  await db.run('UPDATE channels SET live_at=NULL, last_stream=? WHERE key=?', Date.now(), chan.key)

  res.send('OK')
})
|
|
|
|
// Front-end server
|
|
// OAuth2 authenticator
|
|
// Starts the login flow; extra provider options come from Auth.authOptions.
const loginOptions = { ...(strategyConfig.authOptions || {}) }
app.get('/login', passport.authenticate(strategyConfig.provider, loginOptions))

// Provider callback: records first-time users, primes the streamer cache and
// sends the browser back to the index page.
app.get('/auth/_callback', passport.authenticate(strategyConfig.provider, { failureRedirect: '/' }), async (req, res) => {
  if (dev) console.log(req.user.username, 'logged in')

  // Get user from database
  const db = await dbPromise
  const known = await db.get('SELECT * FROM signed_users WHERE uuid=?', req.user.uuid)
  if (!known) {
    await db.run('INSERT INTO signed_users (uuid,name) VALUES (?,?)', req.user.uuid, req.user.username)
  }

  // Lets see if this user is a streamer
  const channel = await db.get('SELECT * FROM channels WHERE user_uuid = ?', req.user.uuid)
  if (channel) {
    cache.streamers[req.user.uuid] = channel
  }

  res.redirect('/')
})

app.get('/logout', (req, res) => {
  req.logout()
  res.redirect('/')
})
|
|
|
|
/**
 * Route guard: continue only for logged-in users flagged as streamers;
 * everyone else receives a JSON(P) "Unauthorized" error object.
 */
function authed (req, res, next) {
  const allowed = req.isAuthenticated() && req.isStreamer
  if (!allowed) {
    res.jsonp({ error: 'Unauthorized' })
    return
  }
  return next()
}
|
|
|
|
// Views
|
|
|
|
// Static assets; caching is switched off in development.
const staticOptions = { maxAge: dev ? 0 : 2678400000 }
app.use('/dist', express.static(path.join(__dirname, 'dist'), staticOptions))

// Sets req.isStreamer for every request and exposes the logged-in user to
// templates. Channel rows are looked up once and then served from the cache.
app.use(async (req, res, next) => {
  req.isStreamer = false
  if (!req.isAuthenticated()) return next()

  res.locals.user = req.user

  const { uuid } = req.user
  if (!cache.streamers[uuid]) {
    const db = await dbPromise
    const channel = await db.get('SELECT * FROM channels WHERE user_uuid = ?', uuid)
    if (channel) cache.streamers[uuid] = channel
  }

  req.isStreamer = Boolean(cache.streamers[uuid])
  next()
})
|
|
|
|
// Index
|
|
// Landing page; the template is told whether the visitor is a streamer.
app.get('/', (req, res) => {
  res.render('index.html', { streamer: req.isStreamer })
})

// Dashboard
// Streamer dashboard; the template only needs the RTMP ingest address.
// (The cached channel row was previously read here but never used.)
app.get('/dashboard', authed, (req, res) => {
  res.render('dashboard.html', { server: 'rtmp://' + streamServerHost + '/live/' })
})
|
|
|
|
// Stats
|
|
// Live metrics for the logged-in streamer's own stream. Failures while
// pulling metrics are reported as a JSON(P) error object.
app.get('/dashboard/stats', authed, async (req, res) => {
  const channel = cache.streamers[req.user.uuid]

  let metrics
  try {
    metrics = await pullMetrics(channel.key)
  } catch (err) {
    return res.jsonp({ error: err.message })
  }

  if (metrics) {
    res.jsonp(metrics)
  } else {
    return res.jsonp({ error: 'No data was returned.' })
  }
})
|
|
|
|
// Data
|
|
// Channel details (including the stream key) for the logged-in streamer.
app.get('/dashboard/data', authed, async (req, res) => {
  const channel = cache.streamers[req.user.uuid]
  const db = await dbPromise

  let row
  try {
    row = await db.get('SELECT * FROM channels WHERE key=?', channel.key)
  } catch (err) {
    return res.jsonp({ error: 'Unauthorized' })
  }

  if (!row) return res.jsonp({ error: 'Unauthorized' })

  // The stored timestamps are parsed as base-10 integers and wrapped in Dates.
  const toDate = (stored) => new Date(parseInt(stored, 10))

  res.jsonp({
    name: row.name,
    key: channel.key,
    uuid: req.user.uuid,
    live: row.live_at != null,
    live_at: toDate(row.live_at),
    last_stream: toDate(row.last_stream)
  })
})
|
|
|
|
// Get links
|
|
// All link rows owned by the logged-in streamer.
app.get('/dashboard/link', authed, async (req, res) => {
  const { uuid } = req.user
  const db = await dbPromise
  const rows = await db.all('SELECT * FROM link WHERE uuid = ?', uuid)
  res.jsonp(rows)
})
|
|
|
|
// Add link URL
|
|
// Adds a named link for the logged-in streamer. Every validation failure is
// reported as a JSON(P) `{ error }` object.
app.post('/dashboard/link', authed, async (req, res) => {
  const user = req.user.uuid
  const name = req.body.name
  const url = req.body.url

  if (name == null || url == null) return res.jsonp({ error: 'Missing parameters!' })
  // JSON bodies can carry numbers, arrays or objects; the checks below need strings.
  if (typeof name !== 'string' || typeof url !== 'string') return res.jsonp({ error: 'Invalid parameters!' })
  if (name.length > 120) return res.jsonp({ error: 'Only 120 characters are allowed in the name.' })
  if (name.length < 3) return res.jsonp({ error: 'Minimum name length is 3 characters.' })
  if (name.indexOf('<') !== -1 || name.indexOf('>') !== -1 ||
    url.indexOf('<') !== -1 || url.indexOf('>') !== -1) return res.jsonp({ error: 'HTML tags are forbidden!' })

  // Validate URL. `new URL` throws on unparsable input, so catch that and
  // answer in the same shape as the other validation errors.
  let parsed
  try {
    parsed = new URL(url)
  } catch (e) {
    return res.jsonp({ error: 'Invalid URL!' })
  }
  // Only web links are accepted; other schemes with a host part
  // (e.g. "javascript://host/...") would otherwise pass the host check.
  const isWebLink = parsed.protocol === 'http:' || parsed.protocol === 'https:'
  if (!isWebLink || parsed.host === '') return res.jsonp({ error: 'Invalid URL!' })

  // Checks
  const db = await dbPromise
  const links = await db.all('SELECT * FROM link WHERE uuid = ?', user)
  // ">=" so the limit really is 10 links.
  if (links.length >= 10) return res.jsonp({ error: 'You can currently only add up to 10 links!' })

  const link = await db.get('SELECT * FROM link WHERE url = ? AND uuid = ?', url, user)
  if (link) return res.jsonp({ error: 'This URL already exists!' })

  // Add
  await db.run('INSERT INTO link (name,url,uuid) VALUES (?,?,?)', name, url, user)
  res.jsonp({ success: true })
})
|
|
|
|
// Remove link URL
|
|
// Removes one of the logged-in streamer's links, looked up by URL first and
// by name as a fallback.
app.post('/dashboard/link/delete', authed, async (req, res) => {
  const owner = req.user.uuid

  const nothingGiven = req.body.name == null && req.body.url == null
  if (nothingGiven) return res.jsonp({ error: 'Missing parameters!' })

  // Check
  const db = await dbPromise
  const byUrl = await db.get('SELECT * FROM link WHERE url = ? AND uuid = ?', req.body.url, owner)
  const target = byUrl ||
    await db.get('SELECT * FROM link WHERE name = ? AND uuid = ?', req.body.name, owner)

  if (!target) return res.jsonp({ error: 'Invalid link parameter!' })

  // Delete
  await db.run('DELETE FROM link WHERE id = ?', target.id)
  res.jsonp({ success: true })
})
|
|
|
|
// Player
|
|
// Player page for a channel. `email` tells the template whether the
// subscription feature is available.
app.get('/watch/:name', (req, res) => {
  res.render('player.html', {
    name: req.params.name,
    server: streamServer,
    csrf: req.session.csrf,
    email: emailTransport != null
  })
})

// Old player URL. Route params arrive percent-decoded, so the name is encoded
// again before it is placed into the redirect target; otherwise characters
// such as "?" or "#" would change the target's meaning.
app.get('/player/:name', (req, res) => {
  res.redirect('/watch/' + encodeURIComponent(req.params.name))
})
|
|
|
|
// Public data
|
|
// Public channel information: live state, links, viewer count and the HLS
// playlist URL. The owner's uuid is used for the link lookup only and is not
// part of the response.
app.get('/api/channel/:name', async (req, res) => {
  const { name } = req.params
  const db = await dbPromise

  const row = await db.get('SELECT user_uuid,name,live_at,last_stream,chat_channel FROM channels WHERE name=?', name)
  if (!row) return res.jsonp({ error: 'No such channel!' })

  const { user_uuid: ownerUuid, ...channel } = row
  const links = await db.all('SELECT name,url FROM link WHERE uuid = ?', ownerUuid)

  channel.live = row.live_at != null
  channel.live_at = new Date(parseInt(row.live_at, 10))
  channel.last_stream = new Date(parseInt(row.last_stream, 10))
  channel.links = links || []
  channel.viewers = Object.keys(cache.viewers[name] || {}).length
  channel.source = streamServer + name + '.m3u8'

  res.jsonp(channel)
})
|
|
|
|
// Rate-limited subscription endpoint. The session's CSRF token and a valid
// email address are required before a confirmation mail is requested.
app.post('/api/email/:channel', emlLimiter, async (req, res) => {
  if (!emailTransport) {
    return res.json({ message: 'Email transport is disabled.' })
  }

  const reject = (message) => res.status(400).json({ error: true, message: message })

  const token = req.body.csrf
  const tokenMatches = Boolean(token) && Boolean(req.session.csrf) && req.session.csrf === token
  if (!tokenMatches) {
    return reject('Illegal request!')
  }

  const address = req.body.email
  if (!address || !validemail.validate(address)) {
    return reject('Invalid email address!')
  }

  try {
    await subscribeToChannel(req.params.channel, address)
  } catch (err) {
    return reject(err.message)
  }

  res.json({ message: 'Confirmation email has been sent!' })
})
|
|
|
|
// Confirmation link from the verification mail.
app.get('/email/:key', async (req, res) => {
  if (emailTransport) {
    await activateSubscription(req.params.key)
    res.redirect('/?activated=true')
    return
  }
  return res.redirect('/?activated=false')
})

// Unsubscribe link from the notification mails.
app.get('/unsubscribe/:key', async (req, res) => {
  if (emailTransport) {
    await unsubscribe(req.params.key)
    res.redirect('/?unsubscribe=true')
    return
  }
  return res.redirect('/?unsubscribe=false')
})
|
|
|
|
// Error handler
|
|
// Final error handler: replies with the error message as the body.
// A status is always set (the error's own 4xx/5xx code when it carries one,
// otherwise 500) so failures are never reported with the default 200.
app.use((error, req, res, next) => {
  if (dev) console.error(error.stack)

  // Once a response has started, only Express' default handler can finish it.
  if (res.headersSent) return next(error)

  const status = error.status || error.statusCode
  res.status(status >= 400 && status < 600 ? status : 500)
  res.send(error.message)
})
|
|
|
|
// Socket Server
|
|
// One handler per socket. Messages are plain text of the form
// "<command> <channel>"; the commands are watch, stop and viewers.
wss.on('connection', (ws, request) => {
  // Logged-in users are tracked by uuid and username, guests by session id.
  const loggedIn = Boolean(request.user)
  const userId = loggedIn ? request.user.uuid : request.session.id
  const username = loggedIn ? request.user.username : 'A Friendly Guest'

  // Channels this socket is watching, for cleanup when it closes.
  const myChannels = []

  if (dev) console.log(userId, 'connected')

  ws.on('message', (msg) => {
    if (dev) console.log(userId, 'said', msg)

    const [command, chan] = msg.toString().trim().split(' ')
    if (!chan) return

    const isLive = cache.live.indexOf(chan) !== -1

    if (command === 'watch') {
      if (dev) console.log('adding watcher', userId, 'to channel', chan)
      if (isLive) {
        if (!cache.viewers[chan]) cache.viewers[chan] = {}
        cache.viewers[chan][userId] = username
        if (myChannels.indexOf(chan) === -1) myChannels.push(chan)
      }
    } else if (command === 'stop') {
      if (dev) console.log('removing watcher', userId, 'from channel', chan)
      if (isLive) {
        if (cache.viewers[chan] && cache.viewers[chan][userId]) delete cache.viewers[chan][userId]
        const position = myChannels.indexOf(chan)
        if (position !== -1) myChannels.splice(position, 1)
      }
    } else if (command === 'viewers') {
      if (cache.viewers[chan] != null) {
        ws.send('viewlist ' + Object.values(cache.viewers[chan]).join(','))
      }
    }
  })

  ws.on('close', () => {
    if (dev) console.log(userId, 'disconnected')
    for (const chan of myChannels) {
      const viewers = cache.viewers[chan]
      if (viewers && viewers[userId]) delete cache.viewers[chan][userId]
    }
  })

  ws.on('error', (e) => {
    if (dev) console.error('Socket error:', e)
  })
})
|
|
|
|
// Handle upgrade, parse included session
|
|
// Upgrade requests do not pass through the Express middleware chain, so the
// session parser is run by hand before the socket is handed to the
// WebSocket server. Requests without a session are dropped.
server.on('upgrade', (request, socket, head) => {
  sessionParser(request, {}, () => {
    const sess = request.session
    if (!sess || !sess.id) return socket.destroy()

    if (sess.passport) {
      request.user = sess.passport.user
    }

    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request)
    })
  })
})
|
|
|
|
// Stream start notifications pump
|
|
/**
 * Notification pump: takes one channel id off the queue, starts its mail-out
 * and re-arms itself (after 1 s while work remains, otherwise after 5 s).
 *
 * Entries are taken from the front so channels are served in the order they
 * went live; taking from the back would keep delaying the oldest entry while
 * new ones arrive.
 */
function notify () {
  const channel = notifQueue.shift()
  if (channel !== undefined) {
    sendEmailPush(channel).catch(e => console.error(e))
  }
  setTimeout(notify, notifQueue.length ? 1000 : 5000)
}
|
|
|
|
// Start server
|
|
// Loopback only outside development; all interfaces in development.
const host = dev ? '0.0.0.0' : '127.0.0.1'

server.listen(port, host, () => {
  // Get currently live channels, for example, when server restarted while someone was live
  const restoreLiveChannels = async () => {
    const db = await dbPromise
    await db.migrate()

    const rows = await db.all('SELECT name FROM channels WHERE live_at IS NOT NULL')
    for (const { name } of rows) {
      cache.live.push(name)
    }

    console.log(`=> Found ${cache.live.length} channels still live`)
  }
  restoreLiveChannels().catch(e => console.error(e.stack))

  console.log('Listening on %s:%d', host, port)
  console.log('Authentication module: %s (%s)', strategyConfig.strategy, strategyConfig.provider)

  // Start the notification pump.
  notify()
})
|