"""Boombox endpoints.
Endpoints are classes defining possible inputs and outputs of
:py:class:`.Boombox`. Each endpoint represents a different media format and has
appropriate attributes that describe it.
Examples:
* MP4("path/to/file.mp4") - an endpoint defining an MP4 container. If
provided for input, then Boombox will read a MP4 file from this location. If
provided for output, then Boombox will create a file at that location and
store the produced stream in it in MP4 format.
* HLS("path/to/playlist") - an endpoint defining a HLS playlist. Only
output is supported. If used, a playlist will be created in the specified
location.
* WebRTC("ws://host:port") - an endpoint defining a WebRTC connection.
Both input and output is supported. Websocket at the provided URL is used as
a signaling channel.
"""
import abc
from ._vendor.term import Atom
from dataclasses import dataclass, KW_ONLY, fields, is_dataclass
from typing import Any, Literal, TypeAlias
from typing_extensions import override
AudioSampleFormat: TypeAlias = Literal[
"s8",
"u8",
"s16le",
"u16le",
"s16be",
"u16be",
"s24le",
"u24le",
"s24be",
"u24be",
"s32le",
"u32le",
"s32be",
"u32be",
"f32le",
"f32be",
"f64le",
"f64be",
]
[docs]
@dataclass
class BoomboxEndpoint(abc.ABC):
"""Abstract base class of a Boombox endpoint.
Boombox endpoints are the definitions of Boombox inputs and outputs. This
class is a base class for these definitions. When creating a Boombox
instance it expects a specification of it's input and output as
BoomboxEndpoints.
Attributes
----------
transcoding_policy : {None, "if_needed", "always", "never"}, optional
Allowed only for output. The default transcoding behavior is "if_needed",
which means that if the format of the media is the same for input and
output, then the stream is not decoded and encoded. This approach saves
resources and time, but in some cases transcoding can be necessary. To
force transcoding regardless if the formats differ or not, this option
can be set to "always". If set to "never", Boombox will never transcode,
raising if the desired operation can't be done without transcoding.
"""
_: KW_ONLY
transcoding_policy: Literal["if_needed", "always", "never"] | None = None
def get_atom_fields(self) -> set[str]:
""":meta private:"""
return {"transcoding_policy"}
# TODO: consider checking whether an endpoint with given attributes is
# valid for a direction, like so:
# def validate_direction(self, direction: Literal['input', 'output']) ->
# bool: ...
[docs]
def serialize(self, direction: Literal["input", "output"]) -> tuple:
"""Serializes itself to an Elixir-compatible term.
To allow Pyrlang to send the endpoint definition to Elixir it first
needs to be serialized into an Elixir-compatible term. This function
serializes the endpoint to a tuple that matches the structure of
Boombox endpoints in Elixir.
Returns
-------
endpoint_tuple : tuple
A tuple representing the endpoint. The first element is an Atom
representing the endpoint name, next elements are required field
values and in case there are any keyword fields they are in the
last element.
"""
assert is_dataclass(self)
atom_fields = self.get_atom_fields()
required_field_values = [
self._serialize_field(f.name, atom_fields)
for f in fields(self)
if not f.kw_only
]
keyword_fields = [
(Atom(f.name), self._serialize_field(f.name, atom_fields))
for f in fields(self)
if f.kw_only and self.__dict__[f.name] is not None
]
if keyword_fields:
return (
self.get_endpoint_name(direction),
*required_field_values,
keyword_fields,
)
else:
return (self.get_endpoint_name(direction), *required_field_values)
def get_endpoint_name(self, direction: Literal["input", "output"]) -> Atom:
""":meta private:"""
return Atom(self.__class__.__name__.lower())
def _serialize_field(self, field_name: str, atom_fields: set[str]) -> Any:
field_value = self.__dict__[field_name]
if not isinstance(field_value, str):
return field_value
elif field_name in atom_fields:
return Atom(field_value)
else:
return field_value.encode()
[docs]
@dataclass
class RawData(BoomboxEndpoint):
"""Endpoint for communication through numpy arrays containing raw media
data.
This endpoint defines the behavior of Boombox allowing for interacting
with Python code directly. For more details refer to
:py:class:`.Boombox` class.
Attributes
----------
audio, video : bool
Determines whether this endpoint will accept/produce video packets,
audio packets, or both.
audio_rate : int, optional
Applicable only when `audio` is set to True and the endpoint defines
the output. Determines the sample rate of the produced stream (number
of samples per second).
audio_channels : int, optional
Applicable only when `audio` is set to True and the endpoint defines
the output. Determines how many channels does the produced stream have.
The channels are interleaved.
audio_format : AudioSampleFormat, optional
Applicable only when `audio` is set to True and the endpoint defines
the output. Determines the sample format of the produced stream.
video_width, video_height : int, optional
Applicable only when `video` is set to True and the endpoint defines
the output. Determines the dimensions of the produced video stream.
pace_control : bool, optional
Allowed only for output. If true the incoming streams will be passed to
the output according to their timestamps, if not they will be passed as
fast as possible. True by default.
is_live : bool, optional
Allowed only for input. If true then Boombox will assume that packets
will be provided in realtime and won't control their pace when passing
them to the output. False by default.
"""
_: KW_ONLY
audio: bool
video: bool
audio_rate: int | None = None
audio_channels: int | None = None
audio_format: AudioSampleFormat | None = None
video_width: int | None = None
video_height: int | None = None
pace_control: bool | None = None
is_live: bool | None = None
@override
def get_endpoint_name(self, direction) -> Atom:
match direction:
case "input":
return Atom("writer")
case "output":
return Atom("reader")
@override
def get_atom_fields(self) -> set[str]:
return {"audio_format"} | super().get_atom_fields()
[docs]
@dataclass
class StorageEndpoint(BoomboxEndpoint, abc.ABC):
"""Abstract base class for storage endpoints.
Storage endpoints are endpoints that are used for putting the media in
some type of storage.
Attributes
----------
location : str
A path to a file or an HTTP URL, location where the media should be
read from or written to.
transport : {None, "file", "http"}, optional:
An optional attribute that explicitly states whether a file or HTTP
storage should be assumed. If not provided transport will be determined
from `location` - paths will resolve to "file" location,
HTTP URLs to "http".
"""
location: str
_: KW_ONLY
transport: Literal["file", "http"] | None = None
@override
def get_atom_fields(self) -> set[str]:
return {"transport"} | super().get_atom_fields()
[docs]
@dataclass
class H264(StorageEndpoint):
"""Storage endpoint for H264 codec.
When used for output the stored stream will have Annex-B format, and when
reading the stream has to already be in Annex-B format.
Attributes
----------
framerate : tuple[int, int], default=(30, 1)
Framerate of the stream, where the tuple defines the numerator and
denominator of it. If not provided 30 FPS will be assumed.
"""
_: KW_ONLY
framerate: tuple[int, int] = (30, 1)
[docs]
@dataclass
class H265(StorageEndpoint):
"""Storage endpoint for H265 codec.
When used for output the stored stream will have Annex-B format, when
reading the stream has to already be in Annex-B format.
Attributes
----------
framerate : tuple[int, int], default=(30, 1)
Framerate of the stream, where the tuple defines the numerator and
denominator of it. If not provided 30 FPS will be assumed.
"""
_: KW_ONLY
framerate: tuple[int, int] = (30, 1)
[docs]
@dataclass
class MP4(StorageEndpoint):
"""Endpoint for MP4 container format."""
pass
[docs]
@dataclass
class AAC(StorageEndpoint):
"""Endpoint for AAC codec."""
pass
[docs]
@dataclass
class WAV(StorageEndpoint):
"""Endpoint for WAV format."""
pass
[docs]
@dataclass
class MP3(StorageEndpoint):
"""Endpoint for MP3 format."""
pass
[docs]
@dataclass
class IVF(StorageEndpoint):
"""Endpoint for IVF container format."""
pass
[docs]
@dataclass
class Ogg(StorageEndpoint):
"""Endpoint for Ogg container format."""
pass
[docs]
@dataclass
class WebRTC(BoomboxEndpoint):
"""Endpoint for communication over WebRTC.
Attributes
----------
signaling : str
URL of the WebSocket that is the signaling channel of the WebRTC
connection.
"""
signaling: str
_: KW_ONLY
[docs]
@dataclass
class WHIP(BoomboxEndpoint):
"""Endpoint for communication over WebRTC-HTTP ingestion protocol (WHIP).
Attributes
----------
url : str
HTTP url of the WHIP server.
token : str
Token used for authentication and authorization.
"""
url: str
_: KW_ONLY
token: str
[docs]
@dataclass
class HLS(BoomboxEndpoint):
"""Endpoint for HTTP Live Streaming. Boombox supports fetching HLS streams
as input and creating HLS playlists as output.
Attributes
----------
location : str
If set for input it should be an URL to location from which to fetch
the HLS stream. If set for output it's a path to the location where
the HLS playlist will be created. If the path is to a directory, then
an "index.m3u8" manifest file and the other files will be created
If a `.m3u8` path is provided, the manifest will be created at that location,
and all associated HLS files will be stored in the same directory.
mode : {"vod", "live"}, optional
If set for output then it determines if the session is live or a VOD
type of broadcast. It can influence type of metadata inserted into the
playlist's manifest.
"""
location: str
_: KW_ONLY
mode: Literal["vod", "live"] = "vod"
@override
def get_atom_fields(self) -> set[str]:
return {"mode"} | super().get_atom_fields()
[docs]
@dataclass
class RTMP(BoomboxEndpoint):
"""Endpoint for communication over Real-Time Messaging Protocol (RTMP).
Currently Boombox supports only client-side functionality - streaming
media to a RTMP server.
Attributes
----------
url : str
URL of a RTMP server.
"""
url: str
[docs]
@dataclass
class RTSP(BoomboxEndpoint):
"""Endpoint for communication over Real-Time Streaming Protocol (RTSP).
Currently Boombox supports only client-side functionality - receiving
media from a RTSP server.
Attributes
----------
url : str
URL of a RTSP server.
"""
url: str
[docs]
@dataclass
class RTP(BoomboxEndpoint):
"""Endpoint for communication over Real-time Transport Protocol (RTP).
Since RTP doesn't incorporate any negotiation a lot of information about
streamed media has to be provided manually via the attributes.
Attributes
----------
port : int
Port on which Boombox will receive the stream or to which it will
send it to.
address : str, optional
IP address which Boombox will send the stream to.
audio_encoding, video_encoding : str, optional
Encoding name of given media type. Has to be the same as it would be
in `rtpmap` attribute of a SDP description.
audio_payload_type, video_payload_type : int, optional
Payload type of given media type. Has to be the same as it would be
in `rtpmap` attribute of a SDP description.
audio_clock_rate, video_clock_rate : int, optional
Clock rate of given media type. Has to be the same as it would be
in `rtpmap` attribute of a SDP description.
aac_bitrate_mode : {None, "lbr", "hbr"}
Applicable only for AAC payload. Defines which mode should be
assumed/set when depayloading/payloading.
audio_specific_config : bytes, optional
Applicable only for AAC payload. Contains crucial information about
the stream and has to be obtained from a side channel.
vps, pps, sps : bytes, optional
Applicable only for H264 and H265 payloads. Parameter sets, could be
obtained from a side channel. They contain information about the
encoded stream.
"""
_: KW_ONLY
port: int
address: str | None = None
video_encoding: str | None = None
video_payload_type: int | None = None
video_clock_rate: int | None = None
audio_encoding: str | None = None
audio_payload_type: int | None = None
audio_clock_rate: int | None = None
aac_bitrate_mode: Literal["lbr", "hbr"] | None = None
audio_specific_config: bytes | None = None
vps: bytes | None = None
pps: bytes | None = None
sps: bytes | None = None
@override
def get_atom_fields(self) -> set[str]:
return {
"video_encoding",
"audio_encoding",
"aac_bitrate_mode",
} | super().get_atom_fields()
[docs]
@dataclass
class SRT(BoomboxEndpoint):
"""Endpoint for communication over Secure Reliable Transport (SRT) protocol.
When using this endpoint as input, Boombox will act as an SRT server and expect
connections from clients at the provided address. When using as output,
Boombox will act as an SRT client and try to connect to a server under the
provided address.
Attributes
----------
url : str
Address of the SRT server in the form of <ip>:<port>. When using this
endpoint as input, this field determines on which address the server
will listen for connections. When using for output, it will determine
to what address to connect.
stream_id : str, optional
ID of the stream. When using this endpoint as input, this field
determines the ID of the stream which the server will accept. When
using for output, it will determine ID of the stream being sent.
password : str, optional
Password used to authenticate the connection. When using this endpoint
as input, this field determines the password the server will require
from clients when connecting. When using for output, it will determine
what password the client will use to authenticate.
"""
url: str
_: KW_ONLY
stream_id: str
password: str
[docs]
@dataclass
class Player(BoomboxEndpoint):
"""Endpoint that plays the output stream with the SDL2 media player."""
pass