{"id":139,"date":"2014-01-08T11:43:19","date_gmt":"2014-01-08T10:43:19","guid":{"rendered":"https:\/\/coaxion.net\/blog\/?p=139"},"modified":"2014-05-17T00:28:59","modified_gmt":"2014-05-16T22:28:59","slug":"gstreamer-dynamic-pipelines","status":"publish","type":"post","link":"https:\/\/coaxion.net\/blog\/2014\/01\/gstreamer-dynamic-pipelines\/","title":{"rendered":"GStreamer Dynamic Pipelines"},"content":{"rendered":"<p>Another recurring topic with <a href=\"http:\/\/gstreamer.freedesktop.org\" title=\"GStreamer\" target=\"_blank\">GStreamer<\/a> since a long time is how to build applications with dynamic pipelines. That is, pipelines in which elements are relinked while the pipeline is playing and without stopping the pipeline.<\/p>\n<p>So, let&#8217;s write a bit about it and explain how it all works.<\/p>\n<p>Note however that I&#8217;m not covering the most common and simple case here: a demuxer or decodebin adding pads when set to PLAYING, and then connecting to these pads. My example code does this however, but there&#8217;s enough documentation about this already.<\/p>\n<p>Also these two examples unfortunately need GStreamer 1.2.3 or newer because of some bugfixes.<\/p>\n<h3>The Theory<\/h3>\n<p>What&#8217;s difficult about dynamic pipelines? Why can&#8217;t you just relink elements and their pads at any time like you do when the pipeline is not running? Let&#8217;s consider the example of the plumbing in your house. If you want to change something there in the pipeline, you better make sure nothing is flowing through the pipes at that time or otherwise there will be a big mess \ud83d\ude42<\/p>\n<h4>Pad Probes<\/h4>\n<p>In GStreamer this is handled with the <a href=\"http:\/\/gstreamer.freedesktop.org\/data\/doc\/gstreamer\/head\/gstreamer\/html\/GstPad.html#gst-pad-add-probe\" title=\"gst_pad_add_probe()\" target=\"_blank\">pad probe mechanism<\/a>. Pad probes allow to register a <a href=\"http:\/\/gstreamer.freedesktop.org\/data\/doc\/gstreamer\/head\/gstreamer\/html\/GstPad.html#GstPadProbeCallback\" title=\"GstPadProbeCallback\" target=\"_blank\">callback<\/a> that is called when ever a specific condition is met. These conditions are expressed with <a href=\"http:\/\/gstreamer.freedesktop.org\/data\/doc\/gstreamer\/head\/gstreamer\/html\/GstPad.html#GstPadProbeType\" title=\"GstPadProbeType\" target=\"_blank\">a flags type<\/a>, and are e.g. <em>GST_PAD_PROBE_TYPE_BUFFER<\/em> for a buffer arriving at the pad or <em>GST_PAD_PROBE_TYPE_QUERY_UPSTREAM<\/em> for an upstream query. Additionally these flags specify the scheduling type (not so important), and can specify a blocking type: <em>GST_PAD_PROBE_TYPE_IDLE<\/em> and <em>GST_PAD_PROBE_TYPE_BLOCK<\/em>.<\/p>\n<p><em>gst_pad_add_probe()<\/em> adds a probe and returns an identifier, which can later be used to remove the probe again from the pad with <em>gst_pad_remove_probe()<\/em>.<\/p>\n<h4>The Callback<\/h4>\n<p>The probe callback is called whenever the condition is met. In this callback we get an <a href=\"http:\/\/gstreamer.freedesktop.org\/data\/doc\/gstreamer\/head\/gstreamer\/html\/GstPad.html#GstPadProbeInfo\" title=\"GstPadProbeInfo\" target=\"_blank\">info<\/a> structure passed, which contains the exact condition that caused the callback to be called and the data that is associated with this. This can be for example the current buffer, the current event or the current query.<\/p>\n<p>From the callback this data can be inspected but it&#8217;s also possible to replace the data stored in the info structure.<\/p>\n<p>Once everything we want to do is done inside the callback, the callback has to return a <a href=\"http:\/\/gstreamer.freedesktop.org\/data\/doc\/gstreamer\/head\/gstreamer\/html\/GstPad.html#GstPadProbeReturn\" title=\"GstPadProbeReturn\" target=\"_blank\">return value<\/a>. This specifies if the data should be passed on (<em>GST_PAD_PROBE_PASS<\/em>), should be dropped (<em>GST_PAD_PROBE_DROP<\/em>), the probe should be removed and the data should be passed (<em>GST_PAD_PROBE_REMOVE<\/em>) or the default action for this probe type should happen (<em>GST_PAD_PROBE_OK<\/em>, more on that later).<\/p>\n<p>Note that the callback can be called from an arbitrary thread, and especially is not guaranteed to be called from your main application thread. For all serialized events, buffers and queries it will be called from the corresponding streaming thread.<\/p>\n<p>Also it is important to keep in mind that the callback can be called multiple times (also at once), and that it can also still be called when returning <em>GST_PAD_PROBE_REMOVE<\/em> from it (another thread might&#8217;ve just called into it). It is the job of the callback to protect against that.<\/p>\n<h4>Blocking Types<\/h4>\n<p>The blocking types of the conditions are of further interest here. Without a blocking type the probe callback can be used to get notified whenever the condition is met, or intercept data flow or even modify events or buffers. That can also be very useful but not for our topic.<\/p>\n<p>Whenever one of the blocking types is specified in the condition, triggering the probe will cause the pad to be blocked. That means that the pad will not pass on any data related to the condition until the probe is removed (with <em>gst_pad_remove_probe()<\/em> or by returning <em>GST_PAD_PROBE_REMOVE<\/em>), unless <em>GST_PAD_PROBE_PASS<\/em> is returned from the callback. This guarantees that nothing else that matches the condition can pass and the callback can safely do it&#8217;s work. Especially if <em>GST_PAD_PROBE_TYPE_DATA_BOTH<\/em> is specified, no data flow can happen and downstream of the pad until the next queue can be safely relinked. To be able to relink parts after the next queues you additionally need to make sure that all data flow has finished until that point too, which can be done with further pad probes (see also the advanced variant of the first example).<\/p>\n<p>Probes with the <em>GST_PAD_PROBE_TYPE_IDLE<\/em> blocking type will be called the next time the pad is idle, i.e. there is no data flow happening currently. This can also happen immediately if <em>gst_pad_add_probe()<\/em> is called, directly from the thread that calls <em>gst_pad_add_probe()<\/em>. Or after the next buffer, event or query is handled.<\/p>\n<p>Probes with the <em>GST_PAD_PROBE_TYPE_BLOCK<\/em> blocking type will be called the next time the conditions match, and will block the pad before passing on the data. This allows to inspect the buffer, event or query that is currently pending for the pad while still blocking the pad from doing anything else.<\/p>\n<p>The main advantage of <em>GST_PAD_PROBE_TYPE_BLOCK<\/em> probes is that they provide the data that is currently pending, while the main advantage of <em>GST_PAD_PROBE_TYPE_IDLE<\/em> is that it is guaranteed to be called as soon as possible (independent of any data coming or not, there might not be any further data at all). It comes with the disadvantage that it might be called directly from the thread that calls <em>gst_pad_add_probe()<\/em> though. Depending on the use case, one or both of them should be chosen.<\/p>\n<p>Now to the examples.<\/p>\n<h3>Example 1: Inserting &amp; removing a filter<\/h3>\n<p>In this example we will have a decodebin, connected to a video sink with the navseek element. This allows us to watch any supported video file and seek with the cursor keys. Every 5 seconds a video effect filter will be inserted in front of the sink, or removed if it was inserted last time. All this without ever stopping playback or breaking because of seeking. The code is available <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-filter.c\" title=\"dynamic-filter.c\" target=\"_blank\">here<\/a>.<\/p>\n<h4>Setting up everything<\/h4>\n<p>In <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-filter.c#L209\" title=\"main()\" target=\"_blank\">main()<\/a> we set up the pipeline and link all parts we can already link, connect to the <a href=\"http:\/\/gstreamer.freedesktop.org\/data\/doc\/gstreamer\/head\/gstreamer\/html\/GstElement.html#GstElement-pad-added\" title=\"pad-added\" target=\"_blank\">GstElement::pad-added<\/a> signal of decodebin and then start a mainloop.<\/p>\n<p>From the <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-filter.c#L176\" title=\"pad_added_cb\" target=\"_blank\">pad-added callback<\/a> we then connect the first video pad that is added on decodebin to the converter in front of the video sink. We also add our periodic 5 second timeout, which will insert\/remove the filter here. After this point the pipeline will be PLAYING and the video will be shown.<\/p>\n<h4>The insertion\/removal of the filter<\/h4>\n<p>The <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-filter.c#L166\" title=\"timeout_cb\" target=\"_blank\">timeout callback<\/a> is quite boring, nothing is happening here other than calling <em>gst_pad_add_probe()<\/em> to add an <em>IDLE<\/em> probe. And here we also initialize a variable that protects our probe callback from multiple concurrent calls. We use an <em>IDLE<\/em> probe here as we&#8217;re not interested in the data causing the callback call, and also just want to get the callback called as soon as possible, even from the current thread.<\/p>\n<p>Now the actual insertion or removal of the filter happens in the <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-filter.c#L166\" title=\"pad_probe_cb\" target=\"_blank\">probe callback<\/a>. This is the actually interesting part. Here we first check if the callback was already called with an atomic operation, and afterwards either insert or remove the filter. In both cases we need to make sure that all elements are properly linked on their pads afterwards and have the appropriate states. We also have to insert a video convert in front of the filter to make sure that output of the decoder can be handled by our filter.<\/p>\n<h4>A slightly more advanced variant<\/h4>\n<p>And that&#8217;s already all to know about this case. A slightly more complex variant of this is also in <a href=\"http:\/\/cgit.freedesktop.org\/gstreamer\/gst-plugins-base\/tree\/tests\/icles\/test-effect-switch.c\" title=\"test-effect-switch.c\" target=\"_blank\">gst-plugins-base<\/a>. The main difference is that <em>BLOCK<\/em> probes are used here, and the filter is drained with an EOS event before it is replaced. This is done by first adding a <em>BLOCK<\/em> probe in front of the filter, then from the callback adding another one after the filter and then sending an EOS event to the filter. From the probe after the filter we pass through all data until the EOS event is received and only then remove the filter. This is done for the case that the filter has multiple buffers queued internally. <em>BLOCK<\/em> probes instead of <em>IDLE<\/em> probes are used here because we would otherwise potentially send the EOS event from the application&#8217;s main thread, which would then block until the EOS event arrived on the other side of the filter and the filter was removed.<\/p>\n<h3>Example 2: Adding &amp; removing sinks<\/h3>\n<p>The second example also plays a video with decodebin, but randomly adds or removes another video sink every 3 seconds. This uses the tee element for duplicating the video stream. The code can be found <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-tee-vsink.c\" title=\"dynamic-tee-vsink.c\" target=\"_blank\">here<\/a>.<\/p>\n<h4>Setting up everything<\/h4>\n<p>In <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-tee-vsink.c#L227\" title=\"main()\" target=\"_blank\">main()<\/a> we set up the pipeline and link all parts we can already link, connect to the <a href=\"http:\/\/gstreamer.freedesktop.org\/data\/doc\/gstreamer\/head\/gstreamer\/html\/GstElement.html#GstElement-pad-added\" title=\"pad-added\" target=\"_blank\">GstElement::pad-added<\/a> signal of decodebin and then start a mainloop. Same as in the previous example. We don&#8217;t add a sink here yet.<\/p>\n<p>From the <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-tee-vsink.c#L176\" title=\"pad_added_cb\" target=\"_blank\">pad-added callback<\/a> we now link decodebin to the tee element, request a first srcpad from tee and link a first sink. This first sink is a fakesink (with <em>sync=TRUE<\/em> to play in realtime), and is always present. This makes sure that the video is always playing in realtime, even if we have no visible sinks currently. At the end of the callback we add our 3 seconds, periodic timer.<\/p>\n<h4>Addition of sinks<\/h4>\n<p>In the <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-tee-vsink.c#L126\" title=\"tick_cb\" target=\"_blank\">timeout callback<\/a> we first get a random number to decide if we now add or remove a sink. If we add a new sink this is all done from the timeout callback (i.e. the application&#8217;s main thread) directly. We can do all this from the main thread and without pad probes because there&#8217;s no data flow to disrupt. The new tee srcpad is just created here and if tee pushes any buffer through it now it will just be dropped. For adding a sink we just request a new srcpad from the tee and link it to a queue, video converter and sink, sync all the states and remember that we added this sink. A queue is necessary after every tee srcpad because otherwise the tee will lock up (because all tee srcpads are served from a single thread).<\/p>\n<h4>Removal of sinks<\/h4>\n<p>Removal of sinks is a bit more complicated as now we have to block the relevant pad because there might be data flow happening just now. For this we add an <em>IDLE<\/em> probe and from the <a href=\"https:\/\/github.com\/sdroege\/gst-snippets\/blob\/217ae015aaddfe3f7aa66ffc936ce93401fca04e\/dynamic-tee-vsink.c#L93\" title=\"unlink_cb\" target=\"_blank\">callback<\/a> unlink and destroy the sink. Again we protect against multiple calls to the callback, and we pass our sink information structure to the callback to know which sink actually should be removed. Note here that we pass <em>g_free()<\/em> to <em>gst_pad_add_probe()<\/em> as destroy notify for the sink information structure and don&#8217;t free the memory from the callback. This is necessary because the callback can still be called after we released the sink, and we would access already freed memory then.<\/p>\n<p>I hope this helps to understand how dynamic pipelines can be implemented with GStreamer. It should be easily possible to extend these example s to real, more complicated use cases. The concepts are the same in all cases.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Another recurring topic with GStreamer since a long time is how to build applications with dynamic pipelines. That is, pipelines in which elements are relinked while the pipeline is playing and without stopping the pipeline. So, let&#8217;s write a bit about it and explain how it all works. Note however that I&#8217;m not covering the &hellip; <a href=\"https:\/\/coaxion.net\/blog\/2014\/01\/gstreamer-dynamic-pipelines\/\" class=\"more-link\">Continue reading <span class=\"screen-reader-text\">GStreamer Dynamic Pipelines<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"jetpack_post_was_ever_published":false,"_jetpack_newsletter_access":"","_jetpack_dont_email_post_to_subs":false,"_jetpack_newsletter_tier_id":0,"_jetpack_memberships_contains_paywalled_content":false,"_jetpack_memberships_contains_paid_content":false,"footnotes":""},"categories":[3,5],"tags":[10,26],"class_list":["post-139","post","type-post","status-publish","format-standard","hentry","category-free-software","category-gstreamer","tag-gstreamer-2","tag-programming"],"jetpack_featured_media_url":"","jetpack_sharing_enabled":true,"_links":{"self":[{"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/posts\/139","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/comments?post=139"}],"version-history":[{"count":7,"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/posts\/139\/revisions"}],"predecessor-version":[{"id":148,"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/posts\/139\/revisions\/148"}],"wp:attachment":[{"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/media?parent=139"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/categories?post=139"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/coaxion.net\/blog\/wp-json\/wp\/v2\/tags?post=139"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}