355 lines
9.5 KiB
355 lines
9.5 KiB
#!/usr/bin/env python3
from flask import Flask, request, make_response, session, redirect, render_template, send_from_directory, jsonify
from time import gmtime, strftime, time
from uuid import uuid4
from bs4 import BeautifulSoup
import requests
import json
import sqlite3
import base64
import configparser
import os
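# SQLite3 Database Schema
#
# NOTE: this listing is truncated. The queries in this file also use the
# columns channels.id, channels.name and channels.key, and
# signed_users.uuid and signed_users.name, which are not shown here.
#
# CREATE TABLE channels (
#     "live_at" TEXT,
#     "password" TEXT,
#     "user_uuid" TEXT
# , "last_stream" TEXT);
# CREATE TABLE signed_users (
#     ... (column definitions not shown)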
# Load configuration
# Built-in defaults are declared first. Values from an existing config.ini
# then override them, and the merged result is written back so the file on
# disk always lists every known option.
config = configparser.ConfigParser()
config['Streaming'] = {
    'Database': 'streaming.db',
    'StreamServer': 'https://tv.icynet.eu/live/',
    'ServerHost': 'icynet.eu',
    'PublishAddress': 'rtmp://{host}:1935/hls-live/{streamer}',
    'Secret': 'changeme',
}
config['Auth'] = {
    'Server': 'http://localhost:8282',
    'Redirect': 'http://localhost:5000/auth/_callback/',
}
config['OAuth2'] = {
    'ClientID': '1',
    'ClientSecret': 'changeme',
}

# Overlay user-provided values on top of the defaults above.
if os.path.exists('config.ini'):
    config.read('config.ini')

# Persist the merged configuration (creates config.ini on first start).
with open('config.ini', 'w') as configfile:
    config.write(configfile)
# App settings
app = Flask(__name__)
# The session-signing key is read from the [Streaming] section of the config.
app.secret_key = config['Streaming']['Secret']
app.config['SESSION_TYPE'] = 'redis'
# URL templates with str.format() placeholders; oauth_auth is expanded in
# login() with the state, client id, redirect URI and auth server.
auth_image = "{server}/api/avatar/{uuid}"
oauth_auth = "{server}/oauth2/authorize?response_type=code&state={state}&redirect_uri={redirect}&client_id={client}&scope=image"
# Frequently used configuration values, resolved once at import time.
stream_server = config['Streaming']['StreamServer']
auth_server = config['Auth']['Server']
oauth_redirect = config['Auth']['Redirect']
# The client id is stored as text in the config and converted to int here.
oauth_id = int(config['OAuth2']['ClientID'])
oauth_secret = config['OAuth2']['ClientSecret']
# Database
# One module-wide connection; check_same_thread=False disables sqlite3's
# check that a connection is only used by the thread that created it.
conn = sqlite3.connect(config['Streaming']['Database'], check_same_thread=False)
# Streamer memcache
# Looked up by user UUID in valid_streamer().
stream_cache = {}
# Stat memcache
# stat_cache holds the parsed stat document once pull_stream_metrics() has
# fetched it (False until then); stat_cache_updated is the Unix time, in
# whole seconds, recorded for that fetch.
stat_cache = False
stat_cache_updated = 0
# Check if user is a streamer
def valid_streamer(uuid):
    """Return the stream key of the channel owned by ``uuid``, or ``None``.

    The in-memory ``stream_cache`` is consulted first; on a miss the
    ``channels`` table is queried by ``user_uuid``.

    Args:
        uuid: UUID of the user to look up. Every caller in this file passes
            ``session['uuid']``.

    Returns:
        The third column of the matching ``channels`` row (the value the
        other views use as the stream key), or ``None`` when ``uuid`` is
        empty or no channel belongs to it.
    """
    if not uuid:
        return None

    # Use the argument rather than re-reading session['uuid']: the callers
    # already pass that value, and the lookup no longer depends on an active
    # request context.
    if uuid in stream_cache:
        return stream_cache[uuid]

    # Find key in database
    # NOTE(review): nothing visible in this file ever writes to stream_cache,
    # so in practice every call reaches this query.
    row = conn.execute(
        'SELECT * FROM channels WHERE user_uuid=?', (uuid,)
    ).fetchone()
    if row:
        return row[2]
    return None
def _tag_text(node, *path):
    """Follow child-tag names from ``node`` and return the last tag's text.

    Returns ``None`` as soon as a tag along ``path`` is missing, instead of
    raising ``AttributeError`` on a ``None`` intermediate.
    """
    for child_name in path:
        if node is None:
            return None
        node = getattr(node, child_name, None)
    return node.string if node is not None else None


def pull_stream_metrics(uuid):
    """Return live metrics for one stream from the stream server's stat page.

    The stat document is fetched from ``stream_server + "stat"`` and parsed
    as XML. The parsed document is kept in the module-level ``stat_cache``
    and reused for five seconds.

    Args:
        uuid: Name of the stream to look for inside the ``live`` application
            (``dashboard_stats`` passes the user's stream key).

    Returns:
        A dict with ``time``, ``bytes``, ``video`` and ``audio`` entries, or
        ``None`` when there is no ``live`` application, it has no streams,
        or no stream is named ``uuid``. Individual values are ``None`` when
        the stat document does not carry them (e.g. no ``<meta>`` yet).
    """
    global stat_cache
    global stat_cache_updated

    if not stat_cache or stat_cache_updated < int(time()) - 5:
        metric_path = stream_server + "stat"
        r = requests.get(metric_path)
        stat_cache = BeautifulSoup(r.text, "xml")
        # Stamp the cache right after the fetch. Previously the timestamp was
        # only refreshed when at least one stream existed, so an idle server
        # was re-fetched on every single call.
        stat_cache_updated = int(time())
        print("Updating per request..")

    # Locate the "live" application (the last match wins, as before).
    live_app = None
    for application in stat_cache.find_all("application"):
        name_tag = application.find("name")
        if name_tag is not None and str(name_tag.string) == "live":
            live_app = application
    if not live_app:
        return None

    streams = live_app.find_all("stream")
    if not streams:
        return None

    # Locate the requested stream by name (the last match wins, as before).
    relevant = None
    for stream in streams:
        name_tag = stream.find("name")
        if name_tag is not None and name_tag.string == uuid:
            relevant = stream
    if not relevant:
        return None

    data = {
        'time': _tag_text(relevant, 'time'),
        'bytes': _tag_text(relevant, 'bytes_in'),
        'video': {
            'width': _tag_text(relevant, 'meta', 'video', 'width'),
            'height': _tag_text(relevant, 'meta', 'video', 'height'),
            'frame_rate': _tag_text(relevant, 'meta', 'video', 'frame_rate'),
            'codec': _tag_text(relevant, 'meta', 'video', 'codec'),
        },
        'audio': {
            'sample_rate': _tag_text(relevant, 'meta', 'audio', 'sample_rate'),
            'channels': _tag_text(relevant, 'meta', 'audio', 'channels'),
            'codec': _tag_text(relevant, 'meta', 'audio', 'codec'),
        },
    }
    return data
def index():
    """Render the landing page.

    The template receives ``streamer=True`` only when the session carries a
    ``uuid`` for which ``valid_streamer`` returns a truthy value.

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    is_streamer = 'uuid' in session and bool(valid_streamer(session['uuid']))
    return render_template("index.html", streamer=is_streamer)
def dashboard():
    """Render the streamer dashboard for the logged-in user.

    Returns:
        A redirect to ``/login`` when the session has no ``uuid``; a 403
        response when the user has no stream key; otherwise the rendered
        ``dashboard.html`` with the stream key and the RTMP ingest URL.

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    if 'uuid' not in session:
        return redirect("/login", code=302)

    streamkey = valid_streamer(session['uuid'])
    if not streamkey:
        # The user is logged in but is not allowed to stream: 403 Forbidden.
        # (The previous 402 means "Payment Required".)
        return make_response("Unauthorized.", 403)

    ingest_url = "rtmp://" + config['Streaming']['ServerHost'] + "/live/"
    return render_template("dashboard.html", stream=streamkey, server=ingest_url)
def dashboard_stats():
    """Return the logged-in streamer's live metrics as JSON.

    Responds with ``{'error': 'Unauthorized'}`` when there is no session
    ``uuid`` or no stream key for it, and with
    ``{'error': 'No data was returned..'}`` when ``pull_stream_metrics``
    yields nothing.

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    streamkey = valid_streamer(session['uuid']) if 'uuid' in session else None
    if not streamkey:
        return jsonify({'error': 'Unauthorized'})

    metrics = pull_stream_metrics(streamkey)
    if metrics:
        return jsonify(metrics)
    return jsonify({'error': 'No data was returned..'})
def dashboard_data():
    """Return the logged-in streamer's channel record as JSON.

    Responds with ``{'error': 'Unauthorized'}`` when there is no session
    ``uuid``, no stream key for it, or no ``channels`` row for that key.
    Otherwise the payload holds the channel name, key, user uuid, live flag,
    ``live_at`` and ``last_stream``.

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    streamkey = valid_streamer(session['uuid']) if 'uuid' in session else None
    if not streamkey:
        return jsonify({'error': 'Unauthorized'})

    # Find key in database
    channel = conn.execute(
        'SELECT * FROM channels WHERE key=?', (streamkey,)
    ).fetchone()
    if not channel:
        return jsonify({'error': 'Unauthorized'})

    # Column 3 is read as live_at: it is non-NULL only while a stream runs.
    live_since = channel[3]
    payload = {
        'name': channel[1],
        'key': streamkey,
        'uuid': session['uuid'],
        'live': live_since is not None,
        'live_at': live_since,
        'last_stream': channel[6],
    }
    return jsonify(payload)
# Called when starting publishing
@app.route("/publish", methods=["POST"])
def publish():
    """Authorize a publish request and redirect it to the channel's stream.

    Reads the stream key from the ``name`` form field and looks it up in
    ``channels``. Unknown keys get a 400 response. Known keys have
    ``live_at`` set to the current UTC time and receive a 302 whose
    ``Location`` (and body) is the configured ``PublishAddress`` formatted
    with the channel name.
    """
    # NOTE(review): this prints the whole form, including the stream key.
    print(json.dumps(request.form, ensure_ascii=False))
    streamkey = request.form["name"]

    # Find key in database
    row = conn.execute(
        'SELECT * FROM channels WHERE key=?', (streamkey,)
    ).fetchone()

    # Deny stream publish
    if not row:
        return make_response("Request Denied", 400)

    streamer = row[1]
    print("Streamer %s has started streaming!" % streamer)

    # Update database with stream timestamp
    starttime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    conn.execute('UPDATE channels SET live_at=? WHERE id=?', (starttime, row[0]))
    # sqlite3 opens an implicit transaction for UPDATE; without a commit the
    # change is never persisted and the write lock stays held.
    conn.commit()

    # Redirect stream publish to stream name
    url = config['Streaming']['PublishAddress'].format(streamer=streamer, host="")
    response = make_response(url, 302)
    response.headers["Location"] = url
    return response
# Called when stopped publishing
@app.route("/publish_done", methods=["POST"])
def publish_done():
    """Mark the channel whose key is in the ``name`` form field as offline.

    Clears ``live_at`` and stores the current UTC time in ``last_stream``.
    Always answers ``"OK"``, whether or not a row matched.
    """
    streamkey = request.form["name"]

    # Update database with stream end time
    endtime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    conn.execute(
        'UPDATE channels SET live_at=NULL, last_stream=? WHERE key=?',
        (endtime, streamkey),
    )
    # Commit explicitly: sqlite3 wraps the UPDATE in an implicit transaction
    # that would otherwise never be persisted.
    conn.commit()
    return "OK"
def login():
    """Start the OAuth2 authorization-code flow.

    A visitor whose session already has a ``uuid`` gets a plain
    "already authenticated" message. Otherwise a fresh random ``state`` is
    stored in the session and the visitor is redirected to the auth server's
    authorize URL.

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    if 'uuid' in session:
        greeting = "Already authenticated as %s!" % session['username']
        return make_response(greeting)

    state = str(uuid4())
    session['state'] = state
    authorize_url = oauth_auth.format(
        server=auth_server,
        client=oauth_id,
        redirect=oauth_redirect,
        state=state,
    )
    return redirect(authorize_url, code=302)
def logout():
    """Log the visitor out and send them back to the front page.

    Removes the keys this application stores in the session (``uuid``,
    ``username`` and the pending OAuth ``state``). Without this, the
    redirect alone would leave the user logged in.

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    for key in ('uuid', 'username', 'state'):
        session.pop(key, None)
    return redirect("/", code=302)
def cb():
    """Handle the OAuth2 redirect: validate state, fetch token and user info.

    Steps:
        1. Check that the ``state`` query argument matches the session's.
        2. Exchange ``code`` for an access token at ``/oauth2/token``.
        3. Fetch the user record from ``/oauth2/user`` with that token.
        4. Insert the user into ``signed_users`` if not yet known.
        5. Store ``uuid`` and ``username`` in the session and redirect to ``/``.

    Returns:
        A redirect to ``/`` on success; otherwise a plain-text error with
        status 400 (missing or mismatched state), 403 (no code, i.e. the user
        denied access), 500 (the provider reported an error) or 502 (the
        provider's reply could not be used).

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    if 'state' not in session:
        return make_response("Something went wrong!", 400)

    code = request.args.get('code')
    state = request.args.get('state')

    if session['state'] != state:
        return make_response("Something went wrong!", 400)

    if not code:
        return make_response("Authorization denied by user", 403)

    # b64encode() returns bytes; decode it so the header reads
    # "Basic <token>" rather than "Basic b'<token>'".
    credentials = "%s:%s" % (oauth_id, oauth_secret)
    basic_token = base64.b64encode(credentials.encode('utf-8')).decode('ascii')

    # Get access token
    r = requests.post(auth_server + "/oauth2/token", data={
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': oauth_redirect,
        'client_id': oauth_id,
        'client_secret': oauth_secret
    }, headers={
        'Authorization': 'Basic ' + basic_token
    })

    try:
        res_token = r.json()
    except ValueError:
        return make_response("Something went wrong while getting an access token!", 502)

    if 'error' in res_token:
        return make_response("%s: %s" % (res_token['error'], res_token.get('error_description', '')), 500)

    token = res_token.get('access_token')
    if not token:
        return make_response("Something went wrong while getting an access token!", 502)

    # Get user info
    ru = requests.get(auth_server + "/oauth2/user", headers={
        'Authorization': 'Bearer ' + token
    })

    try:
        res_uinfo = ru.json()
    except ValueError:
        return make_response("Something went wrong while getting user information!", 502)

    # Guard against an error payload that lacks the fields used below.
    if 'uuid' not in res_uinfo or 'username' not in res_uinfo:
        return make_response("Something went wrong while getting user information!", 502)

    row = conn.execute(
        'SELECT * FROM signed_users WHERE uuid=?', (res_uinfo['uuid'],)
    ).fetchone()
    if not row:
        conn.execute(
            'INSERT INTO signed_users (uuid, name) VALUES (?, ?)',
            (res_uinfo['uuid'], res_uinfo['username']),
        )
        # Persist the new user; sqlite3 does not auto-commit the INSERT.
        conn.commit()

    # The state value is single-use; drop it once the flow has completed.
    session.pop('state', None)
    session['uuid'] = res_uinfo['uuid']
    session['username'] = res_uinfo['username']

    return redirect("/", code=302)
def watch(name):
    """Render the player page for the channel ``name``.

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    context = {'name': name, 'server': stream_server}
    return render_template("player.html", **context)
def watch_old(name):
    """Redirect a legacy channel URL to ``/watch/<name>``.

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    target = "/watch/%s" % name
    return redirect(target, code=302)
def dist(path):
    """Serve the file ``path`` from the ``dist`` directory.

    NOTE(review): no ``@app.route`` decorator is visible above this function.
    """
    asset_root = "dist"
    return send_from_directory(asset_root, path)
# Settings applied only when this file is executed directly as a script.
if __name__ == '__main__':
    # NOTE(review): this replaces the secret key read from the configuration
    # earlier in the file with a hard-coded value. Confirm that is intended.
    app.secret_key = '00-wegrhr[gqw[er=1ew qwergfdq.///**+'
    # Turn on Flask's debug mode for direct script runs.
    # NOTE(review): no app.run() call is visible in this excerpt.
    app.debug = True