This opens doors in the future to storing state in the API class, and is just cleaner in general.
125 lines
5.1 KiB
Python
125 lines
5.1 KiB
Python
import requests
|
|
|
|
|
|
# Module-level cache for the shared OvenAPI instance; stays None until
# get_api_handle() creates the instance on its first call.
API_HANDLE = None
|
|
|
|
|
|
class OvenAPI:
|
|
def __init__(self, username: str, password: str, api_path: str = "http://localhost:8081/v1") -> None:
|
|
self.opener = requests.Session()
|
|
self.opener.auth = (username, password)
|
|
self.api_path = api_path
|
|
|
|
def __get_api_data(self, rel_path: str, timeout: int = 3) -> dict:
|
|
abs_path = f"{self.api_path}/{rel_path.strip('/')}"
|
|
|
|
return self.opener.get(abs_path, timeout=timeout).json()
|
|
|
|
def get_vhosts(self) -> list:
|
|
return self.__get_api_data("/vhosts").get("response", [])
|
|
|
|
def get_vhost_info(self, vhost: str) -> dict:
|
|
return self.__get_api_data(f"/vhosts/{vhost}").get("response", {})
|
|
|
|
def get_vhost_apps(self, vhost: str) -> list:
|
|
return self.__get_api_data(f"/vhosts/{vhost}/apps").get("response", [])
|
|
|
|
def get_vhost_stats(self, vhost: str) -> dict:
|
|
return self.__get_api_data(f"/stats/current/vhosts/{vhost}").get("response", {})
|
|
|
|
def get_app_info(self, vhost: str, app: str) -> dict:
|
|
return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}").get("response", {})
|
|
|
|
def get_app_streams(self, vhost: str, app: str) -> list:
|
|
return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams").get("response", [])
|
|
|
|
def get_stream_info(self, vhost: str, app: str, stream: str) -> dict:
|
|
return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams/{stream}").get("response", {})
|
|
|
|
def get_stream_list(self) -> list[tuple]:
|
|
streams = set()
|
|
|
|
for vhost in self.get_vhosts():
|
|
for app in self.get_vhost_apps(vhost):
|
|
for stream in self.get_app_streams(vhost, app):
|
|
streams.add((vhost, app, stream))
|
|
|
|
return list(streams)
|
|
|
|
def get_all_stream_info(self) -> dict:
|
|
data = {"vhosts": {}}
|
|
|
|
for vhost in self.get_vhosts():
|
|
this_vhost = {"apps": {}}
|
|
for app in self.get_vhost_apps(vhost):
|
|
this_app = {"streams": {}}
|
|
for stream in self.get_app_streams(vhost, app):
|
|
resp = self.get_stream_info(vhost, app, stream)
|
|
|
|
# Simple data: streamer IP, type, start time
|
|
this_stream = {
|
|
"ip_address": resp["input"]["sourceUrl"].split("://")[1].split(":")[0],
|
|
"type": resp["input"]["sourceType"].lower(),
|
|
"created": resp["input"]["createdTime"],
|
|
}
|
|
|
|
# Video data: FPS, bitrate
|
|
fps_advertised = 0
|
|
bitrate_advertised = 0
|
|
fps_actual = 0
|
|
bitrate_actual = 0
|
|
has_bframes = False
|
|
for track in resp["input"]["tracks"]:
|
|
track_type = track.get("type", "none").lower()
|
|
this_track = track.get(track_type, {})
|
|
|
|
bitrate_advertised += int(this_track.get("bitrate", 0))
|
|
bitrate_actual += int(this_track.get("bitrateLatest", 0))
|
|
fps_advertised = max(this_track.get("framerate", 0), fps_advertised)
|
|
fps_actual = max(this_track.get("framerateLatest", 0), fps_actual)
|
|
has_bframes = any([has_bframes, this_track.get("hasBframes", False)])
|
|
|
|
this_stream["fps_advertised"] = fps_advertised
|
|
this_stream["fps_actual"] = fps_actual
|
|
this_stream["bitrate_advertised"] = bitrate_advertised
|
|
this_stream["bitrate_actual"] = bitrate_actual
|
|
this_stream["has_bframes"] = has_bframes
|
|
|
|
# Stats: We need a different endpoint for this
|
|
stats = self.__get_api_data(f"/stats/current/vhosts/{vhost}/apps/{app}/streams/{stream}").get(
|
|
"response", {}
|
|
)
|
|
this_stream["viewers"] = sum(stats.get("connections", {}).values())
|
|
|
|
# Save this out to the main dict
|
|
this_app["streams"][stream] = this_stream
|
|
this_vhost["apps"][app] = this_app
|
|
data["vhosts"][vhost] = this_vhost
|
|
|
|
return data
|
|
|
|
def app_exists(self, app_name: str) -> bool:
|
|
return app_name in self.__get_api_data("/vhosts/default/apps").get("response", {})
|
|
|
|
def disconnect_all(self) -> None:
|
|
for stream in self.get_stream_list():
|
|
self.disconnect_key(stream[0], stream[1], stream[2])
|
|
|
|
def disconnect_key(self, vhost: str, app: str, stream: str) -> None:
|
|
self.opener.delete(f"{self.api_path}/vhosts/{vhost}/apps/{app}/streams/{stream}")
|
|
|
|
def get_stream_ip(self, vhost: str, app: str, stream: str) -> str | None:
|
|
try:
|
|
resp = self.get_stream_info(vhost, app, stream)
|
|
return resp["response"]["input"]["sourceUrl"].split("://")[1].split(":")[0]
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def get_api_handle(**kwargs):
    """Return the module-wide API handle, building it on first use.

    The keyword arguments are forwarded to ``OvenAPI`` only by the call that
    actually creates the handle; once a handle is cached, later calls return
    it unchanged and their arguments are ignored.
    """
    global API_HANDLE  # noqa: PLW0603
    if API_HANDLE is not None:
        return API_HANDLE

    API_HANDLE = OvenAPI(**kwargs)
    return API_HANDLE
|