GStreamer Dynamic Pipelines

Another recurring topic with GStreamer since a long time is how to build applications with dynamic pipelines. That is, pipelines in which elements are relinked while the pipeline is playing and without stopping the pipeline.

So, let’s write a bit about it and explain how it all works.

Note however that I’m not covering the most common and simple case here: a demuxer or decodebin adding pads when set to PLAYING, and then connecting to these pads. My example code does this however, but there’s enough documentation about this already.

Also these two examples unfortunately need GStreamer 1.2.3 or newer because of some bugfixes.

The Theory

What’s difficult about dynamic pipelines? Why can’t you just relink elements and their pads at any time like you do when the pipeline is not running? Let’s consider the example of the plumbing in your house. If you want to change something there in the pipeline, you better make sure nothing is flowing through the pipes at that time or otherwise there will be a big mess 🙂

Pad Probes

In GStreamer this is handled with the pad probe mechanism. Pad probes allow to register a callback that is called when ever a specific condition is met. These conditions are expressed with a flags type, and are e.g. GST_PAD_PROBE_TYPE_BUFFER for a buffer arriving at the pad or GST_PAD_PROBE_TYPE_QUERY_UPSTREAM for an upstream query. Additionally these flags specify the scheduling type (not so important), and can specify a blocking type: GST_PAD_PROBE_TYPE_IDLE and GST_PAD_PROBE_TYPE_BLOCK.

gst_pad_add_probe() adds a probe and returns an identifier, which can later be used to remove the probe again from the pad with gst_pad_remove_probe().

The Callback

The probe callback is called whenever the condition is met. In this callback we get an info structure passed, which contains the exact condition that caused the callback to be called and the data that is associated with this. This can be for example the current buffer, the current event or the current query.

From the callback this data can be inspected but it’s also possible to replace the data stored in the info structure.

Once everything we want to do is done inside the callback, the callback has to return a return value. This specifies if the data should be passed on (GST_PAD_PROBE_PASS), should be dropped (GST_PAD_PROBE_DROP), the probe should be removed and the data should be passed (GST_PAD_PROBE_REMOVE) or the default action for this probe type should happen (GST_PAD_PROBE_OK, more on that later).

Note that the callback can be called from an arbitrary thread, and especially is not guaranteed to be called from your main application thread. For all serialized events, buffers and queries it will be called from the corresponding streaming thread.

Also it is important to keep in mind that the callback can be called multiple times (also at once), and that it can also still be called when returning GST_PAD_PROBE_REMOVE from it (another thread might’ve just called into it). It is the job of the callback to protect against that.

Blocking Types

The blocking types of the conditions are of further interest here. Without a blocking type the probe callback can be used to get notified whenever the condition is met, or intercept data flow or even modify events or buffers. That can also be very useful but not for our topic.

Whenever one of the blocking types is specified in the condition, triggering the probe will cause the pad to be blocked. That means that the pad will not pass on any data related to the condition until the probe is removed (with gst_pad_remove_probe() or by returning GST_PAD_PROBE_REMOVE), unless GST_PAD_PROBE_PASS is returned from the callback. This guarantees that nothing else that matches the condition can pass and the callback can safely do it’s work. Especially if GST_PAD_PROBE_TYPE_DATA_BOTH is specified, no data flow can happen and downstream of the pad until the next queue can be safely relinked. To be able to relink parts after the next queues you additionally need to make sure that all data flow has finished until that point too, which can be done with further pad probes (see also the advanced variant of the first example).

Probes with the GST_PAD_PROBE_TYPE_IDLE blocking type will be called the next time the pad is idle, i.e. there is no data flow happening currently. This can also happen immediately if gst_pad_add_probe() is called, directly from the thread that calls gst_pad_add_probe(). Or after the next buffer, event or query is handled.

Probes with the GST_PAD_PROBE_TYPE_BLOCK blocking type will be called the next time the conditions match, and will block the pad before passing on the data. This allows to inspect the buffer, event or query that is currently pending for the pad while still blocking the pad from doing anything else.

The main advantage of GST_PAD_PROBE_TYPE_BLOCK probes is that they provide the data that is currently pending, while the main advantage of GST_PAD_PROBE_TYPE_IDLE is that it is guaranteed to be called as soon as possible (independent of any data coming or not, there might not be any further data at all). It comes with the disadvantage that it might be called directly from the thread that calls gst_pad_add_probe() though. Depending on the use case, one or both of them should be chosen.

Now to the examples.

Example 1: Inserting & removing a filter

In this example we will have a decodebin, connected to a video sink with the navseek element. This allows us to watch any supported video file and seek with the cursor keys. Every 5 seconds a video effect filter will be inserted in front of the sink, or removed if it was inserted last time. All this without ever stopping playback or breaking because of seeking. The code is available here.

Setting up everything

In main() we set up the pipeline and link all parts we can already link, connect to the GstElement::pad-added signal of decodebin and then start a mainloop.

From the pad-added callback we then connect the first video pad that is added on decodebin to the converter in front of the video sink. We also add our periodic 5 second timeout, which will insert/remove the filter here. After this point the pipeline will be PLAYING and the video will be shown.

The insertion/removal of the filter

The timeout callback is quite boring, nothing is happening here other than calling gst_pad_add_probe() to add an IDLE probe. And here we also initialize a variable that protects our probe callback from multiple concurrent calls. We use an IDLE probe here as we’re not interested in the data causing the callback call, and also just want to get the callback called as soon as possible, even from the current thread.

Now the actual insertion or removal of the filter happens in the probe callback. This is the actually interesting part. Here we first check if the callback was already called with an atomic operation, and afterwards either insert or remove the filter. In both cases we need to make sure that all elements are properly linked on their pads afterwards and have the appropriate states. We also have to insert a video convert in front of the filter to make sure that output of the decoder can be handled by our filter.

A slightly more advanced variant

And that’s already all to know about this case. A slightly more complex variant of this is also in gst-plugins-base. The main difference is that BLOCK probes are used here, and the filter is drained with an EOS event before it is replaced. This is done by first adding a BLOCK probe in front of the filter, then from the callback adding another one after the filter and then sending an EOS event to the filter. From the probe after the filter we pass through all data until the EOS event is received and only then remove the filter. This is done for the case that the filter has multiple buffers queued internally. BLOCK probes instead of IDLE probes are used here because we would otherwise potentially send the EOS event from the application’s main thread, which would then block until the EOS event arrived on the other side of the filter and the filter was removed.

Example 2: Adding & removing sinks

The second example also plays a video with decodebin, but randomly adds or removes another video sink every 3 seconds. This uses the tee element for duplicating the video stream. The code can be found here.

Setting up everything

In main() we set up the pipeline and link all parts we can already link, connect to the GstElement::pad-added signal of decodebin and then start a mainloop. Same as in the previous example. We don’t add a sink here yet.

From the pad-added callback we now link decodebin to the tee element, request a first srcpad from tee and link a first sink. This first sink is a fakesink (with sync=TRUE to play in realtime), and is always present. This makes sure that the video is always playing in realtime, even if we have no visible sinks currently. At the end of the callback we add our 3 seconds, periodic timer.

Addition of sinks

In the timeout callback we first get a random number to decide if we now add or remove a sink. If we add a new sink this is all done from the timeout callback (i.e. the application’s main thread) directly. We can do all this from the main thread and without pad probes because there’s no data flow to disrupt. The new tee srcpad is just created here and if tee pushes any buffer through it now it will just be dropped. For adding a sink we just request a new srcpad from the tee and link it to a queue, video converter and sink, sync all the states and remember that we added this sink. A queue is necessary after every tee srcpad because otherwise the tee will lock up (because all tee srcpads are served from a single thread).

Removal of sinks

Removal of sinks is a bit more complicated as now we have to block the relevant pad because there might be data flow happening just now. For this we add an IDLE probe and from the callback unlink and destroy the sink. Again we protect against multiple calls to the callback, and we pass our sink information structure to the callback to know which sink actually should be removed. Note here that we pass g_free() to gst_pad_add_probe() as destroy notify for the sink information structure and don’t free the memory from the callback. This is necessary because the callback can still be called after we released the sink, and we would access already freed memory then.

I hope this helps to understand how dynamic pipelines can be implemented with GStreamer. It should be easily possible to extend these example s to real, more complicated use cases. The concepts are the same in all cases.

GStreamer 1.0 examples for iOS, Android and in general

As the folks at gstreamer.com (not to be confused with the GStreamer project) are still at the old and unmaintained GStreamer 0.10 release series, I started to port all their tutorials and examples to 1.x. You can find the code here: http://cgit.freedesktop.org/~slomo/gst-sdk-tutorials/

This includes the generic tutorials and examples, and ones for iOS and Android. Over the past months many people wanted to try the 1.x binaries for iOS and Android and were asking for examples how to use them. Especially the fourth and fifth tutorials should help to get people started fast, you can find them here (Android) and here (iOS).

If there are any problems with these, please report them to myself or if you suspect any GStreamer bugs report them in Bugzilla. The XCode OS X project files and the Visual Studio project files are ported but I didn’t test them, please report if they work 🙂

Streaming GStreamer pipelines via HTTP

In the past many people joined the GStreamer IRC channel on FreeNode and were asking how to stream a GStreamer pipeline to multiple clients via HTTP. Just explaining how to do it and that it’s actually quite easy might not be that convincing, so here’s a small tool that does exactly that. I called it http-launch and you can get it from GitHub here.

Given a GStreamer pipeline in GstParse syntax (same as e.g. gst-launch), it will start an HTTP server on port 8080, will start the pipeline once the first client connects and then serves from a single pipeline all following clients with the data that it produces.

For example you could call it like this to stream a WebM stream:

Note that this is just a simple example of what you can do with GStreamer and not meant for production use. Something like gst-streaming-server would be better suited for that, especially once it gets support for HLS/DASH or similar protocols.

Now let’s walk through the most important parts of the code.

The HTTP server

First some short words about the HTTP server part. Instead of just using libsoup, I implemented a trivial HTTP server with GIO. Probably not 100% standards compliant or bug-free, but good enough for demonstration purposes :). Also this should be a good example of how the different network classes of GIO go together.

The HTTP server is based on a GSocketService, which listens on a specific port for new connections via a GLib main context, and notifies via a signal whenever there is a new connection. These new connections are provided as a GSocketConnection. These are line 424 and following, and line 240 and following.

In lines 240 and following we start polling the GIOStream of the connection, to be notified whenever new data can be read from the stream. Based on this non-blocking reading from the connection is implemented in line 188 and following. Something like this pattern for non-blocking reading/writing to a socket is also implemented in GStreamer’s GstRTSPConnection.

Here we trivially read data until a complete HTTP message is received (i.e. “\r\n\r\n” is detected in what we read), which is then parsed with the GLib string functions. Only GET and HEAD requests are handled in very simple ways. The GET request will then lead us to the code that connects this HTTP server with GStreamer.

Really, consider using libsoup if you want to implement an HTTP server or client!

The GStreamer pipeline

Now to the GStreamer part of this small application. The actual pipeline is, as explained above, passed via the commandline. This is then parsed and properly set up in line 362 and following. For this GstParse is used, which parses a pipeline string into a real GstBin.

As the pipeline string passed to http-launch must not contain a sink element but end in an element with the name “stream”, we’ll have to get this element now and add our own sink to the bin. We do this by getting the “stream” element via gst_bin_get_by_name(), setting up a GstGhostPad that proxies the source pad of it as a source pad of the bin, and then putting the bin created from the pipeline string and a sink element into a GstPipeline, where both (the bin and the sink) are then connected.

The sink we are using here is multisocketsink, which sends all data received to a set of aplication-provided GSockets. In line 390 and following we set up some properties on the sink that makes sure that newly connected clients start from a keyframe and that the buffering for all clients inside multisocketsink is handled in a sensible way. Instead of letting new clients wait for the next keyframe we could also explicitly request the pipeline to generate a new keyframe each time a client connects.

Now the last part missing is that whenever we successfully received a GET request from a client, we will stop handling any reads/writes from the socket ourselves and pass it to multisocketsink. This is done in line 146. From this point onwards the socket for this client is only handled by multisocketsink. Additionally we start the pipeline here for the first client that has successfully connected.

I hope this showed a bit how one of the lesser known GStreamer elements can be used to stream media to multiple clients, and that GStreamer provides building blocks for almost everything already 😉