"""Module containing Boombox class and media packet classes."""
from __future__ import annotations
import numpy as np
import asyncio
import uuid
import subprocess
import atexit
import os
import sys
import warnings
import dataclasses
import threading
import tqdm
import urllib.request
import platformdirs
import platform
import importlib.metadata
import tarfile
import logging
from ._vendor.pyrlang import process, node
from ._vendor.term import Atom, Pid
from .endpoints import BoomboxEndpoint, AudioSampleFormat
from typing import Generator, ClassVar, Literal, Optional, Any, get_args
from typing_extensions import override
RELEASES_URL = "https://github.com/membraneframework/boombox/releases"
PACKAGE_NAME = "boomboxlib"
[docs]
class Boombox(process.Process):
"""
Boombox is a tool that allows to transform a media stream from one
format into another.
When creating an instance of Boombox the input and output are
specified by providing an appropriate endpoint object for each of them.
These objects define the format and its parameters that are used for
the input or output, whichever they were provided for.
For example, if an ``RTMP("rtmp://my.stream.source:2137/app/key")`` endpoint
was provided for input and ``MP4("path/to/target.mp4")`` for output, then
Boombox will become a RTMP server, wait for clients to connect and save
the acquired stream to a ``.mp4`` file at the provided location.
Input and output can also be specified by strings alone, as in
``"rtmp://my.stream.source:2137/app/key"`` or ``"path/to/target.mp4"``,
and Boombox will automatically interpret them as :py:class:`.RTMP` and
:py:class:`.MP4` endpoints.
For more information about endpoints and to see supported formats refer to
:py:mod:`.endpoints`.
One of the main reasons to use Boombox in a Python project is to interact
with it from Python code directly. This can be achieved with
:py:class:`.RawData` endpoint, that enables methods allowing for this
interactions.
If the input is defined by :py:class:`.RawData`, Boombox will accept packets
provided by :py:meth:`.write` method. This method will return once Boombox
has processed the provided packet and is ready for the next one. Once
Boombox should stop accepting more packets, :py:meth:`.close` should be
called to informing it about the end of the stream. Calling this method can
be skipped if opening Boombox using a context manager.
If the output is defined by :py:class:`.RawData`, Boombox will produce
packets that are yielded by a generator returned by :py:meth:`.read`.
These methods operate on :py:class:`.AudioPacket` and
:py:class:`.VideoPacket` objects. These objects contain raw media data and
accompanying metadata.
If not using :py:class:`.RawData` endpoints Boombox operates fully
asynchronously, it'll work in the background if not interacted with. For
more control regarding the termination of Boombox refer to
:py:meth:`.close`, :py:meth:`.wait` and :py:meth:`.kill` methods.
Parameters
----------
input, output : BoomboxEndpoint or str
Definition of an input or output of Boombox. Can be provided explicitly
by an appropriate :py:class:`.BoomboxEndpoint` or a string of a path to
a file or an URL, that Boombox will attempt to interpret as an endpoint.
Attributes
----------
logger : ClassVar[logging.Logger]
Logger used in this class
"""
logger: ClassVar[logging.Logger]
_python_node_name: ClassVar[str]
_cookie: ClassVar[str]
_data_dir: str
_server_release_path: str
_version: str
_process_name: Atom
_erlang_node_name: Atom
_receiver: tuple[Atom, Atom] | Pid
_response: Optional[asyncio.Future]
_terminated: asyncio.Future
_finished: bool
_erlang_process: subprocess.Popen
_boombox_mode: Atom
_python_node_name = f"{uuid.uuid4()}@127.0.0.1"
_cookie = str(uuid.uuid4())
_node = node.Node(node_name=_python_node_name, cookie=_cookie)
logger = logging.getLogger(__name__)
threading.Thread(target=_node.run, daemon=True).start()
def __init__(
self, input: BoomboxEndpoint | str, output: BoomboxEndpoint | str
) -> None:
self._process_name = Atom(uuid.uuid4())
self._erlang_node_name = Atom(f"{self._process_name}@127.0.0.1")
env = {
"BOOMBOX_NODE_TO_PING": self._python_node_name,
"RELEASE_NODE": self._erlang_node_name,
"RELEASE_COOKIE": self._cookie,
"RELEASE_DISTRIBUTION": "name",
}
self._download_elixir_boombox_release()
self._erlang_process = subprocess.Popen(
[self._server_release_path, "start"], env=env
)
atexit.register(lambda: self._erlang_process.kill())
super().__init__(True)
self.get_node().register_name(self, self._process_name)
self._terminated = self.get_node().get_loop().create_future()
self._finished = False
self._receiver = (self._erlang_node_name, Atom("boombox_server"))
self._receiver = self._call(Atom("get_pid"))
self.get_node().monitor_process(self.pid_, self._receiver)
boombox_arg = [
(Atom("input"), self._serialize_endpoint(input, "input")),
(Atom("output"), self._serialize_endpoint(output, "output")),
]
self._boombox_mode = self._call((Atom("run"), boombox_arg))
[docs]
def read(self) -> Generator[AudioPacket | VideoPacket, None, None]:
"""Read media packets produced by Boombox.
Enabled only if Boombox has been initialized with output defined with
an :py:class:`.RawData` endpoint.
This generator yields packets as fast as Boombox produces them.
Yields
------
AudioPacket or VideoPacket
Raw media packets produced by Boombox.
Raises
------
RuntimeError
If Boombox's output was not defined by an :py:class:`.RawData` endpoint.
"""
while True:
match self._call(Atom("produce_packet")):
case (Atom("ok"), packet):
yield self._deserialize_packet(packet)
case Atom("finished"):
return
case (Atom("error"), Atom("incompatible_mode")):
raise RuntimeError("Output not defined with an RawData endpoint.")
case other:
raise RuntimeError(f"Unknown response: {other}")
[docs]
def write(self, packet: AudioPacket | VideoPacket) -> bool:
"""Write packets to Boombox.
Enabled only if Boombox has been initialized with input defined with an
:py:class:`.RawData` endpoint and if Boombox hasn't already finished
accepting packets.
This method provides Boombox with a packet to process and returns once
Boombox is ready to accept the next packet.
Parameters
----------
packet : AudioPacket or VideoPacket
Raw media packet to be consumed by Boombox.
Returns
-------
finished : bool
If true then Boombox has finished accepting packets and closed its
input for any further ones. Once it finishes processing the
previously provided packet, it will terminate.
Raises
------
RuntimeError
If Boombox's input was not defined with an :py:class:`.RawData`
endpoint or if Boombox has already finished accepting packets.
"""
if self._finished:
raise RuntimeError("Boombox has already finished accepting packets.")
serialized_packet = self._serialize_packet(packet)
match self._call((Atom("consume_packet"), serialized_packet)):
case Atom("finished"):
self._finished = True
return True
case Atom("ok"):
return False
case (Atom("error"), Atom("incompatible_mode")):
raise RuntimeError("Input should be defined with an RawData endpoint.")
case other:
raise RuntimeError(f"Unknown response: {other}")
[docs]
def close(self, wait: bool = True, kill: bool = False) -> None:
"""Closes Boombox for writing or reading.
Enabled only if Boombox has been initialized with input or output defined
with a :py:class:`.RawData` endpoint.
This method informs Boombox that it shouldn't expect or produce any more packets.
Parameters
----------
wait : bool, default=True
Determines whether this method should wait until Boombox finishes
it's operation and only then return, or if it should return
immediately and let Boombox finish in the background. Ignored if
`kill` set to true.
kill : bool, default=False
Determines whether Boombox should be killed without waiting for it
to gracefully finish it's operation. If True this method will
return immediately.
Raises
------
RuntimeError
If neither of Boombox's input or output was defined with a
:py:class:`.RawData` endpoint.
"""
match self._boombox_mode:
case Atom("consuming"):
request = Atom("finish_consuming")
case Atom("producing"):
request = Atom("finish_producing")
case other:
raise RuntimeError(
"Can't close boombox if not using a RawData endpoint"
)
match self._call(request):
case Atom("ok"):
if kill:
self.kill()
elif wait:
self.wait()
case other:
raise RuntimeError(f"Unknown response: {other}")
[docs]
def wait(self) -> None:
"""Waits until Boombox finishes it's operation and then returns."""
asyncio.run_coroutine_threadsafe(
self._await_future(self._terminated), self.get_node().get_loop()
).result()
[docs]
def kill(self) -> None:
"""Forces Boombox to exit without waiting for it to gracefully finish
it's operation."""
self._erlang_process.kill()
def __enter__(self) -> Boombox:
return self
def __exit__(self, *_) -> None:
self.close()
@override
def handle_one_inbox_message(self, msg: Any) -> None:
""":meta private:"""
assert self._response is not None
match msg:
case (Atom("response"), response):
if not self._response.done():
self._response.set_result(response)
case (Atom("DOWN"), _, Atom("process"), _, Atom("normal")):
self._terminated.set_result(Atom("normal"))
case (Atom("DOWN"), _, Atom("process"), _, reason):
self._terminated.set_result(reason)
if not self._response.done():
self._response.set_exception(
RuntimeError(f"Boombox crashed with reason {reason}")
)
@override
def exit(self, reason: Any = None) -> None:
""":meta private:"""
self._terminated.set_result(None)
super().exit(reason)
def _call(self, request: Any) -> Any:
message = (
Atom("call"),
(Atom(self._process_name), Atom(self.node_name_)),
request,
)
self._handle_termination()
self.get_node().send_nowait(
sender=self, receiver=self._receiver, message=message
)
self._response = self.get_node().get_loop().create_future()
response = asyncio.run_coroutine_threadsafe(
self._await_future(self._response), self.get_node().get_loop()
).result()
self._handle_termination()
return response
def _handle_termination(self) -> None:
if self._terminated.done():
if (reason := self._terminated.result()) != Atom("normal"):
raise RuntimeError(f"Boombox crashed with reason {reason}")
async def _await_future(self, response_future):
return await response_future
def _download_elixir_boombox_release(self) -> None:
class TqdmUpTo(tqdm.tqdm):
def update_to(self, b=1, bsize=1, tsize=None):
if tsize is not None:
self.total = tsize
self.update(b * bsize - self.n)
try:
self._version = importlib.metadata.version(PACKAGE_NAME)
except importlib.metadata.PackageNotFoundError:
self._version = "dev"
self._data_dir = platformdirs.user_data_dir(
appname=PACKAGE_NAME, ensure_exists=True, version=self._version
)
self._server_release_path = os.path.join(self._data_dir, "bin", "server")
if os.path.exists(self._server_release_path):
self.logger.info("Elixir boombox release already present.")
return
self.logger.info("Elixir boombox release not found, downloading...")
if self._version == "dev":
release_url = os.path.join(RELEASES_URL, "latest/download")
else:
release_url = os.path.join(RELEASES_URL, f"download/v{self._version}")
system = platform.system().lower()
arch = platform.machine().lower()
if system == "linux" and arch == "x86_64":
release_tarball = "boombox-server-linux-x86.tar.gz"
elif system == "darwin" and arch == "arm64":
release_tarball = "boombox-server-macos-arm.tar.gz"
else:
raise RuntimeError(f"Unsupported platform: {system} {arch}")
download_url = os.path.join(release_url, release_tarball)
tarball_path = os.path.join(self._data_dir, release_tarball)
with TqdmUpTo(
unit="B",
unit_scale=True,
unit_divisor=1024,
miniters=1,
desc=f"Downloading {release_tarball} from {release_url}",
) as t:
urllib.request.urlretrieve(
download_url, filename=tarball_path, reporthook=t.update_to
)
self.logger.info("Download complete. Extracting...")
with tarfile.open(tarball_path) as tar:
tar.extractall(self._data_dir)
os.remove(tarball_path)
@staticmethod
def _dtype_to_sample_format(dtype: np.dtype) -> tuple[AudioSampleFormat, np.dtype]:
type_mapping = {"i": "s", "u": "u", "f": "f"}
data_type = type_mapping.get(dtype.kind)
if data_type is None:
warnings.warn(
f"Arrays of dtype.kind == {dtype.kind} not allowed, supported kinds are 'i', 'u', 'f', casting to 'f'."
)
data_type = "f"
if dtype.itemsize in [1, 2, 4, 8]:
bit_size = str(dtype.itemsize * 8)
else:
warnings.warn(
f"Item size {dtype.itemsize} not allowed, supported item sizes are 1, 2, 4 and 8, casting to 4"
)
bit_size = "32"
match dtype.byteorder:
case "<":
endian = "le"
case ">":
endian = "be"
case "=":
endian = "le" if sys.byteorder == "little" else "be"
case "|":
endian = ""
sample_format = data_type + bit_size + endian
# trick to ensure that sample_format is of AudioSampleFormat type
audio_sample_formats: tuple[AudioSampleFormat, ...] = get_args(
AudioSampleFormat
)
assert sample_format in audio_sample_formats
new_dtype = Boombox._sample_format_to_dtype(sample_format)
return sample_format, new_dtype
@staticmethod
def _sample_format_to_dtype(sample_format: AudioSampleFormat) -> np.dtype:
type_mapping = {"s": "i", "u": "u", "f": "f"}
if len(sample_format) == 2: # Handle 8-bit case (without endian)
data_type = sample_format[0]
dtype_str = type_mapping[data_type] + "1"
else:
data_type = sample_format[0]
bit_size = sample_format[1:-2]
endian = sample_format[-2:]
# numpy doesn't support 24-bit size values
bit_size = 32 if bit_size == "24" else int(bit_size)
endian_symbol = "<" if endian == "le" else ">"
dtype_str = (
endian_symbol + type_mapping[data_type] + str(int(bit_size) // 8)
)
return np.dtype(dtype_str)
@staticmethod
def _audio_bytes_to_array(
data: bytes, sample_format: AudioSampleFormat
) -> np.ndarray:
dtype = Boombox._sample_format_to_dtype(sample_format)
if sample_format not in ["s24le", "u24le", "s24be", "u24be"]:
return np.frombuffer(data, dtype)
else:
# numpy doesn't support 24-bit samples, this transforms them to 32-bit.
endian = "little" if sample_format[-2:] == "le" else "big"
is_signed = sample_format[0] == "s"
return np.array(
int.from_bytes(data[i : i + 3], endian, signed=is_signed)
for i in range(0, len(data), 3)
)
@staticmethod
def _deserialize_packet(packet: dict[Atom, Any]) -> AudioPacket | VideoPacket:
media_type, payload = packet[Atom("payload")]
if media_type == Atom("audio"):
deserialized_payload = Boombox._audio_bytes_to_array(
payload[Atom("data")], payload[Atom("sample_format")]
)
return AudioPacket(
deserialized_payload,
packet[Atom("timestamp")],
sample_rate=payload[Atom("sample_rate")],
channels=payload[Atom("channels")],
)
else:
shape = (
payload[Atom("height")],
payload[Atom("width")],
payload[Atom("channels")],
)
deserialized_payload = np.frombuffer(
payload[Atom("data")], np.uint8
).reshape(shape)
return VideoPacket(deserialized_payload, packet[Atom("timestamp")])
@staticmethod
def _serialize_packet(packet: AudioPacket | VideoPacket) -> dict[Atom, Any]:
match packet:
case AudioPacket():
sample_format, new_dtype = Boombox._dtype_to_sample_format(
packet.payload.dtype
)
serialized_payload = (
Atom("audio"),
{
Atom("data"): packet.payload.astype(new_dtype).tobytes(),
Atom("sample_format"): Atom(sample_format),
Atom("sample_rate"): packet.sample_rate,
Atom("channels"): packet.channels,
},
)
case VideoPacket():
frame = packet.payload.clip(0, 255)
raw_frame = frame.tobytes()
serialized_payload = (
Atom("video"),
{
Atom("data"): raw_frame,
Atom("height"): frame.shape[0],
Atom("width"): frame.shape[1],
Atom("channels"): frame.shape[2],
},
)
return {
Atom("payload"): serialized_payload,
Atom("timestamp"): packet.timestamp,
}
@staticmethod
def _serialize_endpoint(
endpoint: BoomboxEndpoint | str, direction: Literal["input", "output"]
) -> Any:
if isinstance(endpoint, str):
return endpoint.encode()
else:
return endpoint.serialize(direction)
@dataclasses.dataclass
class VideoPacket:
    """A Boombox packet containing raw video.

    Objects of this class are used when writing/reading raw video data to/from
    Boombox through the usage of :py:class:`.RawData` endpoint.

    Attributes
    ----------
    payload : np.ndarray
        Raw data of a single frame of the video stream. Shape of the array is
        (height, width, channels). Frames produced by Boombox have dtype
        ``np.uint8``; when writing, values are clipped to the 0-255 range.
    timestamp : int
        Timestamp of the frame in milliseconds.
    """

    payload: np.ndarray
    timestamp: int
@dataclasses.dataclass
class AudioPacket:
    """A Boombox packet containing raw audio.

    Objects of this class are used when writing/reading raw audio data to/from
    Boombox through the usage of :py:class:`.RawData` endpoint.

    Attributes
    ----------
    payload : np.ndarray
        Raw data of an audio chunk of the audio stream. The array is
        one-dimensional.
    timestamp : int
        Timestamp of the first sample in the chunk in milliseconds.
    sample_rate : int
        Number of audio samples per second.
    channels : int
        Number of channels interleaved in the stream.
    """

    payload: np.ndarray
    timestamp: int
    sample_rate: int
    channels: int