ovenemprex/ovenapi.py
Trysdyn Black c83acbdcfe Reinitialize repo to remove private data
10,000 hours mucking with `git filter-repo` and no reasonable use-case
found. On the plus side, anyone looking at this and curious what I nuked
isn't missing much. This lived in a monorepo up until about a week ago.
2024-12-20 14:45:49 -08:00

113 lines
4.9 KiB
Python

import requests
class OvenAPI:
def __init__(self, username: str, password: str, api_path: str = "http://localhost:8081/v1") -> None:
self.opener = requests.Session()
self.opener.auth = (username, password)
self.api_path = api_path
def __get_api_data(self, rel_path: str, timeout: int = 3) -> dict:
abs_path = f"{self.api_path}/{rel_path.strip('/')}"
return self.opener.get(abs_path, timeout=timeout).json()
def get_vhosts(self) -> list:
return self.__get_api_data("/vhosts").get("response", [])
def get_vhost_info(self, vhost: str) -> dict:
return self.__get_api_data(f"/vhosts/{vhost}").get("response", {})
def get_vhost_apps(self, vhost: str) -> list:
return self.__get_api_data(f"/vhosts/{vhost}/apps").get("response", [])
def get_vhost_stats(self, vhost: str) -> dict:
return self.__get_api_data(f"/stats/current/vhosts/{vhost}").get("response", {})
def get_app_info(self, vhost: str, app: str) -> dict:
return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}").get("response", {})
def get_app_streams(self, vhost: str, app: str) -> list:
return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams").get("response", [])
def get_stream_info(self, vhost: str, app: str, stream: str) -> dict:
return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams/{stream}").get("response", {})
def get_stream_list(self) -> list[tuple]:
streams = set()
for vhost in self.get_vhosts():
for app in self.get_vhost_apps(vhost):
for stream in self.get_app_streams(vhost, app):
streams.add((vhost, app, stream))
return list(streams)
def get_all_stream_info(self) -> dict:
data = {"vhosts": {}}
for vhost in self.get_vhosts():
this_vhost = {"apps": {}}
for app in self.get_vhost_apps(vhost):
this_app = {"streams": {}}
for stream in self.get_app_streams(vhost, app):
resp = self.get_stream_info(vhost, app, stream)
# Simple data: streamer IP, type, start time
this_stream = {
"ip_address": resp["input"]["sourceUrl"].split("://")[1].split(":")[0],
"type": resp["input"]["sourceType"].lower(),
"created": resp["input"]["createdTime"],
}
# Video data: FPS, bitrate
fps_advertised = 0
bitrate_advertised = 0
fps_actual = 0
bitrate_actual = 0
has_bframes = False
for track in resp["input"]["tracks"]:
track_type = track.get("type", "none").lower()
this_track = track.get(track_type, {})
bitrate_advertised += int(this_track.get("bitrate", 0))
bitrate_actual += int(this_track.get("bitrateLatest", 0))
fps_advertised = max(this_track.get("framerate", 0), fps_advertised)
fps_actual = max(this_track.get("framerateLatest", 0), fps_actual)
has_bframes = any([has_bframes, this_track.get("hasBframes", False)])
this_stream["fps_advertised"] = fps_advertised
this_stream["fps_actual"] = fps_actual
this_stream["bitrate_advertised"] = bitrate_advertised
this_stream["bitrate_actual"] = bitrate_actual
this_stream["has_bframes"] = has_bframes
# Stats: We need a different endpoint for this
stats = self.__get_api_data(f"/stats/current/vhosts/{vhost}/apps/{app}/streams/{stream}").get(
"response", {}
)
this_stream["viewers"] = sum(stats.get("connections", {}).values())
# Save this out to the main dict
this_app["streams"][stream] = this_stream
this_vhost["apps"][app] = this_app
data["vhosts"][vhost] = this_vhost
return data
def app_exists(self, app_name: str) -> bool:
return app_name in self.__get_api_data("/vhosts/default/apps").get("response", {})
def disconnect_all(self) -> None:
for stream in self.get_stream_list():
self.disconnect_key(stream[0], stream[1], stream[2])
def disconnect_key(self, vhost: str, app: str, stream: str) -> None:
self.opener.delete(f"{self.api_path}/vhosts/{vhost}/apps/{app}/streams/{stream}")
def get_stream_ip(self, vhost: str, app: str, stream: str) -> str | None:
try:
resp = self.get_stream_info(vhost, app, stream)
return resp["response"]["input"]["sourceUrl"].split("://")[1].split(":")[0]
except Exception:
return None