From ffd4079a4283bf84df7bd9deec038825c9d121cc Mon Sep 17 00:00:00 2001 From: Evert Prants Date: Wed, 23 Oct 2019 16:43:57 +0300 Subject: [PATCH] Websockets viewer count --- app.js | 95 +++++++++++++++++++++++++++++++++++++++---- nginx.example.conf | 11 +++++ package-lock.json | 13 ++++++ package.json | 1 + src/css/player.css | 7 +++- src/player.js | 62 ++++++++++++++++++++++++++-- templates/player.html | 3 +- 7 files changed, 177 insertions(+), 15 deletions(-) diff --git a/app.js b/app.js index d0384c5..e0531b3 100644 --- a/app.js +++ b/app.js @@ -1,16 +1,18 @@ -const express = require('express') +const connectSession = require('connect-redis') const session = require('express-session') const bodyParser = require('body-parser') +const express = require('express') const request = require('request') const nunjucks = require('nunjucks') const sqlite = require('sqlite') const xml2js = require('xml2js') const path = require('path') const toml = require('toml') +const http = require('http') const fs = require('fs') +const WebSocket = require('ws') const uuid = require('uuid/v4') const redis = require('redis') -const connectSession = require('connect-redis') const URL = require('url') require('express-async-errors') @@ -27,7 +29,7 @@ const dev = process.env.NODE_ENV === 'development' const filename = path.join(__dirname, 'config.toml') let config -let cache = { _updated: 0, streamers: {} } +let cache = { _updated: 0, streamers: {}, viewers: {}, live: [] } try { config = toml.parse(fs.readFileSync(filename)) @@ -81,7 +83,9 @@ const dbPromise = Promise.resolve() .then(db => db.migrate()) // Setup server -let app = express() +const app = express() +const server = http.createServer(app) +const wss = new WebSocket.Server({ clientTracking: false, noServer: true }) app.enable('trust proxy', 1) @@ -95,7 +99,7 @@ nunjucks.configure('templates', { express: app }) -app.use(session({ +const sessionParser = session({ key: 'Streamserver Session', secret: config['Streaming']['Secret'], resave: false, @@ -105,7 +109,9 @@ app.use(session({ secure: !dev, maxAge: 2678400000 // 1 month } -})) +}) + +app.use(sessionParser) // Parse stream metrics from the stat.xml file async function pullMetrics (uuid) { @@ -179,7 +185,7 @@ app.post('/publish', async (req, res) => { let streamer = await db.get('SELECT * FROM channels WHERE key=?', req.body.name) if (!streamer) throw new Error('Invalid stream key.') - console.log('Streamer %s has started streaming!', streamer.name) + console.log('=> Streamer %s has started streaming!', streamer.name) // Generate real publish address for the server let publishAddress = config['Streaming']['PublishAddress'] @@ -188,6 +194,7 @@ app.post('/publish', async (req, res) => { // Set channel streaming status db.run('UPDATE channels SET live_at=? WHERE id=?', Date.now(), streamer.id) + cache.live.push(streamer.name) // Redirect the streaming server to the target res.set('Location', publishAddress) @@ -199,7 +206,13 @@ app.post('/publish_done', async (req, res) => { if (!req.body.name) throw new Error('Invalid request.') let db = await dbPromise - db.run('UPDATE channels SET live_at=NULL, last_stream=? WHERE key=?', Date.now(), req.body.name) + let chan = await db.get('SELECT * FROM channels WHERE key = ?', req.body.name) + console.log('<= Streamer %s has stopped streaming!', chan.name) + + try { delete cache.viewers[chan.name] } catch (e) {} + if (cache.live.indexOf(chan.name) !== -1) cache.live.splice(cache.live.indexOf(chan.name), 1) + + db.run('UPDATE channels SET live_at=NULL, last_stream=? WHERE key=?', Date.now(), chan.key) res.send('OK') }) @@ -442,6 +455,7 @@ app.get('/api/channel/:name', async (req, res) => { data.live_at = new Date(parseInt(data.live_at)) data.last_stream = new Date(parseInt(data.last_stream)) data.links = links || [] + data.viewers = Object.keys(cache.viewers[name] || {}).length for (let i in data.links) { delete data.links[i].id @@ -457,6 +471,69 @@ app.use((error, req, res, next) => { res.send(error.message) }) +// Socket Server +wss.on('connection', (ws, request, client) => { + const userId = request.session.login || request.session.id + const username = request.session.username + + dev && console.log(username || userId,'connected') + ws.on('message', (msg) => { + dev && console.log(userId,'said',msg) + if (msg.indexOf('watch ') === 0) { + let chan = msg.substring(6) + dev && console.log('adding a watcher to channel',chan) + if (cache.live.indexOf(chan) !== -1) { + if (!cache.viewers[chan]) cache.viewers[chan] = {} + cache.viewers[chan][userId] = username || 'A Friendly Guest' + } + } else if (msg.indexOf('stop ') === 0) { + let chan = msg.substring(5) + dev && console.log('removing a watcher from channel',chan) + if (cache.live.indexOf(chan) !== -1) { + if (cache.viewers[chan] && cache.viewers[chan][userId]) delete cache.viewers[chan][userId] + } + } else if (msg.indexOf('viewers ') === 0) { + let chan = msg.substring(8) + if (cache.viewers[chan] != null) ws.send('viewlist ' + Object.values(cache.viewers[chan]).join(',')) + } + }) + + ws.on('close', () => { + dev && console.log(userId,'disconnected') + for (let chan in cache.viewers) { + let viewers = cache.viewers[chan] + if (viewers[userId]) { + delete cache.viewers[chan][userId] + } + } + }) + + ws.on('error', (e) => { + dev && console.error('Socket error:', e) + }) +}) + +// Handle upgrade, parse included session +server.on('upgrade', (request, socket, head) => { + sessionParser(request, {}, () => { + wss.handleUpgrade(request, socket, head, function(ws) { + wss.emit('connection', ws, request) + }) + }) +}) + // Start server const host = dev ? '0.0.0.0' : '127.0.0.1' -app.listen(port, host, () => console.log('Listening on %s:%d', host, port)) +server.listen(port, host, () => { + // Get currently live channels, for example, when server restarted while someone was live + (async function () { + let db = await dbPromise + let allLive = await db.all('SELECT name FROM channels WHERE live_at IS NOT NULL') + for (let i in allLive) { + cache.live.push(allLive[i].name) + } + console.log(`=> Found ${cache.live.length} channels still live`) + })().catch(e => console.error(e.stack)) + + console.log('Listening on %s:%d', host, port) +}) diff --git a/nginx.example.conf b/nginx.example.conf index 24f5ec0..72d545f 100644 --- a/nginx.example.conf +++ b/nginx.example.conf @@ -44,6 +44,17 @@ http { } location @distrib { + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header Host $http_host; + proxy_set_header X-NginX-Proxy true; + + # socket support + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_pass http://localhost:5000; proxy_redirect off; } diff --git a/package-lock.json b/package-lock.json index 9e21ad4..5b11bbe 100644 --- a/package-lock.json +++ b/package-lock.json @@ -532,6 +532,11 @@ "resolved": "https://registry.npmjs.org/async-each/-/async-each-1.0.1.tgz", "integrity": "sha1-GdOGodntxufByF04iu28xW0zYC0=" }, + "async-limiter": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz", + "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==" + }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -6369,6 +6374,14 @@ "signal-exit": "^3.0.2" } }, + "ws": { + "version": "7.2.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.2.0.tgz", + "integrity": "sha512-+SqNqFbwTm/0DC18KYzIsMTnEWpLwJsiasW/O17la4iDRRIO9uaHbvKiAS3AHgTiuuWerK/brj4O6MYZkei9xg==", + "requires": { + "async-limiter": "^1.0.0" + } + }, "xdg-basedir": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/xdg-basedir/-/xdg-basedir-3.0.0.tgz", diff --git a/package.json b/package.json index 61d8f9e..8ddb87f 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "sqlite": "^3.0.3", "toml": "^3.0.0", "uuid": "^3.3.2", + "ws": "^7.2.0", "xml2js": "^0.4.22" } } diff --git a/src/css/player.css b/src/css/player.css index 51cf433..2a6f5a4 100644 --- a/src/css/player.css +++ b/src/css/player.css @@ -19,7 +19,7 @@ body { width: 100%; height: 100%; } -.live { +.badge { position: absolute; color: white; font-size: 120%; @@ -29,7 +29,10 @@ body { padding: 10px; border-radius: 5px; } -.live.offline { +.badge.viewers { + right: 0; +} +.badge.live.offline { background-color: rgba(93, 93, 93, 0.7); } .overlay { diff --git a/src/player.js b/src/player.js index 37eed5c..75a4b70 100644 --- a/src/player.js +++ b/src/player.js @@ -16,6 +16,7 @@ let bigbtn = overlay.querySelector('.bigplaybtn') let volumebar = overlay.querySelector('#volume_seek') let volumeseek = volumebar.querySelector('.seeker') let volumeseekInner = volumeseek.querySelector('.seekbar') +let viewers = overlay.querySelector('.viewers') let links let linksList @@ -27,6 +28,7 @@ let vidReady = false let shouldHide = true let inFullscreen = false let errored = false +let ws function GET (url, istext) { return new Promise((resolve, reject) => { @@ -74,20 +76,71 @@ function updateVolume () { volumeseekInner.style.width = vid.volume * 100 + '%' } +function viewersCount (res) { + viewers.style.display = 'block' + viewers.innerHTML = res.length + ' watching' +} + +function handleWebSocket (live) { + if (!live && ws) { + ws.onerror = ws.onopen = ws.onclose = null + ws.close() + ws = null + return + } + + if (ws) return + + ws = new WebSocket(`ws://${location.host}`) + ws.onerror = function(e) { + console.error('Socket errored, retrying..', e) + handleWebSocket(false) + setTimeout(() => handleWebSocket(vidReady), 5000) + } + + ws.onopen = function() { + console.log('Upstream socket connection established') + if (!vid.paused) ws.send('watch ' + STREAM_NAME) + ws.onmessage = function (event) { + if (!event) return + let message = event.data + if (message.indexOf('viewlist ') === 0) { + let str = message.substring(9) + let list = str.split(',') + if (str === '') list = [] + viewersCount(list) + } + } + } + + ws.onclose = function() { + console.error('Socket died, retrying..') + ws = null + setTimeout(() => handleWebSocket(vidReady), 5000) + } +} + function liveStatus (status) { if (status) { lstat.innerHTML = 'live now' - lstat.className = 'live' + lstat.className = 'badge live' clearTimeout(retryTimeout) + if (vid.paused) { showBigBtn(true) } + + handleWebSocket(true) } else { lstat.innerHTML = 'offline' - lstat.className = 'live offline' + lstat.className = 'badge live offline' + viewers.style.display = 'none' + + handleWebSocket(false) + retryTimeout = setTimeout(() => { if (vidReady) return - hls.loadSource(STREAM_SERVER + STREAM_NAME + '.m3u8') + loadSource() }, 10000) } } @@ -123,10 +176,12 @@ function toggleStream () { if (!vid) return if (!vidReady) return if (vid.paused) { + if (ws) ws.send('watch ' + STREAM_NAME) vid.play() btn.innerHTML = '' showBigBtn(false) } else { + if (ws) ws.send('stop ' + STREAM_NAME) vid.pause() btn.innerHTML = '' showBigBtn(true) @@ -356,6 +411,7 @@ function getStreamStatus () { } if (jd.links) updateLinks(jd.links) + if (ws) ws.send('viewers ' + STREAM_NAME) if (jd.live && !vidReady) loadSource() liveStatus(jd.live) diff --git a/templates/player.html b/templates/player.html index eaa3835..ee5f159 100644 --- a/templates/player.html +++ b/templates/player.html @@ -16,7 +16,8 @@
-
offline
+
offline
+