Pythonic Deepstream.
NVidia Deepstream is an excellent gstreamer framework which lets you build ai-powered, performant applications running on nvidia hardware. Its python API and bindings, however, have a number of pain points, which we've collected here and addressed with pythia:
- Metadata Extraction: Deepstream metadata extraction requires using buffer probes: pythia provides an easy to use interface which splits metadata extraction and processing.
- Metadata Iteration: pyds api iterators are not pythonic: pythia provides intuitive deepstream metadata iterators, like `for frame in frames_per_batch(buffer)`, wrapping pybind c++ casting and iteration.
- Python boilerplate: Python gstreamer apps get very large very fast. Pythia abstracts away common stuff and lets you focus on your application.
- Quick prototyping: Sometimes you just want to check the performance of a new model (e.g. after exporting from Nvidia TAO), or verify the environment. Pythia comes with ready-to-run demo applications, and a `gst-launch`-like cli.
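To illustrate the "Metadata Iteration" point above, here is a minimal sketch of the generator pattern that hides a manual linked-list walk. It is not pythia's implementation: `Node`, `iter_nodes` and `build_list` are hypothetical stand-ins for the pybind-exposed list nodes, and a real wrapper would also need the casting step on each payload.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, Optional


@dataclass
class Node:
    """Hypothetical stand-in for a linked-list node exposed through pybind."""

    data: Any
    next: Optional["Node"] = None


def iter_nodes(head: Optional[Node]) -> Iterator[Any]:
    """Walk the linked list and yield each payload, hiding the manual loop."""
    node = head
    while node is not None:
        yield node.data
        node = node.next


def build_list(items: Iterable[Any]) -> Optional[Node]:
    """Build a linked list from a Python iterable (demo helper only)."""
    head = None
    for item in reversed(list(items)):
        head = Node(item, head)
    return head


frames = build_list(["frame-0", "frame-1", "frame-2"])
for frame in iter_nodes(frames):
    print(frame)
```

The caller only sees a plain `for` loop; the pointer chasing stays inside the generator.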
pythia offers:
- Common metadata extraction and parsing utilities.
- Workers and queues management in the background, to offload processing outside of the buffer probe.
- Ready to use Docker images for both aarch64 (jetson) and x86_64 (nvidia gpu).
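The idea of offloading work outside of the buffer probe can be sketched with the standard library alone. This is an assumption-laden illustration, not pythia's worker code: `fake_probe` is a hypothetical stand-in for a real buffer probe, and the payload layout is made up for the example.

```python
import queue
import threading

SENTINEL = object()  # tells the worker to stop


def worker(jobs: queue.Queue, results: list) -> None:
    """Consume items from the queue until the sentinel arrives."""
    while True:
        item = jobs.get()
        if item is SENTINEL:
            break
        # Slow processing (serialization, network I/O, ...) would go here.
        results.append(
            {"frame_num": item["frame_num"], "n_objects": len(item["labels"])}
        )


def fake_probe(jobs: queue.Queue, frame_num: int, labels: list) -> None:
    """Stand-in for a buffer probe: copy what is needed and return at once."""
    jobs.put({"frame_num": frame_num, "labels": list(labels)})


jobs: queue.Queue = queue.Queue()
results: list = []
thread = threading.Thread(target=worker, args=(jobs, results), daemon=True)
thread.start()

for n in range(3):
    fake_probe(jobs, n, ["car", "person"])

jobs.put(SENTINEL)
thread.join()
print(results)
```

Keeping the probe body down to a single `put` means the streaming thread is not blocked by whatever the worker does with the data.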
gst-pylaunch
You can run familiar pipelines and attach buffer probes from simple python modules.
- Create a file `probe.py` with:

    from pythia import objects_per_batch


    def gen_detections(batch_meta):
        for frame, detection in objects_per_batch(batch_meta):
            box = detection.rect_params
            yield {
                "frame_num": frame.frame_num,
                "label": detection.obj_label,
                "left": box.left,
                "top": box.top,
                "width": box.width,
                "height": box.height,
                "confidence": detection.confidence,
            }

- Create a file `pipeline.txt` with:

    uridecodebin
      uri=file://{input}
    ! identity
      eos-after=30
    ! nvvideoconvert
    ! muxer.sink_0
    nvstreammux
      name=muxer width=1280 height=720 batch-size=1
    ! nvinfer
      name=pgie
      config-file-path={pgie-conf}
    ! nvvideoconvert
    ! nvdsosd
    ! nvvideoconvert
    ! queue
    ! x264enc
    ! mp4mux
    ! filesink location={output}

- Run the application with:

    $ gst-pylaunch \
        -p ./pipeline.txt \
        --pgie-conf=/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.txt \
        --input=/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4 \
        --output=/tmp/overlayed.mp4 \
        --probe=probe.py:gen_detections@pgie.src

Note that the `--pgie-conf`, `--input`, and `--output` cli args were dynamically parsed and added from the pipeline file.
This command instructed pythia to do the following:
- Load a pipeline from a file located at `./pipeline.txt`, which contains `gst-launch`-like syntax with some parameters to be inserted (`input`, `pgie-conf`, `output`).
- Format the pipeline with `input`, `pgie-conf` and `output` from the received parameters. (For a more complex syntax, you can install `pythia[jinja]` to use jinja as a template backend. See the documentation for more details.)
- Set up a buffer probe which internally calls the `gen_detections` function defined in the `probe.py` file.
- Attach said buffer probe to the source pad of the `pgie`-named element of the pipeline.
- Send incoming metadata to a logger which prints jsonified metadata to console.
- Check your console to see the incoming detections.
- Want to do something else with the detections? You can choose between several backends: logging (stdout, stderr, file available), in-memory (deque), kafka, redis, or implement your own streaming connector with the `PYTHIA_STREAM_URI` env var. Check the documentation for more details.
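The dynamic cli arguments described in the steps above (placeholders such as `{input}` becoming `--input`) can be approximated as follows. This is a sketch under assumptions, not how `gst-pylaunch` is actually implemented: the regex, the helper names and the `launch-sketch` program name are all hypothetical, and the template is shortened and not meant to be a runnable pipeline.

```python
import argparse
import re

# Placeholder names may contain hyphens, e.g. {pgie-conf}.
PLACEHOLDER = re.compile(r"\{([A-Za-z_][A-Za-z0-9_-]*)\}")


def placeholder_names(template: str) -> list:
    """Return placeholder names in order of first appearance."""
    seen = []
    for name in PLACEHOLDER.findall(template):
        if name not in seen:
            seen.append(name)
    return seen


def build_parser(template: str) -> argparse.ArgumentParser:
    """Add one required --<name> option per placeholder in the template."""
    parser = argparse.ArgumentParser(prog="launch-sketch")
    for name in placeholder_names(template):
        # Keep the hyphenated name as dest so it matches the placeholder.
        parser.add_argument(f"--{name}", dest=name, required=True)
    return parser


def render(template: str, values: dict) -> str:
    """Substitute every placeholder with its value."""
    return PLACEHOLDER.sub(lambda match: str(values[match.group(1)]), template)


template = (
    "uridecodebin uri=file://{input} "
    "! nvinfer config-file-path={pgie-conf} "
    "! filesink location={output}"
)
parser = build_parser(template)
args = parser.parse_args(
    ["--input=/tmp/in.mp4", "--pgie-conf=/tmp/pgie.txt", "--output=/tmp/out.mp4"]
)
pipeline = render(template, vars(args))
print(pipeline)
```

A regex substitution is used instead of `str.format` so that stray braces elsewhere in a pipeline description would not raise formatting errors.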
python API
If you want more granular control over the behavior of the application, its signals, events, and messages, you can instead write an application using pythia's API.
Continuing with the same pipeline as in the previous example,
- Create a file `myscript.py` with:

    import json

    from kafka import KafkaProducer
    from kafka.admin import KafkaAdminClient, NewTopic

    from pythia import Application, Gst, objects_per_batch


    class App(Application):
        def __init__(self, *a, **kw):
            super().__init__(*a, **kw)
            self.manual_kafka = KafkaProducer(
                bootstrap_servers="kafka:9092"
            )

        def on_message_error(self, *a, **kw):
            err, debug = super().on_message_error(*a, **kw)
            self.manual_kafka.send(
                "app_events",
                json.dumps({"CONDITION": "ERROR", "ERR": err, "DEBUG": debug}).encode()
            )
            raise RuntimeError("Unhandled pipeline error")

        def on_message_eos(self, bus, message):
            self.manual_kafka.send(
                "app_events",
                json.dumps({"CONDITION": "EOS", "SENT_BY": str(message.src)}).encode()
            )
            super().on_message_eos(bus, message)


    app = App.from_pipeline_file(
        "pipeline.txt",
        params={
            "pgie-conf": "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.txt",
            "input": "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4",
            "output": "/tmp/overlayed.mp4",
        }
    )


    @app.probe(
        "pgie",
        pad_direction="src",
        backend_uri="kafka://kafka:9092?stream=raw_detections"
    )
    def pgie_srcprobe(batch_meta):
        for frame, detection in objects_per_batch(batch_meta):
            frame_num = frame.frame_num
            box = detection.rect_params
            yield {
                "frame_num": frame_num,
                "label": detection.obj_label,
                "left": box.left,
                "top": box.top,
                "width": box.width,
                "height": box.height,
                "confidence": detection.confidence,
            }


    @app.probe(
        "muxer",
        pad_direction="src",
    )
    def source_probe(pad, info):
        app.manual_kafka.send(
            "app_events",
            json.dumps({
                "CONDITION": "STARTED",
                "PAD_CAPS": pad.props.caps.to_string(),
                "PAD_DIRECTION": pad.props.direction,
                "PAD_OFFSET": pad.props.offset,
            }).encode()
        )
        return Gst.PadProbeReturn.REMOVE


    if __name__ == "__main__":
        admin = KafkaAdminClient(bootstrap_servers="kafka:9092")
        if "app_events" not in admin.list_topics():
            admin.create_topics(
                new_topics=[
                    NewTopic(name="app_events", num_partitions=1, replication_factor=1)
                ],
                validate_only=False,
            )
        app()

- Run the application with:

    $ python myscript.py

In this mode, you have more control over the application behavior:
- subclass `Application`
- instantiate a custom message handler (a kafka producer in this example)
- forward error and EOS messages to a custom kafka topic
- interpolate the pipeline template file with python variables to construct the app
- use the `@app.probe` decorator as a generator, letting pythia handle the messages internally
- use the `@app.probe` decorator as a probe, manually handling the buffer flow and messaging.
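The last two points, one decorator accepting both generator functions and plain probe callbacks, can be sketched with `inspect.isgeneratorfunction`. This is only an illustration of the dispatch idea and not pythia's `@app.probe`: the `probe` factory, the `registry` dict and the `sent` list are hypothetical, and the strings `"OK"` and `"REMOVE"` stand in for real probe return values.

```python
import inspect
from typing import Callable, Dict, List

sent: List[dict] = []  # stand-in for a messaging backend


def probe(registry: Dict[str, Callable]):
    """Hypothetical decorator factory; registers callbacks by function name."""

    def decorator(func):
        if inspect.isgeneratorfunction(func):
            # Generator style: drain the generator and forward each item.
            def wrapper(*args, **kwargs):
                for item in func(*args, **kwargs):
                    sent.append(item)
                return "OK"
        else:
            # Plain style: the user decides what to send and what to return.
            wrapper = func
        registry[func.__name__] = wrapper
        return func

    return decorator


registry: Dict[str, Callable] = {}


@probe(registry)
def detections(batch):
    for label in batch:
        yield {"label": label}


@probe(registry)
def raw(pad, info):
    return "REMOVE"


print(registry["detections"](["car", "person"]))
print(registry["raw"](None, None))
print(sent)
```

With this split, generator probes never touch the messaging layer directly, while plain probes keep full control over the return value.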
Want to do something else while the application is running? You can run the application with `app(background=True)` instead. See the documentation for details and more examples.
- Check the kafka topics to see the incoming detections.
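The `backend_uri` used in the script above packs the backend type, its address and its options into a single string. A minimal sketch of how such a URI can be split with the standard library follows; the `parse_backend_uri` helper and the shape of its result are assumptions made for illustration, not pythia's own parser.

```python
from urllib.parse import parse_qs, urlsplit


def parse_backend_uri(uri: str) -> dict:
    """Split a backend URI into scheme, host, port and query options."""
    parts = urlsplit(uri)
    # parse_qs returns lists; keep the last value given for each key.
    options = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "options": options,
    }


backend = parse_backend_uri("kafka://kafka:9092?stream=raw_detections")
print(backend)
```

Here the scheme would select the backend, while the `stream` option would name the kafka topic to publish to.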
- nvidia hardware (either jetson or gpu)
- One of:
  - recent docker (with support for `--gpus=all`) and `nvidia-docker` installed,
  - environment with deepstream 6.1.1 and these bindings
- `pip install pythiags`
- `docker login ghcr.io` (see instructions here), then `docker pull ghcr.io/rmclabs-io/pythia` or `ghcr.io/rmclabs-io/pythia-l4t`
- Build your image using `FROM ghcr.io/rmclabs-io/pythia` or `FROM ghcr.io/rmclabs-io/pythia-l4t`

NOTE: the `latest` tag is deliberately not published.
- Run docker with at least the following flags:

    $ docker run \
        --gpus=all \
        -v /tmp/.X11-unix:/tmp/.X11-unix \
        -e DISPLAY \
        ghcr.io/rmclabs-io/pythia-dev:1.2.1

  Alternatively, you could use `ghcr.io/rmclabs-io/pythia-dev` or `FROM ghcr.io/rmclabs-io/pythia-l4t-dev`.
Note: If running from docker, make sure you've properly configured the container and its environment (see the upstream reference).
For more examples and tutorials, visit the examples section of the documentation.
Check out ongoing and future development here
- Q: Package installation fails:
  - A1: Upgrade your pip: `pip install --upgrade pip` (required: "pip>=10").
  - A2: Make sure you've installed the build prerequisites, as listed in `reqs/apt.build.list`.
- Q: My application is running slow on Jetson
  - A: Make sure to enable jetson-clocks and maxn (see reference):

        sudo nvpmodel -m 0
        sudo jetson_clocks

- Q: Program exits with error `Unable to get a Window, abort.`
  - A: Make sure x11 is properly configured. This is common when running through ssh sessions. In most cases, this just means you need to have the `DISPLAY` environment variable correctly set. To list available displays, run the `w` command:

        $ w
         09:53:38 up 2 days, 17:26,  1 user,  load average: 0,36, 0,33, 0,23
        USER     TTY      FROM          LOGIN@   IDLE    JCPU   PCPU   WHAT
        rmclabs  :0       :0            lun16    ?xdm?   3:06m  0.02s  /usr/lib/gdm3/gdm-x-session --run-script /usr/lib/gnome-session/run-systemd-session unity-session.target
        rmclabs  pts/11   10.100.10.79  09:57    1.00s   0.10s  0.00s  w

    From here, choose a display corresponding to a local connection (`:0` in this case, including the colon). Then, export the environment variable and run your program again:

        export DISPLAY=:0
        # run your program here

- Q: Program exits with error (from docker):

      X Error of failed request:  BadShmSeg (invalid shared segment parameter)
        Major opcode of failed request:  150 (XVideo)
        Minor opcode of failed request:  19 ()
        Segment id in failed request:  0x121
        Serial number of failed request:  57
        Current serial number in output stream:  58
      python: ../../src/hb-object-private.hh:154: Type* hb_object_reference(Type*) [with Type = hb_unicode_funcs_t]: Assertion `hb_object_is_valid (obj)' failed.

  - A: Add the `--ipc=host` flag to `docker run`.
- Q: Python segfaults when several applications are run consecutively:
  - A: It seems to be a race condition produced by the `uridecodebin` element using `nvjpegenc` (maybe others?). You can either add a timeout between runs (1 sec seems to do it), or change `nvjpegenc` (see `pythia.utils.gst:demote_plugin`).
- Q: I am unable to build the devcontainer:
  - A: Make sure to update the `devcontainer.json` with a proper `BASE_IMAGE` and `DOCKER_GROUP_ID`.
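For the `Unable to get a Window, abort.` question above, a script can check the `DISPLAY` variable before starting a pipeline and fail with a clearer message. This is a small sketch, not part of pythia: the `resolve_display` helper and its `fallback` parameter are hypothetical.

```python
import os


def resolve_display(environ=None, fallback=None):
    """Return the display to use, or raise if none is configured."""
    env = os.environ if environ is None else environ
    display = env.get("DISPLAY") or fallback
    if not display:
        raise RuntimeError(
            "DISPLAY is not set; export it (for example DISPLAY=:0) "
            "before starting the pipeline"
        )
    return display


# Explicit mappings are passed here so the demo does not depend on the host.
print(resolve_display({"DISPLAY": ":0"}))
print(resolve_display({}, fallback=":1"))
```

Calling `resolve_display()` without arguments reads the real process environment.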
The workflow is slightly different for internal and external contributors. The general steps, however, remain the same:
1. fork
2. clone
3. Pull Request from new branch
4. [Optional, recommended]: use provided devcontainer.
5. [Optional, recommended]: run `pre-commit install` to validate commits.
6. add tests, docs, code, scripts, etc
7. [Optional] check code manually, with `./scripts/format`, `./scripts/lint`, `./scripts/docs`, `./scripts/test`, etc.
8. Commit using Conventional commits.
9. push, wait for ci and/or maintainers feedback
10. repeat 6-9 until success!
For more instructions, visit the Developers section of the documentation.