De-duping those additions seems like a good idea, but we found that in some cases OME will emit open and close events for streams while they're still live. De-duping those resulted in streams erroneously being removed from the live list. One such scenario is someone trying to "steal" an already-live key: OME will emit an open and an immediate close as the connection is rejected, and that close was removing the still-live stream from the list. Fixes #9
169 lines
6.2 KiB
Python
169 lines
6.2 KiB
Python
import time
|
|
from pathlib import Path
|
|
|
|
import cherrypy
|
|
import requests
|
|
|
|
import config
|
|
|
|
|
|
def check_webhook_throttle() -> bool:
    """
    Record a notification attempt and report whether it is within the rate limit.

    Timestamps more than 60 seconds old are dropped from the front of
    config.NOTIFICATIONS, the current time is appended, and the resulting count
    is compared against config.NOTIFICATION_THROTTLE. The attempt is recorded
    even when it ends up throttled, so throttled attempts still count toward
    the window.

    Returns True when the webhook may fire, False when the limit is exceeded.
    """
    timestamp = time.time()
    history = config.NOTIFICATIONS

    # Expire stale entries from the front of the list (mutated in place)
    while history and timestamp - history[0] > 60:
        del history[0]

    history.append(timestamp)

    return len(history) <= config.NOTIFICATION_THROTTLE
|
|
|
|
|
|
def webhook_online(stream) -> None:
    """
    Post the "online" notification to the configured webhook.

    Does nothing unless config.is_webhook_ready() is truthy. When
    config.is_avatar_ready() is truthy, an avatar named
    "<stream[1]>/<stream[2]>.png" is looked up under config.WEBHOOK_AVATAR_PATH
    and its URL is attached to the payload.

    stream is only ever indexed at positions 1 and 2; the admission handler in
    this file records streams as ("default", app, stream) tuples.
    """
    if not config.is_webhook_ready():
        return

    payload = {
        "username": f"{config.WEBHOOK_NAME} Online",
        "content": config.WEBHOOK_ONLINE,
    }

    if config.is_avatar_ready():
        wanted = f"{stream[1]}/{stream[2]}.png"

        # Prefer the exact-case file, then the lower-cased name, then the default
        candidates = (wanted, wanted.lower())
        chosen = next(
            (name for name in candidates if Path(config.WEBHOOK_AVATAR_PATH, name).is_file()),
            "default.png",
        )
        payload["avatar_url"] = f"{config.WEBHOOK_AVATAR_URL}/{chosen}"

    requests.post(
        config.WEBHOOK_URL,
        json=payload,
        headers=config.WEBHOOK_HEADERS,
        timeout=10,
    )
|
|
|
|
|
|
def webhook_offline() -> None:
    """
    Post the "offline" notification to the configured webhook.

    Does nothing unless config.is_webhook_ready() is truthy. When both avatar
    settings are present, the fixed "offline.png" avatar URL is attached.
    """
    if not config.is_webhook_ready():
        return

    payload = {
        "username": f"{config.WEBHOOK_NAME} Offline",
        "content": config.WEBHOOK_OFFLINE,
    }

    # NOTE(review): webhook_online gates avatars on config.is_avatar_ready();
    # this checks the two settings directly. Confirm whether they should agree.
    avatars_configured = config.WEBHOOK_AVATAR_PATH and config.WEBHOOK_AVATAR_URL
    if avatars_configured:
        payload["avatar_url"] = f"{config.WEBHOOK_AVATAR_URL}/offline.png"

    requests.post(
        config.WEBHOOK_URL,
        json=payload,
        headers=config.WEBHOOK_HEADERS,
        timeout=10,
    )
|
|
|
|
|
|
def check_authorized(host, app, stream, source, access_key) -> bool:
    """
    Decide whether an incoming stream is allowed to go live.

    Checks are applied in order: the global kill switch, the IP block list,
    the disabled-key list (under both the "default" vhost and the supplied
    host), and finally the optional access key.

    Args:
        host: Virtual host portion of the request URL.
        app: Application name from the request URL.
        stream: Stream name from the request URL.
        source: Client IP, looked up in config.BLOCKED_IPS.
        access_key: Access key supplied by the client, or None when absent.

    Returns:
        True when the stream is authorized, False otherwise.
    """
    import hmac

    # Are we globally disabled?
    if config.DISABLED:
        return False

    # IP banned?
    if source in config.BLOCKED_IPS:
        return False

    # Nothing in the Oven API maps a domain to "default" vhost,
    # so here we fudge checking default vhost for all apps/streams
    if f"default:{app}:{stream}" in config.DISABLED_KEYS:
        return False

    # Finally check the provided vhost app/stream
    if f"{host}:{app}:{stream}" in config.DISABLED_KEYS:
        return False

    # No access key configured means no key is required
    expected = config.ACCESS_KEY
    if not expected:
        return True

    # The key arrives from the client, so compare in constant time to avoid
    # leaking how much of the secret matched. Encoding to bytes lets
    # compare_digest accept non-ASCII keys; "surrogatepass" keeps lone
    # surrogates from raising.
    if isinstance(access_key, str) and isinstance(expected, str):
        return hmac.compare_digest(
            access_key.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    # Missing key (None) or a non-string configured value: plain equality,
    # which yields the same result the original comparison did
    return access_key == expected
|
|
|
|
|
|
@cherrypy.tools.register("on_end_request")
def handle_notify() -> None:
    """
    Update the live stream list once an admission request has finished.

    Registered as a CherryPy tool on the "on_end_request" hook. The handler
    communicates through two attributes on cherrypy.request: "update_stream"
    (the stream that changed) and "update_opening" (truthy for an open, falsy
    for a close). If either attribute is absent there is nothing to do.

    After adjusting the list, a webhook is dispatched only when the overall
    state flipped between "someone online" and "no one online", and only if
    the notification throttle allows it.
    """
    request = cherrypy.request

    # No recorded state change means there is nothing to process
    if not (hasattr(request, "update_opening") and hasattr(request, "update_stream")):
        return

    # Work on a copy so the previous list stays intact for comparison
    streams = config.LAST_STREAM_LIST.copy()

    # NOTE: Duplicates are deliberately allowed here. OME emits an open and a
    # close for a connection rejected because its key is already in use; if
    # entries were de-duped, that close would knock the still-live stream out
    # of the list entirely. Consumers must de-dupe before using the list.
    if request.update_opening:
        streams.append(request.update_stream)
    elif request.update_stream in streams:
        streams.remove(request.update_stream)

    # Did we cross between "someone online" and "no one online"?
    flipped = bool(streams) != bool(config.LAST_STREAM_LIST)

    # Persist the list before any network callouts to narrow the race window
    # FIXME: The right way to handle this is threadsafe locking
    config.LAST_STREAM_LIST = streams.copy()

    if not flipped:
        return

    if not check_webhook_throttle():
        cherrypy.log("Webhook throttle limit hit, ignoring")
        return

    # Dispatch the appropriate webhook
    if streams:
        webhook_online(streams[0])
    else:
        webhook_offline()
|
|
|
|
|
|
class Admission:
    """CherryPy handler for /admission, used to control and track sessions."""

    @staticmethod
    def _parse_stream_url(url):
        """
        Split a stream URL into (host, app, stream, params).

        The query string is detached before the remainder is split on "/", so
        a "/" inside a parameter value (for example an access key) cannot
        truncate that value or drop later parameters. Parameter values are
        returned as received; no percent-decoding is applied. A parameter
        without "=" maps to None.

        Raises:
            ValueError: if url is not a string or has fewer than five
                "/"-separated parts (scheme, empty, host, app, stream).
        """
        if not isinstance(url, str):
            raise ValueError("stream URL must be a string")

        location, _, query = url.partition("?")
        _, _, host, app, stream = location.split("/")[:5]

        params = {}
        for pair in query.split("&") if query else ():
            name, has_value, value = pair.partition("=")
            params[name] = value if has_value else None

        return host, app, stream, params

    # /admission to control/trigger sessions
    @cherrypy.expose
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.handle_notify()
    def default(self) -> dict:
        """
        Answer an admission webhook call.

        Outgoing (viewer) requests are always allowed. For other requests the
        affected stream is stored on cherrypy.request as "update_stream" and,
        for closes and authorized opens, "update_opening" is set so the
        handle_notify tool can update the live list after the response.

        Returns:
            {"allowed": bool} for opening requests, {} for closing requests.
            A missing or malformed payload yields {} with HTTP status 400.
        """
        # Fast fail if we have no json payload
        try:
            input_json = cherrypy.request.json
        except AttributeError:
            cherrypy.response.status = 400
            return {}

        # Pull out everything we need up front so a payload with missing or
        # mis-shaped fields is answered with a 400 rather than a server error
        try:
            request_info = input_json["request"]

            # If this is a viewer, allow it with no processing
            # This should never happen since we won't enable webhooks for viewing
            if request_info["direction"] == "outgoing":
                return {"allowed": True}

            # Figure out host, app and stream name for recording who is live
            host, app, stream, params = self._parse_stream_url(request_info["url"])
            closing = request_info["status"] == "closing"

            # Client IP is only needed for ACL checking on opens
            ip = None if closing else input_json["client"]["real_ip"]
        except (KeyError, TypeError, ValueError):
            cherrypy.log("Malformed admission payload, rejecting")
            cherrypy.response.status = 400
            return {}

        # Populate variables for our on_end_request tool into request object
        cherrypy.request.update_stream = ("default", app, stream)

        # If we are closing, return a fast 200
        if closing:
            cherrypy.request.update_opening = False
            return {}

        # Check if stream is authorized
        if not check_authorized(host, app, stream, ip, params.get("access_key")):
            cherrypy.log(f"Unauthorized stream key: {app}/{stream}")
            return {"allowed": False}

        # Compile and dispatch our response
        cherrypy.request.update_opening = True
        return {"allowed": True}
|