Another long-standing, recurring topic with GStreamer is how to build applications with dynamic pipelines. That is, pipelines in which elements are relinked while the pipeline is playing, without stopping the pipeline.
So, let’s write a bit about it and explain how it all works.
Note however that I’m not covering the most common and simple case here: a demuxer or decodebin adding pads when set to PLAYING, and the application then connecting to these pads. My example code does this too, but there’s enough documentation about it already.
Also these two examples unfortunately need GStreamer 1.2.3 or newer because of some bugfixes.
What’s difficult about dynamic pipelines? Why can’t you just relink elements and their pads at any time like you do when the pipeline is not running? Let’s consider the example of the plumbing in your house. If you want to change something in the pipes there, you had better make sure nothing is flowing through them at that time, or otherwise there will be a big mess 🙂
In GStreamer this is handled with the pad probe mechanism. Pad probes allow you to register a callback that is called whenever a specific condition is met. These conditions are expressed with a flags type, e.g. GST_PAD_PROBE_TYPE_BUFFER for a buffer arriving at the pad or GST_PAD_PROBE_TYPE_QUERY_UPSTREAM for an upstream query. Additionally these flags specify the scheduling type (not so important here), and can specify a blocking type: GST_PAD_PROBE_TYPE_IDLE or GST_PAD_PROBE_TYPE_BLOCK.
gst_pad_add_probe() adds a probe and returns an identifier, which can later be used to remove the probe again from the pad with gst_pad_remove_probe().
The probe callback is called whenever the condition is met. The callback is passed an info structure, which contains the exact condition that caused the call and the data associated with it. This can be for example the current buffer, the current event or the current query.
From the callback this data can be inspected, but it’s also possible to replace the data stored in the info structure.
Once everything we want to do is done inside the callback, the callback has to return a value. This specifies if the data should be passed on (GST_PAD_PROBE_PASS), should be dropped (GST_PAD_PROBE_DROP), the probe should be removed and the data should be passed (GST_PAD_PROBE_REMOVE) or the default action for this probe type should happen (GST_PAD_PROBE_OK, more on that later).
Note that the callback can be called from an arbitrary thread, and in particular is not guaranteed to be called from your main application thread. For all serialized events, buffers and queries it will be called from the corresponding streaming thread.
It is also important to keep in mind that the callback can be called multiple times (even concurrently), and that it can still be called after GST_PAD_PROBE_REMOVE was returned from it (another thread might have just called into it). It is the job of the callback to protect against that.
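One way to protect against this is a flag that only the first invocation can claim with an atomic compare-and-exchange. The following is just a minimal sketch of that idea in plain C11, so it can be compiled without GStreamer. The names RelinkGuard, relink_guard_try_claim() and on_probe_fired() are made up for illustration and are not GStreamer API. In a real probe callback, GLib’s g_atomic_int_compare_and_exchange() can play the same role.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical state shared by every invocation of one probe callback. */
typedef struct {
    atomic_bool claimed;  /* false until one invocation wins the exchange */
    int relink_count;     /* how often the guarded work actually ran */
} RelinkGuard;

/* Returns true only for the single caller that flips `claimed`
 * from false to true; every other caller gets false. */
static bool relink_guard_try_claim(RelinkGuard *guard)
{
    bool expected = false;
    return atomic_compare_exchange_strong(&guard->claimed, &expected, true);
}

/* Stand-in for the body of a probe callback: do the work at most once. */
static void on_probe_fired(RelinkGuard *guard)
{
    if (!relink_guard_try_claim(guard))
        return;  /* another invocation already claimed the work */

    guard->relink_count++;  /* the relinking would happen here */
}
```

The guard has to be reset (set `claimed` back to false) before each new probe is added, otherwise the next probe would never do any work.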
The blocking types of the conditions are of further interest here. Without a blocking type the probe callback can be used to get notified whenever the condition is met, or intercept data flow or even modify events or buffers. That can also be very useful but not for our topic.
Whenever one of the blocking types is specified in the condition, triggering the probe will cause the pad to be blocked. That means that the pad will not pass on any data related to the condition until the probe is removed (with gst_pad_remove_probe() or by returning GST_PAD_PROBE_REMOVE), unless GST_PAD_PROBE_PASS is returned from the callback. This guarantees that nothing else that matches the condition can pass and the callback can safely do its work. In particular, if GST_PAD_PROBE_TYPE_DATA_BOTH is specified, no data flow can happen, and everything downstream of the pad up to the next queue can be safely relinked. To be able to relink parts after the next queue you additionally need to make sure that all data flow up to that point has finished too, which can be done with further pad probes (see also the advanced variant of the first example).
Probes with the GST_PAD_PROBE_TYPE_IDLE blocking type will be called the next time the pad is idle, i.e. when there is no data flow happening currently. This can happen immediately when gst_pad_add_probe() is called, directly from the thread that calls gst_pad_add_probe(), or after the next buffer, event or query has been handled.
Probes with the GST_PAD_PROBE_TYPE_BLOCK blocking type will be called the next time the conditions match, and will block the pad before passing on the data. This allows you to inspect the buffer, event or query that is currently pending for the pad while still blocking the pad from doing anything else.
The main advantage of GST_PAD_PROBE_TYPE_BLOCK probes is that they provide the data that is currently pending, while the main advantage of GST_PAD_PROBE_TYPE_IDLE probes is that they are guaranteed to be called as soon as possible (independent of any data coming or not, there might not be any further data at all). IDLE probes come with the disadvantage that they might be called directly from the thread that calls gst_pad_add_probe() though. Depending on the use case, one or both of them should be chosen.
Now to the examples.
Example 1: Inserting & removing a filter
In this example we will have a decodebin, connected to a video sink with the navseek element. This allows us to watch any supported video file and seek with the cursor keys. Every 5 seconds a video effect filter will be inserted in front of the sink, or removed if it was inserted last time. All this without ever stopping playback or breaking because of seeking. The code is available here.
Setting up everything
From the pad-added callback we then connect the first video pad that is added on decodebin to the converter in front of the video sink. We also add our periodic 5 second timeout, which will insert/remove the filter here. After this point the pipeline will be PLAYING and the video will be shown.
The insertion/removal of the filter
The timeout callback is quite boring: nothing happens here other than calling gst_pad_add_probe() to add an IDLE probe, and initializing a variable that protects our probe callback from multiple concurrent calls. We use an IDLE probe here as we’re not interested in the data causing the callback call, and just want the callback to be called as soon as possible, even from the current thread.
The actual insertion or removal of the filter happens in the probe callback. This is the interesting part. Here we first check with an atomic operation whether the callback was already called, and afterwards either insert or remove the filter. In both cases we need to make sure that all elements are properly linked on their pads afterwards and have the appropriate states. We also have to insert a video converter in front of the filter to make sure that the output of the decoder can be handled by our filter.
A slightly more advanced variant
And that is all there is to know about this case. A slightly more complex variant of this is also in gst-plugins-base. The main difference is that BLOCK probes are used here, and the filter is drained with an EOS event before it is replaced. This is done by first adding a BLOCK probe in front of the filter, then from the callback adding another one after the filter and then sending an EOS event to the filter. From the probe after the filter we pass through all data until the EOS event is received and only then remove the filter. This is done for the case that the filter has multiple buffers queued internally. BLOCK probes are used instead of IDLE probes here because we would otherwise potentially send the EOS event from the application’s main thread, which would then block until the EOS event arrived on the other side of the filter and the filter was removed.
Example 2: Adding & removing sinks
The second example also plays a video with decodebin, but randomly adds or removes another video sink every 3 seconds. This uses the tee element for duplicating the video stream. The code can be found here.
Setting up everything
In main() we set up the pipeline and link all parts we can already link, connect to the GstElement::pad-added signal of decodebin and then start a mainloop. Same as in the previous example. We don’t add a sink here yet.
From the pad-added callback we now link decodebin to the tee element, request a first srcpad from tee and link a first sink. This first sink is a fakesink (with sync=TRUE to play in realtime), and is always present. This makes sure that the video is always playing in realtime, even if we currently have no visible sinks. At the end of the callback we add our periodic 3 second timer.
Addition of sinks
In the timeout callback we first get a random number to decide if we now add or remove a sink. If we add a new sink this is all done from the timeout callback (i.e. the application’s main thread) directly. We can do all this from the main thread and without pad probes because there’s no data flow to disrupt. The new tee srcpad is just created here and if tee pushes any buffer through it now it will just be dropped. For adding a sink we just request a new srcpad from the tee and link it to a queue, video converter and sink, sync all the states and remember that we added this sink. A queue is necessary after every tee srcpad because otherwise the tee will lock up (because all tee srcpads are served from a single thread).
Removal of sinks
Removal of sinks is a bit more complicated, as now we have to block the relevant pad because there might be data flow happening just now. For this we add an IDLE probe, and from the callback unlink and destroy the sink. Again we protect against multiple calls to the callback, and we pass our sink information structure to the callback to know which sink should actually be removed. Note here that we pass g_free() to gst_pad_add_probe() as destroy notify for the sink information structure and don’t free the memory from the callback. This is necessary because the callback can still be called after we released the sink, and we would then access already freed memory.
I hope this helps to understand how dynamic pipelines can be implemented with GStreamer. It should be easy to extend these examples to real, more complicated use cases. The concepts are the same in all cases.