ovenemprex/ovenapi.py
Trysdyn Black 3e64cf0612 Use the same API handle across the entire app
This opens doors in the future to storing state in the API class, and is
just cleaner in general.
2025-03-06 20:26:11 -08:00

125 lines
5.1 KiB
Python

from urllib.parse import urlsplit

import requests
# Module-wide cached OvenAPI instance. Starts as None and is filled in by
# get_api_handle() the first time it is called.
API_HANDLE = None
class OvenAPI:
def __init__(self, username: str, password: str, api_path: str = "http://localhost:8081/v1") -> None:
self.opener = requests.Session()
self.opener.auth = (username, password)
self.api_path = api_path
def __get_api_data(self, rel_path: str, timeout: int = 3) -> dict:
abs_path = f"{self.api_path}/{rel_path.strip('/')}"
return self.opener.get(abs_path, timeout=timeout).json()
def get_vhosts(self) -> list:
return self.__get_api_data("/vhosts").get("response", [])
def get_vhost_info(self, vhost: str) -> dict:
return self.__get_api_data(f"/vhosts/{vhost}").get("response", {})
def get_vhost_apps(self, vhost: str) -> list:
return self.__get_api_data(f"/vhosts/{vhost}/apps").get("response", [])
def get_vhost_stats(self, vhost: str) -> dict:
return self.__get_api_data(f"/stats/current/vhosts/{vhost}").get("response", {})
def get_app_info(self, vhost: str, app: str) -> dict:
return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}").get("response", {})
def get_app_streams(self, vhost: str, app: str) -> list:
return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams").get("response", [])
def get_stream_info(self, vhost: str, app: str, stream: str) -> dict:
return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams/{stream}").get("response", {})
def get_stream_list(self) -> list[tuple]:
streams = set()
for vhost in self.get_vhosts():
for app in self.get_vhost_apps(vhost):
for stream in self.get_app_streams(vhost, app):
streams.add((vhost, app, stream))
return list(streams)
def get_all_stream_info(self) -> dict:
data = {"vhosts": {}}
for vhost in self.get_vhosts():
this_vhost = {"apps": {}}
for app in self.get_vhost_apps(vhost):
this_app = {"streams": {}}
for stream in self.get_app_streams(vhost, app):
resp = self.get_stream_info(vhost, app, stream)
# Simple data: streamer IP, type, start time
this_stream = {
"ip_address": resp["input"]["sourceUrl"].split("://")[1].split(":")[0],
"type": resp["input"]["sourceType"].lower(),
"created": resp["input"]["createdTime"],
}
# Video data: FPS, bitrate
fps_advertised = 0
bitrate_advertised = 0
fps_actual = 0
bitrate_actual = 0
has_bframes = False
for track in resp["input"]["tracks"]:
track_type = track.get("type", "none").lower()
this_track = track.get(track_type, {})
bitrate_advertised += int(this_track.get("bitrate", 0))
bitrate_actual += int(this_track.get("bitrateLatest", 0))
fps_advertised = max(this_track.get("framerate", 0), fps_advertised)
fps_actual = max(this_track.get("framerateLatest", 0), fps_actual)
has_bframes = any([has_bframes, this_track.get("hasBframes", False)])
this_stream["fps_advertised"] = fps_advertised
this_stream["fps_actual"] = fps_actual
this_stream["bitrate_advertised"] = bitrate_advertised
this_stream["bitrate_actual"] = bitrate_actual
this_stream["has_bframes"] = has_bframes
# Stats: We need a different endpoint for this
stats = self.__get_api_data(f"/stats/current/vhosts/{vhost}/apps/{app}/streams/{stream}").get(
"response", {}
)
this_stream["viewers"] = sum(stats.get("connections", {}).values())
# Save this out to the main dict
this_app["streams"][stream] = this_stream
this_vhost["apps"][app] = this_app
data["vhosts"][vhost] = this_vhost
return data
def app_exists(self, app_name: str) -> bool:
return app_name in self.__get_api_data("/vhosts/default/apps").get("response", {})
def disconnect_all(self) -> None:
for stream in self.get_stream_list():
self.disconnect_key(stream[0], stream[1], stream[2])
def disconnect_key(self, vhost: str, app: str, stream: str) -> None:
self.opener.delete(f"{self.api_path}/vhosts/{vhost}/apps/{app}/streams/{stream}")
def get_stream_ip(self, vhost: str, app: str, stream: str) -> str | None:
try:
resp = self.get_stream_info(vhost, app, stream)
return resp["response"]["input"]["sourceUrl"].split("://")[1].split(":")[0]
except Exception:
return None
def get_api_handle(**kwargs):
    """Return the shared OvenAPI handle, creating it on first use.

    The keyword arguments are forwarded to ``OvenAPI(...)`` only when no
    handle has been cached yet; on later calls they are ignored and the
    already-cached instance is returned unchanged.
    """
    global API_HANDLE  # noqa: PLW0603

    # Fast path: a handle was already built by an earlier call.
    if API_HANDLE is not None:
        return API_HANDLE

    API_HANDLE = OvenAPI(**kwargs)
    return API_HANDLE