diff --git a/README.md b/README.md index 77d0f45..6152e6e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,50 @@ # ovenemprex -OvenMediaEngine player and management middleware \ No newline at end of file +OvenMediaEngine management middleware + + +# Reqirements + +This project tries to be pretty lean. Requirements should be roughly... + +- OvenMediaEngine 0.10.30 or greater +- Python 3.8 or greater +- python-cherrypy +- python-requests + + +# Setup + +1. Install and configure Ovenmediaengine. The following components are required: + 1. WebRTC publishing + 2. The API enabled with a user/password set + 3. Some number of applications +2. Extract or clone this repository somewhere +3. Configure your HTTP daemon/proxy/etc to proxy HTTPS to `http://localhost:8080` +4. Set up environment variables to your liking. The OvenMediaEngine API key and password are mandatory; see Configuration below +5. Start the management engine with either `python3 main.py` or a systemd unit as noted in `examples/` + + +# Usage + +By default this provides a few things: + +- `https:///` will provide a "Discord like" interface to every stream live in the current app +- `https:////` will display only that stream +- `https:////` will, if configured, display a management interface to allow basic stream management + + +# Configuration + +All configuration is done with environment variables. If using systemd you can configure systemd unit overrides. If you're using your own management script you can set your environment variables any way you wish. + +Check out the config files in the `examples/` dir to see available configuration arguments. + + +# Customization + +There's only a couple supported methods of customization at this time: + +1. `assets/webhook_avatars` can provide for a way to assign stream keys an avatar that the webhook will use when announcing that key has gone live +2. `assets/errorlogo.gif` can be replaced to replace the throbber on any interface waiting for a stream to start +3. Anything in `templates/` can be edited as desired but will likely be reverted in a future update diff --git a/admission.py b/admission.py new file mode 100644 index 0000000..d493145 --- /dev/null +++ b/admission.py @@ -0,0 +1,124 @@ +import time +from pathlib import Path + +import cherrypy +import requests + +import config +import ovenapi + + +def check_webhook_throttle() -> bool: + now = time.time() + + # Clean up notification list to recent notifications + while config.NOTIFICATIONS and now - config.NOTIFICATIONS[0] > 60: + config.NOTIFICATIONS.pop(0) + + config.NOTIFICATIONS.append(now) + + return not len(config.NOTIFICATIONS) > config.NOTIFICATION_THROTTLE + + +def webhook_online(stream) -> None: + if not config.is_webhook_ready(): + return + + data = {"username": f"{config.WEBHOOK_NAME} Online", "content": config.WEBHOOK_ONLINE} + + if config.is_avatar_ready(): + target_av = f"{stream[1]}/{stream[2]}.png" + avatar = target_av if Path(config.WEBHOOK_AVATAR_PATH, target_av).is_file() else "default.png" + data["avatar_url"] = f"{config.WEBHOOK_AVATAR_URL}/{avatar}" + + requests.post(config.WEBHOOK_URL, timeout=10, json=data, headers=config.WEBHOOK_HEADERS) + + +def webhook_offline() -> None: + if not config.is_webhook_ready(): + return + + data = {"username": f"{config.WEBHOOK_NAME} Offline", "content": config.WEBHOOK_OFFLINE} + + if config.WEBHOOK_AVATAR_PATH and config.WEBHOOK_AVATAR_URL: + data["avatar_url"] = f"{config.WEBHOOK_AVATAR_URL}/offline.png" + + requests.post(config.WEBHOOK_URL, timeout=10, json=data, headers=config.WEBHOOK_HEADERS) + + +def check_authorized(host, app, stream, source) -> bool: + # Are we globally disabled? + if config.DISABLED: + return False + # IP Banned? + if source in config.BLOCKED_IPS: + return False + # Nothing in the Oven API maps a domain to "default" vhost + # So here we fudge checking default vhost for all apps/streams + if f"default:{app}:{stream}" in config.DISABLED_KEYS: + return False + # Finally check the provided vhost app/stream + return f"{host}:{app}:{stream}" not in config.DISABLED_KEYS + + +@cherrypy.tools.register("on_end_request") +def handle_notify() -> None: + # If we don't have API creds we can't do this, abort + if not (config.API_USER and config.API_PASS): + return + + # Get stream list from API + # Unfortunately Oven doesn't reflect the new stream fast enough so we have to wait :( + time.sleep(1) + stream_list = ovenapi.OvenAPI(config.API_USER, config.API_PASS).get_stream_list() + + # If we haven't gone empty->active or active->empty we need to do nothing + if bool(stream_list) != bool(config.LAST_STREAM_LIST): + if not check_webhook_throttle(): + cherrypy.log("Webhook throttle limit hit, ignoring") + return + + # Dispatch the appropriate webhook + webhook_online(stream_list[0]) if stream_list else webhook_offline() + + # Save our stream list into a durable value + config.LAST_STREAM_LIST = stream_list.copy() + + +class Admission: + # /admission to control/trigger sessions + @cherrypy.expose + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @cherrypy.tools.handle_notify() + def default(self) -> dict: + # Fast fail if we have no json payload + try: + input_json = cherrypy.request.json + except AttributeError: + cherrypy.response.status = 400 + return {} + + # If this is a viewer, allow it with no processing + # This should never happen since we won't enable webhooks for viewing + if input_json["request"]["direction"] == "outgoing": + return {"allowed": True} + + # Figure out scheme, host, app and stream name for recording who is live + _, _, host, app, path = input_json["request"]["url"].split("/")[:5] + stream = path.split("?")[0] + + # If we are closing, return a fast 200 + if input_json["request"]["status"] == "closing": + return {} + + # Get client IP for ACL checking + ip = input_json["client"]["real_ip"] + + # Check if stream is authorized + if not check_authorized(host, app, stream, ip): + cherrypy.log(f"Unauthorized stream key: {app}/{stream}") + return {"allowed": False} + + # Compile and dispatch our response + return {"allowed": True} diff --git a/assets/errorlogo.gif b/assets/errorlogo.gif new file mode 100644 index 0000000..10b62d2 Binary files /dev/null and b/assets/errorlogo.gif differ diff --git a/assets/player.js b/assets/player.js new file mode 100644 index 0000000..8c1620f --- /dev/null +++ b/assets/player.js @@ -0,0 +1,219 @@ +// Get our initial stream list and create streams +xhr = new XMLHttpRequest(); +xhr.onreadystatechange = function() { + if (this.readyState == 4 && this.status == 200) { + processStreamList(JSON.parse(this.responseText)); + } +} + +disabledPlayers = []; + +// Auto-resize frames in a webcall interface +function webcallFrameResize() { + // Figure out how many Frames are visible + div_count = 0; + + document.querySelectorAll(".frame").forEach( + function(element) { + div_count += 1; + } + ) + + // If none are visible, show placeholder text and bail + if (div_count < 1) { + document.querySelector("#placeholder").style.display = "block"; + return; + } + + // Hide placeholder if any players are visible + document.querySelector("#placeholder").style.display = "none"; + + // Get player frame aspect ratio for fitting purposes + const player_ar = 16 / 9; + + // Try arrangements until the best fit is found + // Take the first column count that doesn't overflow height + cols = 0 + for (let i = 1; i <= div_count; i++) { + const frame_width = window.innerWidth / i; + const frame_height = frame_width / player_ar; + + if (frame_height * Math.ceil(div_count / i) <= window.innerHeight) { + cols = i; + break; + } + } + + // Set frames to the appropriate width + if (cols) { + w = `${Math.floor(100 / cols)}%`; + } else { + w = `${Math.floor(window.innerHeight * player_ar)}px`; + } + + document.querySelectorAll(".frame").forEach( + function(element) { + element.style.width = w; + } + ) +} + +function createPlayer(stream, muted, volume) { + // Create frame + var outer_div = document.createElement("div"); + outer_div.classList.add("frame"); + outer_div.id = `frame_${stream}`; + + // If we're putting name frames around players, make them + // Also provide close buttons to nuke this player + if (named_frames) { + var tab_div = document.createElement("div"); + tab_div.classList.add("frame_tabs"); + outer_div.appendChild(tab_div); + + var name_div = document.createElement("div"); + name_div.classList.add("frame_name"); + name_div.innerHTML = ` ${stream} `; + tab_div.appendChild(name_div); + + var button_div = document.createElement("div"); + button_div.classList.add("frame_buttons"); + button_div.innerHTML = ``; + tab_div.appendChild(button_div); + } + + // Put the player div in the container + var player_div = document.createElement("div"); + player_div.classList.add("player"); + player_div.id = stream; + outer_div.appendChild(player_div); + + // Create a throbber for dead streams + // We had this in the player div before, but it blinks every time the player reconnects + // This hides under a backgroundless player so it's only visible when nothing's up + throbber_div = document.createElement("div"); + throbber_div.id = "throbber"; + outer_div.appendChild(throbber_div); + + // Put container in document + document.body.appendChild(outer_div); + + // Initialize OvenPlayer + // We want as little interface stuff as possible, but we need to keep + // volume controls around so that can't be hidden. + player = OvenPlayer.create(stream, { + currentProtocolOnly: true, + showBigPlayButton: false, + aspect: "16:9", + autoStart: true, + mute: muted, + volume: volume, + sources: [ + { + label: stream, + type: 'webrtc', + file: `wss://${domain}:3334/${app_name}/${stream}` + } + ] + }); + + // Set player up to auto-restart if it stops + // If a streamer goes offline, the player will be reaped + player.stateManager = manageState; + player.on("stateChanged", player.stateManager); + + // Run a resize + if (named_frames) { + webcallFrameResize(); + } +} + +function manageState(data) { + // This serves one purpose: keep attempting to start a live player if it stops + if (data.newstate == "error") { + setTimeout(this.setCurrentSource, 3000, 0); + } +} + +function closePlayer(containerId) { + // Close the player + destroyPlayerById(containerId); + + // Add this ID to the closed player list so we don't reopen it + if (!disabledPlayers.includes(containerId)) { + disabledPlayers.push(containerId); + } +} + +function destroyPlayerById(containerId) { + // Tear down player + player = OvenPlayer.getPlayerByContainerId(containerId); + player.remove(); + + // Delete frame + document.getElementById(`frame_${containerId}`).remove(); + + // Run our resize + if (named_frames) { + webcallFrameResize(); + } +} + +function processStreamList(streams) { + // Remove any closed player from the list + disabledPlayers.forEach((i, index) => { + var removeIndex = streams.indexOf(i); + if (removeIndex !== -1) { + streams.splice(removeIndex, 1); + } + }) + + // Create any player in the list that doesn't have one + streams.forEach((i, index) => { + if (OvenPlayer.getPlayerByContainerId(i) == null) { + createPlayer(i, true, 100); + } + }) + + // Destroy any player not in the list + var players = OvenPlayer.getPlayerList(); + players.forEach((p, index) => { + if (!streams.includes(p.getContainerId())) { + destroyPlayerById(p.getContainerId()); + } + }) +} + +function requestStreamList() { + xhr.open("GET", `https://${domain}/status/default/${app_name}/`) + xhr.send() +} + +// Set up each embed +function EmprexSetup() { + // Pre-populate our disabled list if we have a param for it + window.location.search.substr(1).split("&").forEach((s, index) => { + tmp = s.split("="); + if (tmp[0] == "disabled") { + disabledPlayers = decodeURIComponent(tmp[1]).split(","); + } + }) + + // Placeholder if nothing is live + placeholder = document.createElement("div"); + placeholder.id = "placeholder"; + placeholder.innerHTML = "
Waiting for a stream to start..."; + document.body.appendChild(placeholder); + + // Also resize elements to fill the frame + // We try to debounce this so we're not editing the DOM 100x a second on resize + resize_timeout = false; + addEventListener("resize", function() { + clearTimeout(resize_timeout); + resize_timeout = setTimeout(webcallFrameResize, 250); + }) + + // Update streams every 5 seconds, and immediately + requestStreamList(); + setInterval(requestStreamList, 5000); +} diff --git a/assets/webhook_avatars/README b/assets/webhook_avatars/README new file mode 100644 index 0000000..f169440 --- /dev/null +++ b/assets/webhook_avatars/README @@ -0,0 +1,3 @@ +Put webhook avatars here in the path format of {app}/{stream key}.png + +If configured, webhooks will set the webhook icon to the matching image when sending online announcements diff --git a/config.py b/config.py new file mode 100644 index 0000000..0f423f3 --- /dev/null +++ b/config.py @@ -0,0 +1,56 @@ +"""Config directives for application.""" + +import json +import os +from pathlib import Path + +API_USER = os.getenv("OVENMONITOR_API_USER", "") +API_PASS = os.getenv("OVENMONITOR_API_PASSWORD", "") + +WEBHOOK_URL = os.getenv("OVENMONITOR_WEBHOOK_URL", "") +WEBHOOK_ONLINE = os.getenv("OVENMONITOR_WEBHOOK_ONLINE", "") +WEBHOOK_OFFLINE = os.getenv("OVENMONITOR_WEBHOOK_OFFLINE", "") +WEBHOOK_NAME = os.getenv("OVENMONITOR_WEBHOOK_NAME", "") +WEBHOOK_AVATAR_PATH = os.getenv("OVENMONITOR_WEBHOOK_AVATARPATH", "") +WEBHOOK_AVATAR_URL = os.getenv("OVENMONITOR_WEBHOOK_AVATARURL", "") +WEBHOOK_HEADERS = {"Content-Type": "application/json"} + +# Notifications/min to halt notifying on +NOTIFICATION_THROTTLE = 4 + +LAST_STREAM_LIST = [] +NOTIFICATIONS = [] +DISABLED_KEYS = [] +BLOCKED_IPS = [] +DISABLED = False + + +def load() -> None: + f = Path(Path.home(), Path(".ome_state.json")) + if not f.is_file(): + return + + global DISABLED_KEYS, BLOCKED_IPS, DISABLED + data = json.loads(f.read_text(encoding="utf-8")) + DISABLED_KEYS = data["disabled_keys"] + BLOCKED_IPS = data["blocked_ips"] + DISABLED = data["is_disabled"] + + +def save() -> None: + f = Path(Path.home(), Path(".ome_state.json")) + data = {"disabled_keys": DISABLED_KEYS, "blocked_ips": BLOCKED_IPS, "is_disabled": DISABLED} + + f.write_text(json.dumps(data), encoding="utf-8") + + +def is_api_ready() -> bool: + return bool(API_USER and API_PASS) + + +def is_webhook_ready() -> bool: + return bool(WEBHOOK_URL) + + +def is_avatar_ready() -> bool: + return bool(is_webhook_ready() and WEBHOOK_AVATAR_PATH and WEBHOOK_AVATAR_URL) diff --git a/example/management.service b/example/management.service new file mode 100644 index 0000000..e578318 --- /dev/null +++ b/example/management.service @@ -0,0 +1,12 @@ +[Unit] +Description=Ovenmediaengine Management Script + +[Service] +User=caddy +Group=caddy +Restart=on-failure +WorkingDirectory=/var/lib/caddy +ExecStart=/usr/bin/python /var/lib/caddy/ome-management/main.py + +[Install] +WantedBy=multi-user.target diff --git a/example/ome_management_auth.conf b/example/ome_management_auth.conf new file mode 100644 index 0000000..768e920 --- /dev/null +++ b/example/ome_management_auth.conf @@ -0,0 +1,3 @@ +[Service] +Environment="OVENMONITOR_API_USER=apiuser" +Environment="OVENMONITOR_API_PASSWORD=apipassword" diff --git a/example/ome_webhook.conf b/example/ome_webhook.conf new file mode 100644 index 0000000..c87e1ef --- /dev/null +++ b/example/ome_webhook.conf @@ -0,0 +1,7 @@ +[Service] +Environment="OVENMONITOR_WEBHOOK_URL=FULL WEBHOOK URL" +Environment="OVENMONITOR_WEBHOOK_ONLINE=TEXT WHEN STREAM GOES ONLINE" +Environment="OVENMONITOR_WEBHOOK_OFFLINE=TEXT WHEN STREAM GOES OFFLINE" +Environment="OVENMONITOR_WEBHOOK_NAME=NAME TO ASSIGN TO WEBHOOK BOT" +Environment="OVENMONITOR_WEBHOOK_AVATARPATH=/srv/http/example.com/assets/webhook_avatars" +Environment="OVENMONITOR_WEBHOOK_AVATARURL=https://example.com/assets/webhook_avatars" diff --git a/main.py b/main.py new file mode 100644 index 0000000..2b5a256 --- /dev/null +++ b/main.py @@ -0,0 +1,59 @@ +""" +Management script to let trusted users control OME. + +This should listen on localhost and have Caddy proxy with auth. +""" + +from pathlib import Path + +import cherrypy +from cherrypy.process.plugins import SignalHandler + +import admission +import config +import management +import ovenapi +import status +import viewer + +cherrypy.config.update({ + "server.socket_host": "127.0.0.1", + "environment": "production", + "tools.proxy.on": True, +}) + + +class Noop: + pass + + +def on_exit() -> None: + config.save() + cherrypy.engine.exit() + + +if __name__ == "__main__": + # Establish our config save-out signals + signalhandler = SignalHandler(cherrypy.engine) + signalhandler.handlers["SIGTERM"] = on_exit + signalhandler.handlers["SIGHUP"] = on_exit + signalhandler.handlers["SIGQUIT"] = on_exit + signalhandler.handlers["SIGINT"] = on_exit + signalhandler.subscribe() + + # Load config values + config.load() + + # If we have API access, use it to pull our stream list + if config.is_api_ready(): + config.LAST_STREAM_LIST = ovenapi.OvenAPI(config.API_USER, config.API_PASS).get_stream_list() + + runpath = Path(Path(__file__).parent.resolve(), "assets") + + cherrypy.tree.mount(admission.Admission(), "/admission") + cherrypy.tree.mount(management.Management(), "/management") + cherrypy.tree.mount(status.Status(), "/status") + cherrypy.tree.mount(Noop(), "/assets", config={"/": {"tools.staticdir.on": True, "tools.staticdir.dir": runpath}}) + cherrypy.tree.mount(viewer.Viewer(), "/") + cherrypy.engine.start() + cherrypy.engine.block() diff --git a/management.py b/management.py new file mode 100644 index 0000000..c7d637b --- /dev/null +++ b/management.py @@ -0,0 +1,94 @@ +import subprocess +from pathlib import Path + +import cherrypy +from mako.template import Template + +import config +import ovenapi + + +@cherrypy.tools.register("on_end_request") +def restart_server() -> None: + subprocess.call(["sudo", "/usr/bin/systemctl", "restart", "ovenmediaengine"]) + + +class Management: + def __init__(self): + self.page_template = Path("template/management.mako").read_text(encoding="utf-8") + self.redirect_template = Path("template/message.mako").read_text(encoding="utf-8") + self.api = ovenapi.OvenAPI(config.API_USER, config.API_PASS) + + def __message_and_redirect(self, message: str) -> bytes | str: + return Template(self.redirect_template).render(message=message) + + @cherrypy.expose + @cherrypy.tools.restart_server() + def restart(self) -> bytes | str: + # Blank our stream list because we're about to DC everyone + config.LAST_STREAM_LIST = [] + + # Compile and dispatch our response + return self.__message_and_redirect("Restart command dispatched") + + @cherrypy.expose + def disconnect(self, target): + vhost, app, stream = target.split(":") + self.api.disconnect_key(vhost, app, stream) + return self.__message_and_redirect(f"Disconnected {target}") + + @cherrypy.expose + def ban(self, target): + vhost, app, stream = target.split(":") + ip = self.api.get_stream_ip(vhost, app, stream) + if ip: + config.BLOCKED_IPS.append(ip) + self.disconnect(target) + return self.__message_and_redirect(f"Banned {ip}") + return self.__message_and_redirect("No stream found at that location or other error") + + @cherrypy.expose + def unban(self, target): + if target in config.BLOCKED_IPS: + config.BLOCKED_IPS.remove(target) + return self.__message_and_redirect(f"Unbanned {target}") + return self.__message_and_redirect(f"{target} not in ban list") + + @cherrypy.expose + def disable(self, target): + config.DISABLED_KEYS.append(target) + self.disconnect(target) + return self.__message_and_redirect(f"Disabled key {target}") + + @cherrypy.expose + def enable(self, target): + if target in config.DISABLED_KEYS: + config.DISABLED_KEYS.remove(target) + return self.__message_and_redirect(f"Re-enabled {target}") + return self.__message_and_redirect(f"{target} not in disabled key list") + + @cherrypy.expose + def stop(self): + config.DISABLED = True + self.api.disconnect_all() + return self.__message_and_redirect("Server disabled") + + @cherrypy.expose + def start(self): + config.DISABLED = False + return self.__message_and_redirect("Server re-enabled") + + @cherrypy.expose + def default(self) -> bytes | str: + if not (config.API_USER and config.API_PASS): + cherrypy.response.status = 503 + return "Remote management is disabled on this node." + + data = self.api.get_all_stream_info() + + return Template(self.page_template).render( + DISABLED=config.DISABLED, + BLOCKED_IPS=config.BLOCKED_IPS, + DISABLED_KEYS=config.DISABLED_KEYS, + data=data, + ) diff --git a/ovenapi.py b/ovenapi.py new file mode 100644 index 0000000..f77829d --- /dev/null +++ b/ovenapi.py @@ -0,0 +1,113 @@ +import requests + + +class OvenAPI: + def __init__(self, username: str, password: str, api_path: str = "http://localhost:8081/v1") -> None: + self.opener = requests.Session() + self.opener.auth = (username, password) + self.api_path = api_path + + def __get_api_data(self, rel_path: str, timeout: int = 3) -> dict: + abs_path = f"{self.api_path}/{rel_path.strip('/')}" + + return self.opener.get(abs_path, timeout=timeout).json() + + def get_vhosts(self) -> list: + return self.__get_api_data("/vhosts").get("response", []) + + def get_vhost_info(self, vhost: str) -> dict: + return self.__get_api_data(f"/vhosts/{vhost}").get("response", {}) + + def get_vhost_apps(self, vhost: str) -> list: + return self.__get_api_data(f"/vhosts/{vhost}/apps").get("response", []) + + def get_vhost_stats(self, vhost: str) -> dict: + return self.__get_api_data(f"/stats/current/vhosts/{vhost}").get("response", {}) + + def get_app_info(self, vhost: str, app: str) -> dict: + return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}").get("response", {}) + + def get_app_streams(self, vhost: str, app: str) -> list: + return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams").get("response", []) + + def get_stream_info(self, vhost: str, app: str, stream: str) -> dict: + return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams/{stream}").get("response", {}) + + def get_stream_list(self) -> list[tuple]: + streams = set() + + for vhost in self.get_vhosts(): + for app in self.get_vhost_apps(vhost): + for stream in self.get_app_streams(vhost, app): + streams.add((vhost, app, stream)) + + return list(streams) + + def get_all_stream_info(self) -> dict: + data = {"vhosts": {}} + + for vhost in self.get_vhosts(): + this_vhost = {"apps": {}} + for app in self.get_vhost_apps(vhost): + this_app = {"streams": {}} + for stream in self.get_app_streams(vhost, app): + resp = self.get_stream_info(vhost, app, stream) + + # Simple data: streamer IP, type, start time + this_stream = { + "ip_address": resp["input"]["sourceUrl"].split("://")[1].split(":")[0], + "type": resp["input"]["sourceType"].lower(), + "created": resp["input"]["createdTime"], + } + + # Video data: FPS, bitrate + fps_advertised = 0 + bitrate_advertised = 0 + fps_actual = 0 + bitrate_actual = 0 + has_bframes = False + for track in resp["input"]["tracks"]: + track_type = track.get("type", "none").lower() + this_track = track.get(track_type, {}) + + bitrate_advertised += int(this_track.get("bitrate", 0)) + bitrate_actual += int(this_track.get("bitrateLatest", 0)) + fps_advertised = max(this_track.get("framerate", 0), fps_advertised) + fps_actual = max(this_track.get("framerateLatest", 0), fps_actual) + has_bframes = any([has_bframes, this_track.get("hasBframes", False)]) + + this_stream["fps_advertised"] = fps_advertised + this_stream["fps_actual"] = fps_actual + this_stream["bitrate_advertised"] = bitrate_advertised + this_stream["bitrate_actual"] = bitrate_actual + this_stream["has_bframes"] = has_bframes + + # Stats: We need a different endpoint for this + stats = self.__get_api_data(f"/stats/current/vhosts/{vhost}/apps/{app}/streams/{stream}").get( + "response", {} + ) + this_stream["viewers"] = sum(stats.get("connections", {}).values()) + + # Save this out to the main dict + this_app["streams"][stream] = this_stream + this_vhost["apps"][app] = this_app + data["vhosts"][vhost] = this_vhost + + return data + + def app_exists(self, app_name: str) -> bool: + return app_name in self.__get_api_data("/vhosts/default/apps").get("response", {}) + + def disconnect_all(self) -> None: + for stream in self.get_stream_list(): + self.disconnect_key(stream[0], stream[1], stream[2]) + + def disconnect_key(self, vhost: str, app: str, stream: str) -> None: + self.opener.delete(f"{self.api_path}/vhosts/{vhost}/apps/{app}/streams/{stream}") + + def get_stream_ip(self, vhost: str, app: str, stream: str) -> str | None: + try: + resp = self.get_stream_info(vhost, app, stream) + return resp["response"]["input"]["sourceUrl"].split("://")[1].split(":")[0] + except Exception: + return None diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e69de29 diff --git a/status.py b/status.py new file mode 100644 index 0000000..5245424 --- /dev/null +++ b/status.py @@ -0,0 +1,42 @@ +import cherrypy + +import config +import ovenapi + + +class Status: + def _cp_dispatch(self, vpath): + if len(vpath): + cherrypy.request.params["vhost"] = vpath.pop(0) + if len(vpath): + cherrypy.request.params["app"] = vpath.pop(0) + if len(vpath): + cherrypy.request.params["stream"] = vpath.pop(0) + + return self + + def __init__(self): + self.api = ovenapi.OvenAPI(config.API_USER, config.API_PASS) + + @cherrypy.expose + @cherrypy.tools.json_out() + def index(self, **params: dict): + vhost = str(params.get("vhost")) + app = str(params.get("app")) + # App status + if "app" in params: + streams = [] + + # Use config.LAST_STREAM_LIST here because this is a cache of the last + # stream list the last time the list of streams changed. It should be + # accurate. + for s_vhost, s_app, s_stream in config.LAST_STREAM_LIST: + if s_vhost == vhost and s_app == app: + streams.append(s_stream) + return streams + + if "vhost" not in params: + vhost = "default" + + # Vhost status + return self.api.get_vhost_stats(vhost) diff --git a/template/management.mako b/template/management.mako new file mode 100644 index 0000000..1966f3f --- /dev/null +++ b/template/management.mako @@ -0,0 +1,143 @@ +<%! + import datetime + from dateutil import tz + + def relative_time(timestamp): + seconds = (datetime.datetime.now(tz.UTC) - datetime.datetime.fromisoformat(timestamp)).seconds + + days = hours = minutes = 0 + if seconds > 86400: + days, seconds = divmod(seconds, 86400) + if seconds > 3600: + hours, seconds = divmod(seconds, 3600) + if seconds > 60: + minutes, seconds = divmod(seconds, 60) + + return f"{days}d, {hours:02d}:{minutes:02d}:{seconds:02d}" +%> + +<%def name="blockdata(disabled, blocked_ips, blocked_keys)"> + % if disabled: +

The server is currently disabled!

+ % endif + + % if blocked_ips: +
Blocked IPs +
    + % for ip in blocked_ips: +
  • ${ip}
  • + % endfor +
+
+ % endif + + % if blocked_keys: +
Disabled Keys +
    + % for streamkey in blocked_keys: +
  • ${streamkey}
  • + % endfor +
+
+ % endif + + % if disabled or blocked_ips or blocked_keys: +
+ % endif + + +<%def name="stream_buttons(vhost, app, stream)"> + + + + + + + + + + + + + + + +
+ ${blockdata(DISABLED, BLOCKED_IPS, DISABLED_KEYS)} + + % for vhost_name, vhost_data in data["vhosts"].items(): +
${vhost_name} vhost + + % for vhost_k, vhost_v in vhost_data.items(): + % if vhost_k != "apps": + + % endif + % endfor +
${vhost_k}${vhost_v}
+ + % for app_name, app_data in vhost_data.get("apps", {}).items(): +
${app_name} + + % for app_k, app_v in app_data.items(): + % if app_k != "streams": + + % endif + % endfor +
${app_k}${app_v}
+ + % for stream_name, stream_data in app_data.get("streams", {}).items(): +
${app_name}/${stream_name} + + % for stream_k, stream_v in stream_data.items(): + <% + if stream_k == "has_bframes" and stream_v: + classname = 'class="alert"' + else: + classname = '' + + if stream_k == "fps_actual": + stream_v = f"{stream_v:.1f}" + elif stream_k.startswith("bitrate_"): + stream_v = f"{stream_v / 1000:.0f}kbps" + elif stream_k == "created": + stream_v = f"{relative_time(stream_v)} ago" + %> + + % endfor +
${stream_k}${stream_v}
+ ${stream_buttons(vhost_name, app_name, stream_name)} +
+ % endfor +
+ % endfor +
+ % endfor + + diff --git a/template/message.mako b/template/message.mako new file mode 100644 index 0000000..b1da7bc --- /dev/null +++ b/template/message.mako @@ -0,0 +1,16 @@ + + + + + + + + +${message}. Returning in 5 seconds. + + diff --git a/template/single.mako b/template/single.mako new file mode 100644 index 0000000..73d9203 --- /dev/null +++ b/template/single.mako @@ -0,0 +1,54 @@ + + + + + + + Single Feed Player - ${app_name}: ${stream_name} + + + + + + + + diff --git a/template/webcall.mako b/template/webcall.mako new file mode 100644 index 0000000..1095ed7 --- /dev/null +++ b/template/webcall.mako @@ -0,0 +1,86 @@ + + + + + + + Webcall Player - ${app_name} + + + + + + + + diff --git a/viewer.py b/viewer.py new file mode 100644 index 0000000..10e8fe9 --- /dev/null +++ b/viewer.py @@ -0,0 +1,54 @@ +from pathlib import Path + +import cherrypy +from mako.template import Template + +import config +import ovenapi + + +class Viewer: + def __init__(self): + self.webcall_template = Path("template/webcall.mako").read_text(encoding="utf-8") + self.single_template = Path("template/single.mako").read_text(encoding="utf-8") + self.api = ovenapi.OvenAPI(config.API_USER, config.API_PASS) + + def app_is_okay(self, app_name): + # If we can't access the API, we can't check if an app exists, just okay it + if not config.is_api_ready(): + return True + + return self.api.app_exists(app_name) + + def _cp_dispatch(self, vpath): + # Extract an app and a page, a la match1/slot1 + if len(vpath): + cherrypy.request.params["app"] = vpath.pop(0) + if len(vpath): + cherrypy.request.params["page"] = vpath.pop(0) + + # This should leave `/` as our path, triggering index() + return self + + @cherrypy.expose + def index(self, **params: dict) -> bytes | str: + if "app" in params and isinstance(params["app"], str): + # Check if the app even exists. If not, fast 404 + if not self.app_is_okay(params["app"]): + cherrypy.response.status = 404 + return "App not found" + + # Get domain for templates + domain = cherrypy.request.base.split("/")[-1].split(":")[0] + + # Any subpath is presumed to be a single player interface for app/stream + if "page" in params: + return Template(self.single_template).render( + domain=domain, app_name=params["app"], stream_name=params["page"] + ) + + # No stream key = pass webcall interface + return Template(self.webcall_template).render(domain=domain, app_name=params["app"]) + + # If we have no subpath at all, return a blank page + return ""