Websockets viewer count

This commit is contained in:
Evert Prants 2019-10-23 16:43:57 +03:00
parent f82214a6f5
commit ffd4079a42
Signed by: evert
GPG Key ID: 1688DA83D222D0B5
7 changed files with 177 additions and 15 deletions

95
app.js
View File

@ -1,16 +1,18 @@
const express = require('express') const connectSession = require('connect-redis')
const session = require('express-session') const session = require('express-session')
const bodyParser = require('body-parser') const bodyParser = require('body-parser')
const express = require('express')
const request = require('request') const request = require('request')
const nunjucks = require('nunjucks') const nunjucks = require('nunjucks')
const sqlite = require('sqlite') const sqlite = require('sqlite')
const xml2js = require('xml2js') const xml2js = require('xml2js')
const path = require('path') const path = require('path')
const toml = require('toml') const toml = require('toml')
const http = require('http')
const fs = require('fs') const fs = require('fs')
const WebSocket = require('ws')
const uuid = require('uuid/v4') const uuid = require('uuid/v4')
const redis = require('redis') const redis = require('redis')
const connectSession = require('connect-redis')
const URL = require('url') const URL = require('url')
require('express-async-errors') require('express-async-errors')
@ -27,7 +29,7 @@ const dev = process.env.NODE_ENV === 'development'
const filename = path.join(__dirname, 'config.toml') const filename = path.join(__dirname, 'config.toml')
let config let config
let cache = { _updated: 0, streamers: {} } let cache = { _updated: 0, streamers: {}, viewers: {}, live: [] }
try { try {
config = toml.parse(fs.readFileSync(filename)) config = toml.parse(fs.readFileSync(filename))
@ -81,7 +83,9 @@ const dbPromise = Promise.resolve()
.then(db => db.migrate()) .then(db => db.migrate())
// Setup server // Setup server
let app = express() const app = express()
const server = http.createServer(app)
const wss = new WebSocket.Server({ clientTracking: false, noServer: true })
app.enable('trust proxy', 1) app.enable('trust proxy', 1)
@ -95,7 +99,7 @@ nunjucks.configure('templates', {
express: app express: app
}) })
app.use(session({ const sessionParser = session({
key: 'Streamserver Session', key: 'Streamserver Session',
secret: config['Streaming']['Secret'], secret: config['Streaming']['Secret'],
resave: false, resave: false,
@ -105,7 +109,9 @@ app.use(session({
secure: !dev, secure: !dev,
maxAge: 2678400000 // 1 month maxAge: 2678400000 // 1 month
} }
})) })
app.use(sessionParser)
// Parse stream metrics from the stat.xml file // Parse stream metrics from the stat.xml file
async function pullMetrics (uuid) { async function pullMetrics (uuid) {
@ -179,7 +185,7 @@ app.post('/publish', async (req, res) => {
let streamer = await db.get('SELECT * FROM channels WHERE key=?', req.body.name) let streamer = await db.get('SELECT * FROM channels WHERE key=?', req.body.name)
if (!streamer) throw new Error('Invalid stream key.') if (!streamer) throw new Error('Invalid stream key.')
console.log('Streamer %s has started streaming!', streamer.name) console.log('=> Streamer %s has started streaming!', streamer.name)
// Generate real publish address for the server // Generate real publish address for the server
let publishAddress = config['Streaming']['PublishAddress'] let publishAddress = config['Streaming']['PublishAddress']
@ -188,6 +194,7 @@ app.post('/publish', async (req, res) => {
// Set channel streaming status // Set channel streaming status
db.run('UPDATE channels SET live_at=? WHERE id=?', Date.now(), streamer.id) db.run('UPDATE channels SET live_at=? WHERE id=?', Date.now(), streamer.id)
cache.live.push(streamer.name)
// Redirect the streaming server to the target // Redirect the streaming server to the target
res.set('Location', publishAddress) res.set('Location', publishAddress)
@ -199,7 +206,13 @@ app.post('/publish_done', async (req, res) => {
if (!req.body.name) throw new Error('Invalid request.') if (!req.body.name) throw new Error('Invalid request.')
let db = await dbPromise let db = await dbPromise
db.run('UPDATE channels SET live_at=NULL, last_stream=? WHERE key=?', Date.now(), req.body.name) let chan = await db.get('SELECT * FROM channels WHERE key = ?', req.body.name)
console.log('<= Streamer %s has stopped streaming!', chan.name)
try { delete cache.viewers[chan.name] } catch (e) {}
if (cache.live.indexOf(chan.name) !== -1) cache.live.splice(cache.live.indexOf(chan.name), 1)
db.run('UPDATE channels SET live_at=NULL, last_stream=? WHERE key=?', Date.now(), chan.key)
res.send('OK') res.send('OK')
}) })
@ -442,6 +455,7 @@ app.get('/api/channel/:name', async (req, res) => {
data.live_at = new Date(parseInt(data.live_at)) data.live_at = new Date(parseInt(data.live_at))
data.last_stream = new Date(parseInt(data.last_stream)) data.last_stream = new Date(parseInt(data.last_stream))
data.links = links || [] data.links = links || []
data.viewers = Object.keys(cache.viewers[name] || {}).length
for (let i in data.links) { for (let i in data.links) {
delete data.links[i].id delete data.links[i].id
@ -457,6 +471,69 @@ app.use((error, req, res, next) => {
res.send(error.message) res.send(error.message)
}) })
// Socket Server
wss.on('connection', (ws, request, client) => {
const userId = request.session.login || request.session.id
const username = request.session.username
dev && console.log(username || userId,'connected')
ws.on('message', (msg) => {
dev && console.log(userId,'said',msg)
if (msg.indexOf('watch ') === 0) {
let chan = msg.substring(6)
dev && console.log('adding a watcher to channel',chan)
if (cache.live.indexOf(chan) !== -1) {
if (!cache.viewers[chan]) cache.viewers[chan] = {}
cache.viewers[chan][userId] = username || 'A Friendly Guest'
}
} else if (msg.indexOf('stop ') === 0) {
let chan = msg.substring(5)
dev && console.log('removing a watcher from channel',chan)
if (cache.live.indexOf(chan) !== -1) {
if (cache.viewers[chan] && cache.viewers[chan][userId]) delete cache.viewers[chan][userId]
}
} else if (msg.indexOf('viewers ') === 0) {
let chan = msg.substring(8)
if (cache.viewers[chan] != null) ws.send('viewlist ' + Object.values(cache.viewers[chan]).join(','))
}
})
ws.on('close', () => {
dev && console.log(userId,'disconnected')
for (let chan in cache.viewers) {
let viewers = cache.viewers[chan]
if (viewers[userId]) {
delete cache.viewers[chan][userId]
}
}
})
ws.on('error', (e) => {
dev && console.error('Socket error:', e)
})
})
// Handle upgrade, parse included session
server.on('upgrade', (request, socket, head) => {
sessionParser(request, {}, () => {
wss.handleUpgrade(request, socket, head, function(ws) {
wss.emit('connection', ws, request)
})
})
})
// Start server // Start server
const host = dev ? '0.0.0.0' : '127.0.0.1' const host = dev ? '0.0.0.0' : '127.0.0.1'
app.listen(port, host, () => console.log('Listening on %s:%d', host, port)) server.listen(port, host, () => {
// Get currently live channels, for example, when server restarted while someone was live
(async function () {
let db = await dbPromise
let allLive = await db.all('SELECT name FROM channels WHERE live_at IS NOT NULL')
for (let i in allLive) {
cache.live.push(allLive[i].name)
}
console.log(`=> Found ${cache.live.length} channels still live`)
})().catch(e => console.error(e.stack))
console.log('Listening on %s:%d', host, port)
})

View File

@ -44,6 +44,17 @@ http {
} }
location @distrib { location @distrib {
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_set_header X-NginX-Proxy true;
# socket support
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_pass http://localhost:5000; proxy_pass http://localhost:5000;
proxy_redirect off; proxy_redirect off;
} }

13
package-lock.json generated
View File

@ -532,6 +532,11 @@
"resolved": "https://registry.npmjs.org/async-each/-/async-each-1.0.1.tgz", "resolved": "https://registry.npmjs.org/async-each/-/async-each-1.0.1.tgz",
"integrity": "sha1-GdOGodntxufByF04iu28xW0zYC0=" "integrity": "sha1-GdOGodntxufByF04iu28xW0zYC0="
}, },
"async-limiter": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz",
"integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ=="
},
"asynckit": { "asynckit": {
"version": "0.4.0", "version": "0.4.0",
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
@ -6369,6 +6374,14 @@
"signal-exit": "^3.0.2" "signal-exit": "^3.0.2"
} }
}, },
"ws": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-7.2.0.tgz",
"integrity": "sha512-+SqNqFbwTm/0DC18KYzIsMTnEWpLwJsiasW/O17la4iDRRIO9uaHbvKiAS3AHgTiuuWerK/brj4O6MYZkei9xg==",
"requires": {
"async-limiter": "^1.0.0"
}
},
"xdg-basedir": { "xdg-basedir": {
"version": "3.0.0", "version": "3.0.0",
"resolved": "https://registry.npmjs.org/xdg-basedir/-/xdg-basedir-3.0.0.tgz", "resolved": "https://registry.npmjs.org/xdg-basedir/-/xdg-basedir-3.0.0.tgz",

View File

@ -32,6 +32,7 @@
"sqlite": "^3.0.3", "sqlite": "^3.0.3",
"toml": "^3.0.0", "toml": "^3.0.0",
"uuid": "^3.3.2", "uuid": "^3.3.2",
"ws": "^7.2.0",
"xml2js": "^0.4.22" "xml2js": "^0.4.22"
} }
} }

View File

@ -19,7 +19,7 @@ body {
width: 100%; width: 100%;
height: 100%; height: 100%;
} }
.live { .badge {
position: absolute; position: absolute;
color: white; color: white;
font-size: 120%; font-size: 120%;
@ -29,7 +29,10 @@ body {
padding: 10px; padding: 10px;
border-radius: 5px; border-radius: 5px;
} }
.live.offline { .badge.viewers {
right: 0;
}
.badge.live.offline {
background-color: rgba(93, 93, 93, 0.7); background-color: rgba(93, 93, 93, 0.7);
} }
.overlay { .overlay {

View File

@ -16,6 +16,7 @@ let bigbtn = overlay.querySelector('.bigplaybtn')
let volumebar = overlay.querySelector('#volume_seek') let volumebar = overlay.querySelector('#volume_seek')
let volumeseek = volumebar.querySelector('.seeker') let volumeseek = volumebar.querySelector('.seeker')
let volumeseekInner = volumeseek.querySelector('.seekbar') let volumeseekInner = volumeseek.querySelector('.seekbar')
let viewers = overlay.querySelector('.viewers')
let links let links
let linksList let linksList
@ -27,6 +28,7 @@ let vidReady = false
let shouldHide = true let shouldHide = true
let inFullscreen = false let inFullscreen = false
let errored = false let errored = false
let ws
function GET (url, istext) { function GET (url, istext) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
@ -74,20 +76,71 @@ function updateVolume () {
volumeseekInner.style.width = vid.volume * 100 + '%' volumeseekInner.style.width = vid.volume * 100 + '%'
} }
function viewersCount (res) {
viewers.style.display = 'block'
viewers.innerHTML = res.length + ' watching'
}
function handleWebSocket (live) {
if (!live && ws) {
ws.onerror = ws.onopen = ws.onclose = null
ws.close()
ws = null
return
}
if (ws) return
ws = new WebSocket(`ws://${location.host}`)
ws.onerror = function(e) {
console.error('Socket errored, retrying..', e)
handleWebSocket(false)
setTimeout(() => handleWebSocket(vidReady), 5000)
}
ws.onopen = function() {
console.log('Upstream socket connection established')
if (!vid.paused) ws.send('watch ' + STREAM_NAME)
ws.onmessage = function (event) {
if (!event) return
let message = event.data
if (message.indexOf('viewlist ') === 0) {
let str = message.substring(9)
let list = str.split(',')
if (str === '') list = []
viewersCount(list)
}
}
}
ws.onclose = function() {
console.error('Socket died, retrying..')
ws = null
setTimeout(() => handleWebSocket(vidReady), 5000)
}
}
function liveStatus (status) { function liveStatus (status) {
if (status) { if (status) {
lstat.innerHTML = 'live now' lstat.innerHTML = 'live now'
lstat.className = 'live' lstat.className = 'badge live'
clearTimeout(retryTimeout) clearTimeout(retryTimeout)
if (vid.paused) { if (vid.paused) {
showBigBtn(true) showBigBtn(true)
} }
handleWebSocket(true)
} else { } else {
lstat.innerHTML = 'offline' lstat.innerHTML = 'offline'
lstat.className = 'live offline' lstat.className = 'badge live offline'
viewers.style.display = 'none'
handleWebSocket(false)
retryTimeout = setTimeout(() => { retryTimeout = setTimeout(() => {
if (vidReady) return if (vidReady) return
hls.loadSource(STREAM_SERVER + STREAM_NAME + '.m3u8') loadSource()
}, 10000) }, 10000)
} }
} }
@ -123,10 +176,12 @@ function toggleStream () {
if (!vid) return if (!vid) return
if (!vidReady) return if (!vidReady) return
if (vid.paused) { if (vid.paused) {
if (ws) ws.send('watch ' + STREAM_NAME)
vid.play() vid.play()
btn.innerHTML = '<i class="fa fa-pause fa-fw"></i>' btn.innerHTML = '<i class="fa fa-pause fa-fw"></i>'
showBigBtn(false) showBigBtn(false)
} else { } else {
if (ws) ws.send('stop ' + STREAM_NAME)
vid.pause() vid.pause()
btn.innerHTML = '<i class="fa fa-play fa-fw"></i>' btn.innerHTML = '<i class="fa fa-play fa-fw"></i>'
showBigBtn(true) showBigBtn(true)
@ -356,6 +411,7 @@ function getStreamStatus () {
} }
if (jd.links) updateLinks(jd.links) if (jd.links) updateLinks(jd.links)
if (ws) ws.send('viewers ' + STREAM_NAME)
if (jd.live && !vidReady) loadSource() if (jd.live && !vidReady) loadSource()
liveStatus(jd.live) liveStatus(jd.live)

View File

@ -16,7 +16,8 @@
<video id="stream"></video> <video id="stream"></video>
</div> </div>
<div class="overlay"> <div class="overlay">
<div class="live offline">offline</div> <div class="badge live offline">offline</div>
<div class="badge viewers" style="display: none;">0</div>
<div class="bigplaybtn hidden"><i class="fa fa-play fa-fw"></i></div> <div class="bigplaybtn hidden"><i class="fa fa-play fa-fw"></i></div>
<div class="controls"> <div class="controls">
<div id="playbtn" class="button"><i class="fa fa-play fa-fw"></i></div> <div id="playbtn" class="button"><i class="fa fa-play fa-fw"></i></div>