import requests API_HANDLE = None class OvenAPI: def __init__(self, username: str, password: str, api_path: str = "http://localhost:8081/v1") -> None: self.opener = requests.Session() self.opener.auth = (username, password) self.api_path = api_path def __get_api_data(self, rel_path: str, timeout: int = 3) -> dict: abs_path = f"{self.api_path}/{rel_path.strip('/')}" return self.opener.get(abs_path, timeout=timeout).json() def get_vhosts(self) -> list: return self.__get_api_data("/vhosts").get("response", []) def get_vhost_info(self, vhost: str) -> dict: return self.__get_api_data(f"/vhosts/{vhost}").get("response", {}) def get_vhost_apps(self, vhost: str) -> list: return self.__get_api_data(f"/vhosts/{vhost}/apps").get("response", []) def get_vhost_stats(self, vhost: str) -> dict: return self.__get_api_data(f"/stats/current/vhosts/{vhost}").get("response", {}) def get_app_info(self, vhost: str, app: str) -> dict: return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}").get("response", {}) def get_app_streams(self, vhost: str, app: str) -> list: return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams").get("response", []) def get_stream_info(self, vhost: str, app: str, stream: str) -> dict: return self.__get_api_data(f"/vhosts/{vhost}/apps/{app}/streams/{stream}").get("response", {}) def get_stream_list(self) -> list[tuple]: streams = set() for vhost in self.get_vhosts(): for app in self.get_vhost_apps(vhost): for stream in self.get_app_streams(vhost, app): streams.add((vhost, app, stream)) return list(streams) def get_all_stream_info(self) -> dict: data = {"vhosts": {}} for vhost in self.get_vhosts(): this_vhost = {"apps": {}} for app in self.get_vhost_apps(vhost): this_app = {"streams": {}} for stream in self.get_app_streams(vhost, app): resp = self.get_stream_info(vhost, app, stream) # Simple data: streamer IP, type, start time this_stream = { "ip_address": resp["input"]["sourceUrl"].split("://")[1].split(":")[0], "type": resp["input"]["sourceType"].lower(), "created": resp["input"]["createdTime"], } # Video data: FPS, bitrate fps_advertised = 0 bitrate_advertised = 0 fps_actual = 0 bitrate_actual = 0 has_bframes = False for track in resp["input"]["tracks"]: track_type = track.get("type", "none").lower() this_track = track.get(track_type, {}) bitrate_advertised += int(this_track.get("bitrate", 0)) bitrate_actual += int(this_track.get("bitrateLatest", 0)) fps_advertised = max(this_track.get("framerate", 0), fps_advertised) fps_actual = max(this_track.get("framerateLatest", 0), fps_actual) has_bframes = any([has_bframes, this_track.get("hasBframes", False)]) this_stream["fps_advertised"] = fps_advertised this_stream["fps_actual"] = fps_actual this_stream["bitrate_advertised"] = bitrate_advertised this_stream["bitrate_actual"] = bitrate_actual this_stream["has_bframes"] = has_bframes # Stats: We need a different endpoint for this stats = self.__get_api_data(f"/stats/current/vhosts/{vhost}/apps/{app}/streams/{stream}").get( "response", {} ) this_stream["viewers"] = sum(stats.get("connections", {}).values()) # Save this out to the main dict this_app["streams"][stream] = this_stream this_vhost["apps"][app] = this_app data["vhosts"][vhost] = this_vhost return data def app_exists(self, app_name: str) -> bool: return app_name in self.__get_api_data("/vhosts/default/apps").get("response", {}) def disconnect_all(self) -> None: for stream in self.get_stream_list(): self.disconnect_key(stream[0], stream[1], stream[2]) def disconnect_key(self, vhost: str, app: str, stream: str) -> None: self.opener.delete(f"{self.api_path}/vhosts/{vhost}/apps/{app}/streams/{stream}") def get_stream_ip(self, vhost: str, app: str, stream: str) -> str | None: try: resp = self.get_stream_info(vhost, app, stream) return resp["response"]["input"]["sourceUrl"].split("://")[1].split(":")[0] except Exception: return None def get_api_handle(**kwargs): """Cache and return a single canonical API handle.""" global API_HANDLE # noqa: PLW0603 if API_HANDLE is None: API_HANDLE = OvenAPI(**kwargs) return API_HANDLE