GLib/GIO async operations and Rust futures + async/await

Unfortunately I was not able to attend the Rust+GNOME hackfest in Madrid last week, but I could at least spend some of my work time at Centricular on implementing one of the things I wanted to work on during the hackfest. The other one, more closely related to the gnome-class work, will be the topic of a future blog post once I actually have something to show.

So back to the topic. With the latest GIT version of the Rust bindings for GLib, GTK, etc. it is now possible to make use of the Rust futures infrastructure for GIO async operations and various other functions. This should make writing GNOME applications, and GLib-using applications in general, quite a bit more convenient in Rust.

For the impatient, the summary is that you can use Rust futures with GLib and GIO now, that it works both on the stable and nightly version of the compiler, and with the nightly version of the compiler it is also possible to use async/await. An example with the latter can be found here, and an example just using futures without async/await here.

Table of Contents

  1. Futures
    1. Futures in Rust
    2. Async/Await
    3. Tokio
  2. Futures & GLib/GIO
    1. Callbacks
    2. GLib Futures
    3. GIO Asynchronous Operations
    4. Async/Await
  3. The Future

Futures

First of all, what are futures and how do they work in Rust? In a few words, a future (also called a promise elsewhere) is a value that represents the result of an asynchronous operation, e.g. establishing a TCP connection. The operation itself (usually) runs in the background, and only once the operation finishes (or fails) does the future resolve to the result of that operation. There are all kinds of ways to combine futures, e.g. to execute some other (potentially async) code with the result once the first operation has finished.

It’s a concept that is also widely used in various other programming languages (e.g. C#, JavaScript, Python, …) for asynchronous programming and can probably be considered a proven concept at this point.

Futures in Rust

In Rust, a future is basically an implementation of a relatively simple trait called Future. The following is the definition as of now, but there are currently discussions to change/simplify/generalize it and to also move it into the Rust standard library:

pub trait Future {
    type Item;
    type Error;

    fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error>;
}

Anything that implements this trait can be considered an asynchronous operation that resolves to either an Item or an Error. Consumers of the future call the poll method to check if the future has resolved already (to a result or error), or if the future is not ready yet. In the latter case, the future itself will, once it is ready to proceed, notify the consumer about that. It gets a way to send such notifications from the Context that is passed to poll. Proceeding does not necessarily mean that the future will resolve after this; it could also just advance its internal state closer to the final resolution.
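
To make this a bit more concrete, the following is a minimal sketch of a hand-written future against the futures 0.2-era API shown above (exact module paths are an assumption and changed between futures versions). It is immediately ready and never has to wake anybody up; a real future would return Ok(Async::Pending) and arrange for a wakeup via the Context.

use futures::{task, Async, Future, Poll};

// A toy future that is immediately ready with a constant value
struct AlwaysReady(Option<u32>);

impl Future for AlwaysReady {
    type Item = u32;
    type Error = ();

    fn poll(&mut self, _cx: &mut task::Context) -> Poll<u32, ()> {
        // A real asynchronous operation would return Ok(Async::Pending) here
        // until it has made progress, and use the Context to get woken up later
        Ok(Async::Ready(self.0.take().expect("polled after completion")))
    }
}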

Calling poll manually is kind of inconvenient, so generally this is handled by an Executor on which the futures are scheduled and which runs them until their resolution. Equally, it’s inconvenient to have to implement that trait directly, so for most common operations there are combinators that can be used on futures to build new futures, usually via closures in one way or another. For example, the following runs the passed closure with the successful result of the future and then has it return another future (Ok(()) is converted via IntoFuture to a future that always resolves successfully with ()), and also maps any errors to ().

fn our_future() -> impl Future<Item = (), Error = ()> {
    some_future
        .and_then(|res| {
            do_something(res);
            Ok(())
        })
        .map_err(|_| ())
}

A future represents only a single value, but there is also a trait for something producing multiple values: a Stream. For more details, it is best to check the documentation.
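
For reference, its definition at the time looked roughly like the following sketch, i.e. like Future but resolving to a sequence of items, with None signalling the end of the stream.

pub trait Stream {
    type Item;
    type Error;

    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error>;
}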

Async/Await

The above way of combining futures via combinators and closures is still not too great, and remains close to callback hell. In other languages (e.g. C#, JavaScript, Python, …) this was solved by introducing new features into the language: async for declaring futures with normal code flow, and await for suspending execution transparently and resuming at that point in the code with the result of a future.

Of course this was also implemented in Rust. It is currently based on procedural macros, but there are discussions to move this directly into the language and standard library as well.

The above example would look something like the following with the current version of the macros

#[async]
fn our_future() -> Result<(), ()> {
    let res = await!(some_future)
        .map_err(|_| ())?;

    do_something(res);
    Ok(())
}

This looks almost like normal, synchronous code but is internally converted into a future and completely asynchronous.

Unfortunately this is currently only available on the nightly version of Rust until various bits and pieces get stabilized.

Tokio

Most of the time when people talk about futures in Rust, they implicitly also mean Tokio. Tokio is a pure Rust, cross-platform asynchronous IO library and based on the futures abstraction above. It provides a futures executor and various types for asynchronous IO, e.g. sockets and socket streams.

But while Tokio is a great library, we’re not going to use it here and instead implement a futures executor around GLib. And on top of that implement various futures around GLib’s sister library GIO, which provides lots of API for synchronous and asynchronous IO.

Just like all IO operations in Tokio, all GLib/GIO asynchronous operations are dependent on running with their respective event loop (i.e. the futures executor) and while it’s possible to use both in the same process, each operation has to be scheduled on the correct one.

Futures & GLib/GIO

Asynchronous operations, and generally everything event related (timeouts, …), are based on callbacks that you have to register, and run via a GMainLoop that executes events from a GMainContext. The latter is just something that stores everything that is scheduled and provides API for polling if something is ready to be executed now, while the former does exactly that: executing.

Callbacks

The callback based API is also available via the Rust bindings, and would for example look as follows

glib::timeout_add(20, || {
    do_something_after_20ms();
    glib::Continue(false) // don't call again
});

glib::idle_add(|| {
    do_something_from_the_main_loop();
    glib::Continue(false) // don't call again
});

some_async_operation(|res| {
    match res {
        Err(err) => report_error_somehow(),
        Ok(res) => {
            do_something_with_result(res);
            some_other_async_operation(|res| {
                do_something_with_other_result(res);
            });
        }
    }
});

As can be seen here already, the callback-based approach leads to quite non-linear code and deep indentation due to all the closures. Also, error handling becomes quite tricky because errors somehow have to be handled from a completely different call stack.

Compared to C this is still far more convenient due to actually having closures that can capture their environment, but we can definitely do better in Rust.

The above code also assumes that somewhere a main loop is running on the default main context, which could be achieved with the following e.g. inside main()

let ctx = glib::MainContext::default();
let l = glib::MainLoop::new(Some(&ctx), false);
ctx.push_thread_default();

// All operations here would be scheduled on this main context
do_things(&l);

// Run everything until someone calls l.quit()
l.run();
ctx.pop_thread_default();

It is also possible to explicitly select on which main context various operations should run, but that’s just a minor detail.
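
As a sketch of that (assuming the glib crate API of that time), a timeout source could be created explicitly and attached to a specific main context instead of the thread-default one

let ctx = glib::MainContext::new();

let source = glib::timeout_source_new(20, None, glib::PRIORITY_DEFAULT, || {
    do_something_after_20ms();
    glib::Continue(false) // don't call again
});

// Attach the source to our context instead of the thread-default one
source.attach(Some(&ctx));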

GLib Futures

To make this situation a bit nicer, I’ve implemented support for futures in the Rust bindings. This means that the GLib MainContext is now a futures executor (and arbitrary futures can be scheduled on it), all the GSource related operations in GLib (timeouts, UNIX signals, …) have futures- or stream-based variants, and all the GIO asynchronous operations also come with futures variants now. The latter are autogenerated with the gir bindings code generator.

To enable all this, the futures feature of the glib and gio crates has to be enabled, but that’s about it. It is currently still hidden behind a feature gate because the futures infrastructure is still going to go through some API-incompatible changes in the near future.

So let’s take a look at how to use it. First of all, setting up the main context and executing a trivial future on it

let c = glib::MainContext::default();
let l = glib::MainLoop::new(Some(&c), false);

c.push_thread_default();

// Spawn a future that is called from the main context
// and after printing something just quits the main loop
let l_clone = l.clone();
c.spawn(futures::lazy(move |_| {
    println!("we're called from the main context");
    l_clone.quit();
    Ok(())
}));

l.run();

c.pop_thread_default();

Apart from spawn(), there is also a spawn_local(). The former can be called from any thread but requires the future to implement the Send trait (that is, it must be safe to send it to other threads), while the latter can only be called from the thread that owns the main context but allows any kind of future to be spawned. In addition there is also a block_on() function on the main context, which allows running non-static futures up to their completion and returns their result. The spawn functions only work with static futures (i.e. futures that have no references to any stack frame) and require the futures to be infallible and to resolve to ().
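
As a minimal sketch of that difference, block_on() can run a future that only borrows data from the current stack frame, which would not be accepted by spawn() or spawn_local()

let c = glib::MainContext::new();

let msg = String::from("hello");
// This future borrows `msg` and is thus not 'static, but block_on() is fine with that
let res = c.block_on(futures::lazy(|_| Ok::<usize, ()>(msg.len())));
assert_eq!(res, Ok(5));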

The above code already showed one of the advantages of using futures: it is possible to use all generic futures (that don’t require a specific executor), like futures::lazy or the mpsc/oneshot channels, with GLib now, as well as any of the combinators that are available on futures

let c = MainContext::new();

let res = c.block_on(timeout_future(20)
    .and_then(move |_| {
        // Called after 20ms
        Ok(1)
    })
);

assert_eq!(res, Ok(1));

This example also shows the block_on functionality to return an actual value from the future (1 in this case).

GIO Asynchronous Operations

Similarly, all asynchronous GIO operations are now available as futures. For example, to open a file asynchronously and get a gio::InputStream to read from, the following could be done

let file = gio::File::new_for_path("Cargo.toml");

let l_clone = l.clone();
c.spawn_local(
    // Try to open the file
    file.read_async_future(glib::PRIORITY_DEFAULT)
        .map_err(|(_file, err)| {
            format!("Failed to open file: {}", err)
        })
        .and_then(move |(_file, strm)| {
            // Here we could now read from the stream, but
            // instead we just quit the main loop
            l_clone.quit();

            Ok(())
        })
);

A bigger example can be found in the gtk-rs examples repository here. This example is basically reading a file asynchronously in 64 byte chunks and printing it to stdout, then closing the file.

In the same way, network operations or any other asynchronous operations can be handled via futures now.
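
For example, asynchronously connecting a TCP socket via gio::SocketClient could look roughly like the following sketch. Note that the exact generated method name and the (object, error) tuple shape are assumptions here, following the same *_async_future pattern as read_async_future above.

let client = gio::SocketClient::new();

c.spawn_local(
    client
        // Assumed generated name, analogous to read_async_future above
        .connect_to_host_async_future("example.com:80", 80)
        .map_err(|(_client, err)| {
            eprintln!("Failed to connect: {}", err);
        })
        .and_then(|(_client, _connection)| {
            // _connection would be a gio::SocketConnection to read from / write to
            Ok(())
        }),
);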

Async/Await

Compared to a callback-based approach, that bigger example is already a lot nicer but still quite heavy to read. With the async/await extension that I mentioned above already, the code looks much nicer in comparison and really almost like synchronous code. Except that it is not synchronous.

#[async]
fn read_file(file: gio::File) -> Result<(), String> {
    // Try to open the file
    let (_file, strm) = await!(file.read_async_future(glib::PRIORITY_DEFAULT))
        .map_err(|(_file, err)| format!("Failed to open file: {}", err))?;

    Ok(())
}

fn main() {
    [...]
    let future = async_block! {
        match await!(read_file(file)) {
            Ok(()) => (),
            Err(err) => eprintln!("Got error: {}", err),
        }
        l_clone.quit();
        Ok(())
    };

    c.spawn_local(future);
    [...]
}

For compiling this code, the futures-nightly feature has to be enabled for the glib crate, and a nightly compiler must be used.

The bigger example from before with async/await can be found here.

With this we’re already very close in Rust to having the same convenience as in other languages with asynchronous programming. And also it is very similar to what is possible in Vala with GIO asynchronous operations.

The Future

For now this is all finished and available from GIT of the glib and gio crates. This will have to be updated in the future whenever the futures API changes, but it is planned to stabilize all this in Rust by the end of this year.

In the future it might also make sense to add futures variants for all the GObject signal handlers, so that e.g. handling a click on a GTK+ button could be done similarly from a future (or rather from a Stream, as a signal can be emitted multiple times). Whether this is in the end more convenient than the callback-based approach that is currently used remains to be seen; some experimentation would be necessary here. Also, how to handle return values of signal handlers would have to be figured out.
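
Just to illustrate the idea, a purely hypothetical sketch of such an API could look like the following. None of these functions exist in the bindings at this point.

// Hypothetical: every emission of the "clicked" signal becomes an item of a Stream
let clicks = button.connect_clicked_stream();

c.spawn_local(
    clicks
        .for_each(|_| {
            println!("button was clicked");
            Ok(())
        })
        .map_err(|_| ()),
);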

Improving GStreamer performance on a high number of network streams by sharing threads between elements with Rust’s tokio crate

For one of our customers at Centricular we were working on a quite interesting project. Their use-case was basically to receive an as-high-as-possible number of audio RTP streams over UDP, transcode them, and then send them out via UDP again. Due to how GStreamer usually works, they were running into some performance issues.

This blog post will describe the first set of improvements that were implemented for this use-case, together with a minimal benchmark and the results. My colleague Mathieu will follow up with one or two other blog posts with the other improvements and a more full-featured benchmark.

The short version is that CPU usage decreased by about 65-75%, i.e. allowing 3-4x more streams with the same CPU usage. Also parallelization works better and usage of different CPU cores is more controllable, allowing for better scalability. And a fixed, but configurable number of threads is used, which is independent of the number of streams.

The code for this blog post can be found here.

Table of Contents

  1. GStreamer & Threads
  2. Thread-Sharing GStreamer Elements
  3. Available Elements
  4. Little Benchmark
  5. Conclusion

GStreamer & Threads

In GStreamer, by default each source is running from its own OS thread. Additionally, for receiving/sending RTP, there will be another thread in the RTP jitterbuffer, yet another thread for receiving RTCP (another source) and a last thread for sending RTCP at the right times. And RTCP has to be received and sent for both the receiver and sender side parts of the pipeline, so the number of threads doubles. In sum this gives at least 1 + 1 + (1 + 1) * 2 = 6 threads per RTP stream in this scenario. In a normal audio scenario, there will be one packet received/sent e.g. every 20ms on each stream, and every now and then an RTCP packet. So most of the time all these threads are only waiting.

Apart from the obvious waste of OS resources (1000 streams would be 6000 threads), this also brings down performance, as threads are being woken up all the time, which means that context switches have to happen basically all the time.

To solve this we implemented a mechanism to share threads, and as a result we now have a fixed, but configurable number of threads that is independent of the number of streams. This can run e.g. 500 streams just fine on a single thread with a single core, which was completely impossible before. In addition we also did some work to reduce the number of allocations for each packet, so that after startup no additional per-packet allocations happen for buffers anymore. See Mathieu’s upcoming blog post for details.

In this blog post, I’m going to write about a generic mechanism for sources, queues and similar elements to share their threads between each other. For the RTP related bits (RTP jitterbuffer and RTCP timer) this was not used due to reuse of existing C codebases.

Thread-Sharing GStreamer Elements

The code in question can be found here, a small benchmark is in the examples directory and it is going to be used for the results later. A full-featured benchmark will come in Mathieu’s blog post.

This is a new GStreamer plugin, written in Rust around the Tokio crate, which provides asynchronous IO and generally acts as a “task scheduler”.

While this could certainly also have been written in C around something like libuv, doing this kind of work in Rust is simply more productive and fun due to its safety guarantees and strong type system, which definitely reduced the amount of debugging a lot. In addition, “modern” language features like closures make working with futures much more ergonomic.

When using these elements it is important to have full control over the pipeline and its elements, and the dataflow inside the pipeline has to be carefully considered to properly configure how to share threads. For example the following two restrictions should be kept in mind all the time:

  1. Downstream of such an element, the streaming thread must never ever block for considerable amounts of time. Otherwise all other elements inside the same thread-group would be blocked too, even if they could do work now.
  2. This generally all works better in live pipelines, where media is produced in real-time and not as fast as possible.

Available Elements

So this repository currently contains the generic infrastructure (see the src/iocontext.rs source file) and a couple of elements:

  • a UDP source: ts-udpsrc, a replacement for udpsrc
  • an app source: ts-appsrc, a replacement for appsrc to inject packets into the pipeline from the application
  • a queue: ts-queue, a replacement for queue that is useful for adding buffering to a pipeline part. The upstream side of the queue will block if not called from another thread-sharing element, but if called from another thread-sharing element it will pause the current task asynchronously, i.e. stop the upstream task from producing more data.
  • a proxysink/src element: ts-proxysrc, ts-proxysink, replacements for proxysink/proxysrc for connecting two pipelines with each other. This basically works like the queue, but split into two elements.
  • a tone generator source around spandsp: ts-tonesrc, a replacement for tonegeneratesrc. This also contains some minimal FFI bindings for that part of the spandsp C library.

All these elements have more or less the same API as their non-thread-sharing counterparts.

API-wise, each of these elements has a set of properties for controlling how it shares threads with other elements, and with which elements (a small configuration sketch follows the list):

  • context: A string that defines in which group this element is. All elements with the same context are running on the same thread or group of threads.
  • context-threads: Number of threads to use in this context. -1 means exactly one thread, 1 and above uses N+1 threads (1 thread for polling fds, N worker threads) and 0 sets N to the number of available CPU cores. As long as no considerable work is done in these threads, -1 has shown to be the most efficient. See also this tokio GitHub issue.
  • context-wait: Number of milliseconds that the threads will wait on each iteration. This allows reducing CPU usage even further by handling all events/packets that arrived during that timespan at once, instead of waking up the thread every time a little event happens, thus reducing context switches again.
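
As a sketch of how this could look from application code (assuming the gstreamer Rust crate API of that time, and the property types used by the elements; the context name is arbitrary), an element could be configured like this

// Create a ts-udpsrc and put it into a thread-group named "rtp-recv"
let src = gst::ElementFactory::make("ts-udpsrc", None).unwrap();

// All elements configured with the same context string share their threads
src.set_property("context", &"rtp-recv").unwrap();
// -1: exactly one thread for this context
src.set_property("context-threads", &(-1i32)).unwrap();
// Handle everything that arrived during the last 20ms in one go
src.set_property("context-wait", &20u32).unwrap();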

The elements are all pushing data downstream from a tokio thread whenever data is available, assuming that downstream does not block. If downstream is another thread-sharing element and it would have to block (e.g. a full queue), it instead returns a new future to upstream so that upstream can asynchronously wait on that future before producing more output. By this, back-pressure is implemented between different GStreamer elements without ever blocking any of the tokio threads. All this is implemented around the normal GStreamer data-flow mechanisms; there is no “tokio fast-path” between elements.

Little Benchmark

As mentioned above, there’s a small benchmark application in the examples directory. This basically sets up a configurable number of streams and directly connects them to a fakesink, throwing away all packets. Additionally there is another thread that is sending all these packets. As such, this is really the most basic benchmark and not very realistic but nonetheless it shows the same performance improvement as the real application. Again, see Mathieu’s upcoming blog post for a more realistic and complete benchmark.

When running it, make sure that your user can create enough fds. The benchmark will just abort if not enough fds can be allocated. You can control this with ulimit -n SOME_NUMBER, and allowing a couple of thousand is generally a good idea. The benchmarks below were run with 10000.

After running cargo build --release to build the plugin itself, you can run the benchmark with:

cargo run --release --example udpsrc-benchmark -- 1000 ts-udpsrc -1 1 20

and in another shell the UDP sender with

cargo run --release --example udpsrc-benchmark-sender -- 1000

This runs 1000 streams, uses ts-udpsrc (the alternative would be udpsrc), configures exactly one thread (-1), 1 context, and a wait time of 20ms. See above for what these settings mean. You can check CPU usage with e.g. top. Testing was done on an Intel i7-4790K, with Rust 1.25 and GStreamer 1.14. One packet is sent every 20ms for each stream.

Source     Streams  Threads  Contexts  Wait  CPU
udpsrc     1000     1000     x         x     44%
ts-udpsrc  1000     -1       1         0     18%
ts-udpsrc  1000     -1       1         20    13%
ts-udpsrc  1000     -1       2         20    15%
ts-udpsrc  1000     2        1         20    16%
ts-udpsrc  1000     2        2         20    27%

Source     Streams  Threads  Contexts  Wait  CPU
udpsrc     2000     2000     x         x     95%
ts-udpsrc  2000     -1       1         20    29%
ts-udpsrc  2000     -1       2         20    31%

Source     Streams  Threads  Contexts  Wait  CPU
ts-udpsrc  3000     -1       1         20    36%
ts-udpsrc  3000     -1       2         20    47%

Results for 3000 streams for the old udpsrc are not included as starting up that many threads takes too long.

The best configuration is apparently a single thread per context (see this tokio GitHub issue) and waiting 20ms on every iteration. Compared to the old udpsrc, CPU usage is about one third in that setting, and generally it seems to parallelize well. It’s not clear to me why the last test has 11% more CPU with two contexts, while in every other test the number of contexts does not really make a difference, and also not for that many streams in the real test-case.

The waiting does not reduce CPU usage a lot in this benchmark, but on the real test-case it does. The reason is most likely that this benchmark basically sends all packets at once, then waits for the remaining time, then sends the next packets.

Take these numbers with caution, the real test-case in Mathieu’s blog post will show the improvements in the bigger picture, where it was generally a quarter of CPU usage and almost perfect parallelization when increasing the number of contexts.

Conclusion

Generally this was a fun exercise and we’re quite happy with the results, especially the real results. It took me some time to understand how tokio works internally so that I could implement all kinds of customizations on top of it, but for normal usage of tokio that should not be required, and the overall design makes a lot of sense to me, as does the way futures are implemented in Rust. It requires some learning and understanding of how exactly the API can be used and behaves, but once that point is reached it seems like a very productive and performant solution for asynchronous IO. And modelling asynchronous IO problems based on the Rust-style futures seems a nice and intuitive fit.

The performance measurements also showed that GStreamer’s default usage of threads is not always optimal, and a model like in upipe or pipewire (or rather SPA) can provide better performance. But as this also shows, it is possible to implement something like this on top of GStreamer and for the common case, using threads like in GStreamer reduces the cognitive load on the developer a lot.

For a future version of GStreamer, I don’t think we should make the threading “manual” like in these two other projects, but instead provide some API additions that make it nicer to implement thread-sharing elements and to add ways in the GStreamer core to make streaming threads non-blocking. All this can be implemented already, but it could be nicer.

All this “only” improved the threading situation, i.e. the number of threads and thus the threading and context switching overhead. Many other optimizations in other areas are still possible on top of this, for example optimizing receive performance and reducing the number of memory copies inside the pipeline even further. If that’s something you would be interested in, feel free to get in touch.

And with that: Read Mathieu’s upcoming blog posts about the other parts, RTP jitterbuffer / RTCP timer thread sharing, and no allocations, and the full benchmark.

GStreamer Rust bindings 0.11 / plugin writing infrastructure 0.2 release

Following the GStreamer 1.14 release and the new round of gtk-rs releases, there are also new releases for the GStreamer Rust bindings (0.11) and the plugin writing infrastructure (0.2).

Thanks also to all the contributors for making these releases happen and adding lots of valuable changes and API additions.

GStreamer Rust Bindings

The main changes in the Rust bindings were the update to GStreamer 1.14 (which brings in quite some new API, like GstPromise), a couple of API additions (GstBufferPool specifically) and the addition of the GstRtspServer and GstPbutils crates. The former allows writing a full RTSP server in a couple of lines of code (with lots of potential for customizations), the latter provides access to the GstDiscoverer helper object that allows inspecting files and streams for their container format, codecs, tags and all kinds of other metadata.

The GstPbutils crate will also get other features added in the near future, like encoding profile bindings to allow using the encodebin GStreamer element (a helper element for automatically selecting/configuring encoders and muxers) from Rust.

But the biggest change in my opinion is some refactoring that was done to the Event, Message and Query APIs. Previously you would have to use a view on a newly created query to be able to use the type-specific functions on it

let mut q = gst::Query::new_position(gst::Format::Time);
if pipeline.query(q.get_mut().unwrap()) {
    match q.view() {
        QueryView::Position(ref p) => Some(p.get_result()),
        _ => None,
    }
} else {
    None
}

Now you can directly use the type-specific functions on a newly created query

let mut q = gst::Query::new_position(gst::Format::Time);
if pipeline.query(&mut q) {
    Some(q.get_result())
} else {
    None
}

In addition, the views can now dereference directly to the underlying event/message/query and provide access to its API, which simplifies some code even more.
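
For example, on a message view, the functions of the specific view and of the underlying message can now be used directly next to each other. The following is a sketch against the API of this release; msg is assumed to be a gst::Message, e.g. received from the pipeline’s bus.

match msg.view() {
    gst::MessageView::Error(err) => {
        // get_error() and get_debug() are specific to the Error view,
        // get_src() comes from the MessageRef the view dereferences to
        println!(
            "Error from {:?}: {} ({:?})",
            err.get_src(),
            err.get_error(),
            err.get_debug()
        );
    }
    _ => (),
}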

Plugin Writing Infrastructure

While the plugin writing infrastructure did not see that many changes apart from a couple of bugfixes and updating to the new versions of everything else, this does not mean that development on it stalled. Quite the opposite. The existing code works very well already and there was just no need for adding anything new for the projects I and others did on top of it, most of the required API additions were in the GStreamer bindings.

So the status here is the same as last time, get started writing GStreamer plugins in Rust. It works well!

How to write GStreamer Elements in Rust Part 2: A raw audio sine wave source

A bit later than anticipated, this is now part two of the blog post series about writing GStreamer elements in Rust. Part one can be found here, and I’ll assume that everything written there is known already.

In this part, a raw audio sine wave source element is going to be written. It will be similar to the one Mathieu was writing in his blog post about writing such a GStreamer element in Python. Various details will be different though, but more about that later.

The final code can be found here.

Table of Contents

  1. Boilerplate
  2. Caps Negotiation
  3. Query Handling
  4. Buffer Creation
  5. (Pseudo) Live Mode
  6. Unlocking
  7. Seeking

Boilerplate

The first part here will be all the boilerplate required to set up the element. You can safely skip this if you remember all this from the previous blog post.

Our sine wave element is going to produce raw audio, with any number of channels and any possible sample rate, with both 32 bit and 64 bit floating point samples. It will produce a simple sine wave with a configurable frequency, volume/mute and number of samples per audio buffer. In addition it will be possible to configure the element in (pseudo) live mode, meaning that it will only produce data in real-time according to the pipeline clock. And it will be possible to seek to any time/sample position on our source element. It will basically be a simpler version of the audiotestsrc element from gst-plugins-base.

So let’s get started with all the boilerplate. This time our element will be based on the BaseSrc base class instead of BaseTransform.

use glib;
use gst;
use gst::prelude::*;
use gst_base::prelude::*;
use gst_audio;

use byte_slice_cast::*;

use gst_plugin::properties::*;
use gst_plugin::object::*;
use gst_plugin::element::*;
use gst_plugin::base_src::*;

use std::{i32, u32};
use std::sync::Mutex;
use std::ops::Rem;

use num_traits::float::Float;
use num_traits::cast::NumCast;

// Default values of properties
const DEFAULT_SAMPLES_PER_BUFFER: u32 = 1024;
const DEFAULT_FREQ: u32 = 440;
const DEFAULT_VOLUME: f64 = 0.8;
const DEFAULT_MUTE: bool = false;
const DEFAULT_IS_LIVE: bool = false;

// Property value storage
#[derive(Debug, Clone, Copy)]
struct Settings {
    samples_per_buffer: u32,
    freq: u32,
    volume: f64,
    mute: bool,
    is_live: bool,
}

impl Default for Settings {
    fn default() -> Self {
        Settings {
            samples_per_buffer: DEFAULT_SAMPLES_PER_BUFFER,
            freq: DEFAULT_FREQ,
            volume: DEFAULT_VOLUME,
            mute: DEFAULT_MUTE,
            is_live: DEFAULT_IS_LIVE,
        }
    }
}

// Metadata for the properties
static PROPERTIES: [Property; 5] = [
    Property::UInt(
        "samples-per-buffer",
        "Samples Per Buffer",
        "Number of samples per output buffer",
        (1, u32::MAX),
        DEFAULT_SAMPLES_PER_BUFFER,
        PropertyMutability::ReadWrite,
    ),
    Property::UInt(
        "freq",
        "Frequency",
        "Frequency",
        (1, u32::MAX),
        DEFAULT_FREQ,
        PropertyMutability::ReadWrite,
    ),
    Property::Double(
        "volume",
        "Volume",
        "Output volume",
        (0.0, 10.0),
        DEFAULT_VOLUME,
        PropertyMutability::ReadWrite,
    ),
    Property::Boolean(
        "mute",
        "Mute",
        "Mute",
        DEFAULT_MUTE,
        PropertyMutability::ReadWrite,
    ),
    Property::Boolean(
        "is-live",
        "Is Live",
        "(Pseudo) live output",
        DEFAULT_IS_LIVE,
        PropertyMutability::ReadWrite,
    ),
];

// Stream-specific state, i.e. audio format configuration
// and sample offset
struct State {
    info: Option<gst_audio::AudioInfo>,
    sample_offset: u64,
    sample_stop: Option<u64>,
    accumulator: f64,
}

impl Default for State {
    fn default() -> State {
        State {
            info: None,
            sample_offset: 0,
            sample_stop: None,
            accumulator: 0.0,
        }
    }
}

// Struct containing all the element data
struct SineSrc {
    cat: gst::DebugCategory,
    settings: Mutex<Settings>,
    state: Mutex<State>,
}

impl SineSrc {
    // Called when a new instance is to be created
    fn new(element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
        // Initialize live-ness and notify the base class that
        // we'd like to operate in Time format
        element.set_live(DEFAULT_IS_LIVE);
        element.set_format(gst::Format::Time);

        Box::new(Self {
            cat: gst::DebugCategory::new(
                "rssinesrc",
                gst::DebugColorFlags::empty(),
                "Rust Sine Wave Source",
            ),
            settings: Mutex::new(Default::default()),
            state: Mutex::new(Default::default()),
        })
    }

    // Called exactly once when registering the type. Used for
    // setting up metadata for all instances, e.g. the name and
    // classification and the pad templates with their caps.
    //
    // Actual instances can create pads based on those pad templates
    // with a subset of the caps given here. In case of basesrc,
    // a "src" pad template is required here and the base class
    // will automatically instantiate a pad for it.
    //
    // Our element here can output f32 and f64
    fn class_init(klass: &mut BaseSrcClass) {
        klass.set_metadata(
            "Sine Wave Source",
            "Source/Audio",
            "Creates a sine wave",
            "Sebastian Dröge <sebastian@centricular.com>",
        );

        // On the src pad, we can produce F32/F64 with any sample rate
        // and any number of channels
        let caps = gst::Caps::new_simple(
            "audio/x-raw",
            &[
                (
                    "format",
                    &gst::List::new(&[
                        &gst_audio::AUDIO_FORMAT_F32.to_string(),
                        &gst_audio::AUDIO_FORMAT_F64.to_string(),
                    ]),
                ),
                ("layout", &"interleaved"),
                ("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
                ("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
            ],
        );
        // The src pad template must be named "src" for basesrc
        // and specify a pad that is always there
        let src_pad_template = gst::PadTemplate::new(
            "src",
            gst::PadDirection::Src,
            gst::PadPresence::Always,
            &caps,
        );
        klass.add_pad_template(src_pad_template);

        // Install all our properties
        klass.install_properties(&PROPERTIES);
    }
}

impl ObjectImpl<BaseSrc> for SineSrc {
    // Called whenever a value of a property is changed. It can be called
    // at any time from any thread.
    fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) {
        let prop = &PROPERTIES[id as usize];
        let element = obj.clone().downcast::<BaseSrc>().unwrap();

        match *prop {
            Property::UInt("samples-per-buffer", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let samples_per_buffer = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing samples-per-buffer from {} to {}",
                    settings.samples_per_buffer,
                    samples_per_buffer
                );
                settings.samples_per_buffer = samples_per_buffer;
                drop(settings);

                let _ =
                    element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
            }
            Property::UInt("freq", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let freq = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing freq from {} to {}",
                    settings.freq,
                    freq
                );
                settings.freq = freq;
            }
            Property::Double("volume", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let volume = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing volume from {} to {}",
                    settings.volume,
                    volume
                );
                settings.volume = volume;
            }
            Property::Boolean("mute", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let mute = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing mute from {} to {}",
                    settings.mute,
                    mute
                );
                settings.mute = mute;
            }
            Property::Boolean("is-live", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let is_live = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing is-live from {} to {}",
                    settings.is_live,
                    is_live
                );
                settings.is_live = is_live;
            }
            _ => unimplemented!(),
        }
    }

    // Called whenever a value of a property is read. It can be called
    // at any time from any thread.
    fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
        let prop = &PROPERTIES[id as usize];

        match *prop {
            Property::UInt("samples-per-buffer", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.samples_per_buffer.to_value())
            }
            Property::UInt("freq", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.freq.to_value())
            }
            Property::Double("volume", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.volume.to_value())
            }
            Property::Boolean("mute", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.mute.to_value())
            }
            Property::Boolean("is-live", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.is_live.to_value())
            }
            _ => unimplemented!(),
        }
    }
}

// Virtual methods of gst::Element. We override none
impl ElementImpl<BaseSrc> for SineSrc { }

impl BaseSrcImpl<BaseSrc> for SineSrc {
    // Called when starting, so we can initialize all stream-related state to its defaults
    fn start(&self, element: &BaseSrc) -> bool {
        // Reset state
        *self.state.lock().unwrap() = Default::default();

        gst_info!(self.cat, obj: element, "Started");

        true
    }

    // Called when shutting down the element so we can release all stream-related state
    fn stop(&self, element: &BaseSrc) -> bool {
        // Reset state
        *self.state.lock().unwrap() = Default::default();

        gst_info!(self.cat, obj: element, "Stopped");

        true
    }
}

struct SineSrcStatic;

// The basic trait for registering the type: This returns a name for the type and registers the
// instance and class initializations functions with the type system, thus hooking everything
// together.
impl ImplTypeStatic<BaseSrc> for SineSrcStatic {
    fn get_name(&self) -> &str {
        "SineSrc"
    }

    fn new(&self, element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
        SineSrc::new(element)
    }

    fn class_init(&self, klass: &mut BaseSrcClass) {
        SineSrc::class_init(klass);
    }
}

// Registers the type for our element, and then registers in GStreamer under
// the name "sinesrc" for being able to instantiate it via e.g.
// gst::ElementFactory::make().
pub fn register(plugin: &gst::Plugin) {
    let type_ = register_type(SineSrcStatic);
    gst::Element::register(plugin, "rssinesrc", 0, type_);
}

If any of this needs explanation, please see the previous blog post and the comments in the code. The explanation for all the struct fields and what they’re good for will follow in the next sections.

With all of the above and a small addition to src/lib.rs this should compile now.

mod sinesrc;
[...]

fn plugin_init(plugin: &gst::Plugin) -> bool {
    [...]
    sinesrc::register(plugin);
    true
}

Also a couple of new crates have to be added to Cargo.toml and src/lib.rs, but you best check the code in the repository for details.

Caps Negotiation

The first part that we have to implement, just like last time, is caps negotiation. We already notified the base class about any caps that we can potentially handle via the caps in the pad template in class_init but there are still two more steps of behaviour left that we have to implement.

First of all, we need to get notified whenever the caps that our source is configured for are changing. This will happen once in the very beginning and then whenever the pipeline topology or state changes and new caps would be more optimal for the new situation. This notification happens via the BaseSrc::set_caps virtual method.

    fn set_caps(&self, element: &BaseSrc, caps: &gst::CapsRef) -> bool {
        use std::f64::consts::PI;

        let info = match gst_audio::AudioInfo::from_caps(caps) {
            None => return false,
            Some(info) => info,
        };

        gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);

        element.set_blocksize(info.bpf() * (*self.settings.lock().unwrap()).samples_per_buffer);

        let settings = *self.settings.lock().unwrap();
        let mut state = self.state.lock().unwrap();

        // If we have no caps yet, any old sample_offset and sample_stop will be
        // in nanoseconds
        let old_rate = match state.info {
            Some(ref info) => info.rate() as u64,
            None => gst::SECOND_VAL,
        };

        // Update sample offset and accumulator based on the previous values and the
        // sample rate change, if any
        let old_sample_offset = state.sample_offset;
        let sample_offset = old_sample_offset
            .mul_div_floor(info.rate() as u64, old_rate)
            .unwrap();

        let old_sample_stop = state.sample_stop;
        let sample_stop =
            old_sample_stop.map(|v| v.mul_div_floor(info.rate() as u64, old_rate).unwrap());

        let accumulator =
            (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (info.rate() as f64));

        *state = State {
            info: Some(info),
            sample_offset: sample_offset,
            sample_stop: sample_stop,
            accumulator: accumulator,
        };

        drop(state);

        let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());

        true
    }

In here we parse the caps into an AudioInfo and then store that in our internal state, while updating various fields. We tell the base class about the number of bytes each buffer is usually going to hold, and update our current sample position, the stop sample position (when a seek with stop position happens, we need to know when to stop) and our accumulator. This happens by scaling both positions by the old and new sample rate. If we don’t have an old sample rate, we assume nanoseconds (this will make more sense once seeking is implemented). The scaling is done with the help of the muldiv crate, which implements scaling of integer types by a fraction, with protection against overflows by doing up to 128 bit integer arithmetic for intermediate values.
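
As a small, self-contained sketch of what the muldiv crate does for us here (the numbers are made up for illustration)

use muldiv::MulDiv;

// Scale a sample offset from an old rate to a new rate, rounding down.
// Internally this uses wider intermediate arithmetic to avoid overflow,
// and returns None on overflow of the result or division by zero.
let old_rate = 44_100u64;
let new_rate = 48_000u64;
let old_sample_offset = 1_234_567u64;

let sample_offset = old_sample_offset
    .mul_div_floor(new_rate, old_rate)
    .unwrap();
assert_eq!(sample_offset, 1_343_746);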

The accumulator is then updated based on the current phase of the sine wave at the current sample position.

As a last step we post a new LATENCY message on the bus whenever the sample rate has changed. Our latency (in live mode) is going to be the duration of a single buffer, but more about that later.

BaseSrc is by default already selecting possible caps for us, if there are multiple options. However these defaults might not be (and often are not) ideal and we should override the default behaviour slightly. This is done in the BaseSrc::fixate virtual method.

    fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps {
        // Fixate the caps. BaseSrc will do some fixation for us, but
        // as we allow any rate between 1 and MAX it would fixate to 1. 1Hz
        // is generally not a useful sample rate.
        //
        // We fixate to the closest integer value to 48kHz that is possible
        // here, and for good measure also decide that the closest value to 1
        // channel is good.
        let mut caps = gst::Caps::truncate(caps);
        {
            let caps = caps.make_mut();
            let s = caps.get_mut_structure(0).unwrap();
            s.fixate_field_nearest_int("rate", 48_000);
            s.fixate_field_nearest_int("channels", 1);
        }

        // Let BaseSrc fixate anything else for us. We could alternatively have
        // called Caps::fixate() here
        element.parent_fixate(caps)
    }

Here we take the caps that are passed in, truncate them (i.e. remove all but the very first Structure) and then manually fixate the sample rate to the closest value to 48kHz. By default, caps fixation would result in the lowest possible sample rate but this is usually not desired.

For good measure, we also fixate the number of channels to the closest value to 1, but this would already be the default behaviour anyway. And then chain up to the parent class’ implementation of fixate, which for now basically does the same as Caps::fixate(). After this, the caps are fixated, i.e. there is only a single Structure left and all fields have concrete values (no ranges or sets).

Query Handling

As our source element will work by generating a new audio buffer from a specific offset, and especially works in Time format, we want to notify downstream elements that we don’t want to run in Pull mode, only in Push mode. In addition, we would prefer sequential reading. However we still allow seeking later. For a source that does not know about Time, e.g. a file source, the format would be configured as Bytes. Other values than Time and Bytes generally don’t make any sense.

The main difference here is that otherwise the base class would ask us to produce data for arbitrary Byte offsets, and we would have to produce data for that. While possible in our case, it’s a bit annoying and for other audio sources it’s not easily possible at all.

Downstream elements will try to query this very information from us, so we now have to override the default query handling of BaseSrc and handle the SCHEDULING query differently. Later we will also handle other queries differently.

    fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
        use gst::QueryView;

        match query.view_mut() {
            // We only work in Push mode. In Pull mode, create() could be called with
            // arbitrary offsets and we would have to produce for that specific offset
            QueryView::Scheduling(ref mut q) => {
                q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
                q.add_scheduling_modes(&[gst::PadMode::Push]);
                return true;
            }
            _ => (),
        }
        BaseSrcBase::parent_query(element, query)
    }

To handle the SCHEDULING query specifically, we first have to match on a view of the query (mutable because we want to modify the view) and check the type of the query. If it indeed is a scheduling query, we can set the SEQUENTIAL flag and specify that we handle only Push mode, then return true directly as we handled the query already.

In all other cases we fall back to the parent class’ implementation of the query virtual method.

Buffer Creation

Now we have everything in place for a working element, apart from the virtual method to actually generate the raw audio buffers with the sine wave. From a high level, BaseSrc works by calling the create virtual method over and over again to let the subclass produce a buffer, until it returns an error or signals the end of the stream.

Let’s first talk about how to generate the sine wave samples themselves. As we want to operate on 32 bit and 64 bit floating point numbers, we implement a generic function for generating samples and storing them in a mutable byte slice. This is done with the help of the num_traits crate, which provides all kinds of useful traits for abstracting over numeric types. In our case we only need the Float and NumCast traits.

Instead of writing a generic implementation with those traits, it would also be possible to do the same with a simple macro that generates a function for both types. Which approach is nicer is a matter of taste in the end; the compiler output should be equivalent for both cases.

    fn process<F: Float + FromByteSlice>(
        data: &mut [u8],
        accumulator_ref: &mut f64,
        freq: u32,
        rate: u32,
        channels: u32,
        vol: f64,
    ) {
        use std::f64::consts::PI;

        // Reinterpret our byte-slice as a slice containing elements of the type
        // we're interested in. GStreamer requires for raw audio that the alignment
        // of memory is correct, so this will never ever fail unless there is an
        // actual bug elsewhere.
        let data = data.as_mut_slice_of::<F>().unwrap();

        // Convert all our parameters to the target type for calculations
        let vol: F = NumCast::from(vol).unwrap();
        let freq = freq as f64;
        let rate = rate as f64;
        let two_pi = 2.0 * PI;

        // We're carrying an accumulator with up to 2pi around instead of working
        // on the sample offset. High sample offsets cause too much inaccuracy when
        // converted to floating point numbers and then iterated over in 1-steps
        let mut accumulator = *accumulator_ref;
        let step = two_pi * freq / rate;

        for chunk in data.chunks_mut(channels as usize) {
            let value = vol * F::sin(NumCast::from(accumulator).unwrap());
            for sample in chunk {
                *sample = value;
            }

            accumulator += step;
            if accumulator >= two_pi {
                accumulator -= two_pi;
            }
        }

        *accumulator_ref = accumulator;
    }

This function takes the mutable byte slice from our buffer as argument, as well as the current value of the accumulator and the relevant settings for generating the sine wave.

As a first step, we “cast” the byte slice to a slice of the target type (f32 or f64) with the help of the byte_slice_cast crate. This ensures that alignment and sizes all match, and it returns a mutable slice of our target type if successful. In case of GStreamer, the buffer alignment is guaranteed to be big enough for our types here, and we allocate a buffer of the correct size later.

Now we convert all the parameters to the types we will use later, and store them together with the current accumulator value in local variables. Then we iterate over the whole floating point number slice in chunks with all channels, and fill each channel with the current value of our sine wave.

The sine wave itself is calculated by val = volume * sin(2 * PI * frequency * (i + accumulator) / rate), but we actually calculate it by simply increasing the accumulator by 2 * PI * frequency / rate for every sample instead of doing the multiplication for each sample. We also make sure that the accumulator always stays between 0 and 2 * PI to prevent any inaccuracies from floating point numbers to affect our produced samples.

Now that this is done, we need to implement the BaseSrc::create virtual method for actually allocating the buffer, setting timestamps and other metadata on it, and calling our above function.

    fn create(
        &self,
        element: &BaseSrc,
        _offset: u64,
        _length: u32,
    ) -> Result<gst::Buffer, gst::FlowReturn> {
        // Keep a local copy of the values of all our properties at this very moment. This
        // ensures that the mutex is never locked for long and the application wouldn't
        // have to block until this function returns when getting/setting property values
        let settings = *self.settings.lock().unwrap();

        // Get a locked reference to our state, i.e. the input and output AudioInfo
        let mut state = self.state.lock().unwrap();
        let info = match state.info {
            None => {
                gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
                return Err(gst::FlowReturn::NotNegotiated);
            }
            Some(ref info) => info.clone(),
        };

        // If a stop position is set (from a seek), only produce samples up to that
        // point but at most samples_per_buffer samples per buffer
        let n_samples = if let Some(sample_stop) = state.sample_stop {
            if sample_stop <= state.sample_offset {
                gst_log!(self.cat, obj: element, "At EOS");
                return Err(gst::FlowReturn::Eos);
            }

            sample_stop - state.sample_offset
        } else {
            settings.samples_per_buffer as u64
        };

        // Allocate a new buffer of the required size, update the metadata with the
        // current timestamp and duration and then fill it according to the current
        // caps
        let mut buffer =
            gst::Buffer::with_size((n_samples as usize) * (info.bpf() as usize)).unwrap();
        {
            let buffer = buffer.get_mut().unwrap();

            // Calculate the current timestamp (PTS) and the next one,
            // and calculate the duration from the difference instead of
            // simply the number of samples to prevent rounding errors
            let pts = state
                .sample_offset
                .mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
                .unwrap()
                .into();
            let next_pts: gst::ClockTime = (state.sample_offset + n_samples)
                .mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
                .unwrap()
                .into();
            buffer.set_pts(pts);
            buffer.set_duration(next_pts - pts);

            // Map the buffer writable and create the actual samples
            let mut map = buffer.map_writable().unwrap();
            let data = map.as_mut_slice();

            if info.format() == gst_audio::AUDIO_FORMAT_F32 {
                Self::process::<f32>(
                    data,
                    &mut state.accumulator,
                    settings.freq,
                    info.rate(),
                    info.channels(),
                    settings.volume,
                );
            } else {
                Self::process::<f64>(
                    data,
                    &mut state.accumulator,
                    settings.freq,
                    info.rate(),
                    info.channels(),
                    settings.volume,
                );
            }
        }
        state.sample_offset += n_samples;
        drop(state);

        gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);

        Ok(buffer)
    }

Just like last time, we start with creating a copy of our properties (settings) and keeping a mutex guard of the internal state around. If the internal state has no AudioInfo yet, we error out. This would mean that no caps were negotiated yet, which is something we can’t handle and is not really possible in our case.

Next we calculate how many samples we have to generate. If a sample stop position was set by a seek event, we have to generate samples up to at most that point. Otherwise we create at most the number of samples per buffer that were set via the property. Then we allocate a buffer of the corresponding size, with the help of the bpf field of the AudioInfo, and then set its metadata and fill the samples.

The metadata that is set is the timestamp (PTS) and the duration. The duration is calculated from the difference of the following buffer’s timestamp and the current buffer’s. By this we ensure that rounding errors do not cause the next buffer’s timestamp to differ from the sum of the current buffer’s timestamp and its duration. While this would not be much of a problem in GStreamer (inaccurate and jittery timestamps are handled just fine), we can prevent it here and do so.

Afterwards we call our previously defined function on the writably mapped buffer and fill it with the sample values.

With all this, the element should already work just fine in any GStreamer-based application, for example gst-launch-1.0. Don’t forget to set the GST_PLUGIN_PATH environment variable correctly like last time. Before running this, make sure to turn down the volume of your speakers/headphones a bit.

export GST_PLUGIN_PATH=`pwd`/target/debug
gst-launch-1.0 rssinesrc freq=440 volume=0.9 ! audioconvert ! autoaudiosink

You should hear a 440Hz sine wave now.

(Pseudo) Live Mode

Many audio (and video) sources can only produce data in real time, according to some clock. So far our source element produces data as fast as downstream can consume it, but we can optionally change that. We simulate a live source here by waiting on the pipeline clock; a real live source would simply only have the data available in real time, without any need to wait on a clock. Usually that data is also produced according to a different clock than the pipeline clock, in which case translation between the two clocks is needed, but we ignore this aspect for now. For details check the GStreamer documentation.

For working in live mode, we have to add a few different parts in various places. First of all, we implement waiting on the clock in the create function.

    fn create(...
        [...]
        state.sample_offset += n_samples;
        drop(state);

        // If we're live, we are waiting until the time of the last sample in our buffer has
        // arrived. This is the very reason why we have to report that much latency.
        // A real live-source would of course only allow us to have the data available after
        // that latency, e.g. when capturing from a microphone, and no waiting from our side
        // would be necessary.
        //
        // Waiting happens based on the pipeline clock, which means that a real live source
        // with its own clock would require various translations between the two clocks.
        // This is out of scope for the tutorial though.
        if element.is_live() {
            let clock = match element.get_clock() {
                None => return Ok(buffer),
                Some(clock) => clock,
            };

            let segment = element
                .get_segment()
                .downcast::<gst::format::Time>()
                .unwrap();
            let base_time = element.get_base_time();
            let running_time = segment.to_running_time(buffer.get_pts() + buffer.get_duration());

            // The last sample's clock time is the base time of the element plus the
            // running time of the last sample
            let wait_until = running_time + base_time;
            if wait_until.is_none() {
                return Ok(buffer);
            }

            let id = clock.new_single_shot_id(wait_until).unwrap();

            gst_log!(
                self.cat,
                obj: element,
                "Waiting until {}, now {}",
                wait_until,
                clock.get_time()
            );
            let (res, jitter) = id.wait();
            gst_log!(
                self.cat,
                obj: element,
                "Waited res {:?} jitter {}",
                res,
                jitter
            );
        }

        gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);

        Ok(buffer)
    }

To be able to wait on the clock, we first need to calculate the clock time until which we want to wait. In our case that is the clock time right after the end of the last sample in the buffer we just produced, simply because you can’t capture a sample before it was produced.

We calculate the running time from the PTS and duration of the buffer with the help of the currently configured segment, and then add the base time of the element on top to get the resulting clock time. Please check the GStreamer documentation for details, but in short the running time of a pipeline is the time since the start of the pipeline (or the last reset of the running time), and the running time of a buffer can be calculated from its PTS and the segment, which provides the information to translate between the two. The base time is the clock time when the pipeline went to the Playing state, so it is just an offset.

Next we wait and then return the buffer as before.

Now we also have to tell the base class that we’re running in live mode now. This is done by calling set_live(true) on the base class before changing the element state from Ready to Paused. For this we override the Element::change_state virtual method.

impl ElementImpl<BaseSrc> for SineSrc {
    fn change_state(
        &self,
        element: &BaseSrc,
        transition: gst::StateChange,
    ) -> gst::StateChangeReturn {
        // Configure live'ness once here just before starting the source
        match transition {
            gst::StateChange::ReadyToPaused => {
                element.set_live(self.settings.lock().unwrap().is_live);
            }
            _ => (),
        }

        element.parent_change_state(transition)
    }
}

As a last step, we also need to notify downstream elements about our latency. Live elements always have to report their latency so that synchronization can work correctly. As the clock time of each buffer is equal to the time when it was created, all buffers would otherwise arrive late in the sinks (they would appear as if they should have been played back already at the time when they were created). So all the sinks have to compensate for the latency it took from capturing to the sink, and they have to do that in a coordinated way (otherwise audio and video would be out of sync if they have different latencies). For this the pipeline queries each sink for the latency on its own branch, and then configures a global latency on all sinks accordingly.

This querying is done with the LATENCY query, which we will now also have to handle.

    fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
        use gst::QueryView;

        match query.view_mut() {
            // We only work in Push mode. In Pull mode, create() could be called with
            // arbitrary offsets and we would have to produce for that specific offset
            QueryView::Scheduling(ref mut q) => {
                [...]
            }
            // In Live mode we will have a latency equal to the number of samples in each buffer.
            // We can't output samples before they were produced, and the last sample of a buffer
            // is produced that much after the beginning, leading to this latency calculation
            QueryView::Latency(ref mut q) => {
                let settings = *self.settings.lock().unwrap();
                let state = self.state.lock().unwrap();

                if let Some(ref info) = state.info {
                    let latency = gst::SECOND
                        .mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64)
                        .unwrap();
                    gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
                    q.set(settings.is_live, latency, gst::CLOCK_TIME_NONE);
                    return true;
                } else {
                    return false;
                }
            }
            _ => (),
        }
        BaseSrcBase::parent_query(element, query)
    }

The latency that we report is the duration of a single audio buffer, because we’re simulating a real live source here. A real live source won’t be able to output the buffer before the last sample of it is captured, and the difference between when the first and last sample were captured is exactly the latency that we add here. Other elements further downstream that introduce further latency would then add their own latency on top of this.

Inside the latency query we also signal that we are indeed a live source, and additionally how much buffering we can do (in our case, infinite) until data would be lost. The last part is important if e.g. the video branch has a higher latency, causing the audio sink to have to wait some additional time (so that audio and video stay in sync), which would then require the whole audio branch to buffer some data. As we have an artificial live source, we can always generate data for the next time, but a real live source only has a limited buffer, and once that runs full without data being read and forwarded, data gets lost.

You can test this again with e.g. gst-launch-1.0 by setting the is-live property to true. The output should now state that the pipeline is live.
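
For example (the same pipeline as before, only with the new property set):

gst-launch-1.0 rssinesrc is-live=true freq=440 volume=0.9 ! audioconvert ! autoaudiosink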

In Mathieu’s blog post this was implemented without explicit waiting, by using the get_times virtual method instead. But as that approach is only really useful for pseudo live sources like this one, I decided to explain how waiting on the clock can be achieved correctly, and even more importantly how that relates to the next section.

Unlocking

With the addition of the live mode, the create function is now blocking and waiting on the clock for some time. This is suboptimal: for example, a (flushing) seek would now have to wait until the clock waiting is done, and the same applies when shutting down the application.

To prevent this, all waiting/blocking in GStreamer streaming threads should be interruptible/cancellable when requested. For example, the ClockID that we got from the clock for waiting can be cancelled by calling unschedule() on it. We only have to do that from the right place and keep the ClockID accessible. The right place is the BaseSrc::unlock virtual method.

struct ClockWait {
    clock_id: Option<gst::ClockId>,
    flushing: bool,
}

struct SineSrc {
    cat: gst::DebugCategory,
    settings: Mutex<Settings>,
    state: Mutex<State>,
    clock_wait: Mutex<ClockWait>,
}

[...]

    fn unlock(&self, element: &BaseSrc) -> bool {
        // This should unblock the create() function ASAP, so we
        // just unschedule the clock id here, if any.
        gst_debug!(self.cat, obj: element, "Unlocking");
        let mut clock_wait = self.clock_wait.lock().unwrap();
        if let Some(clock_id) = clock_wait.clock_id.take() {
            clock_id.unschedule();
        }
        clock_wait.flushing = true;

        true
    }

We store the clock ID in our struct, together with a boolean to signal whether we’re supposed to flush already or not. Then inside unlock we unschedule the clock ID, if any, and set the boolean flag to true.

Once everything is unlocked, we need to reset things again so that data flow can happen in the future. This is done in the unlock_stop virtual method.

    fn unlock_stop(&self, element: &BaseSrc) -> bool {
        // This signals that unlocking is done, so we can reset
        // all values again.
        gst_debug!(self.cat, obj: element, "Unlock stop");
        let mut clock_wait = self.clock_wait.lock().unwrap();
        clock_wait.flushing = false;

        true
    }

To make sure that this struct is always initialized correctly, we also call unlock from stop, and unlock_stop from start.
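
A sketch of what that can look like (the start/stop signatures are assumed to match the other BaseSrc virtual methods used in this tutorial, and only the unlock-related parts are shown):

    fn start(&self, element: &BaseSrc) -> bool {
        // Reset the flushing flag so that create() is allowed to wait again
        self.unlock_stop(element);

        gst_info!(self.cat, obj: element, "Started");
        true
    }

    fn stop(&self, element: &BaseSrc) -> bool {
        // Cancel any pending clock wait and mark ourselves as flushing
        self.unlock(element);

        gst_info!(self.cat, obj: element, "Stopped");
        true
    }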

Now as a last step, we need to actually make use of the new struct we added around the code where we wait for the clock.

            // Store the clock ID in our struct unless we're flushing anyway.
            // This allows us to asynchronously cancel the waiting from unlock()
            // so that we immediately stop waiting on e.g. shutdown.
            let mut clock_wait = self.clock_wait.lock().unwrap();
            if clock_wait.flushing {
                gst_debug!(self.cat, obj: element, "Flushing");
                return Err(gst::FlowReturn::Flushing);
            }

            let id = clock.new_single_shot_id(wait_until).unwrap();
            clock_wait.clock_id = Some(id.clone());
            drop(clock_wait);

            gst_log!(
                self.cat,
                obj: element,
                "Waiting until {}, now {}",
                wait_until,
                clock.get_time()
            );
            let (res, jitter) = id.wait();
            gst_log!(
                self.cat,
                obj: element,
                "Waited res {:?} jitter {}",
                res,
                jitter
            );
            self.clock_wait.lock().unwrap().clock_id.take();

            // If the clock ID was unscheduled, unlock() was called
            // and we should return Flushing immediately.
            if res == gst::ClockReturn::Unscheduled {
                gst_debug!(self.cat, obj: element, "Flushing");
                return Err(gst::FlowReturn::Flushing);
            }

The important part in this code is that we first have to check if we are already supposed to unlock, before even starting to wait. Otherwise we would start waiting without anybody ever being able to unlock. Then we need to store the clock ID in the struct and make sure to drop the mutex guard so that the unlock function can take it again for unscheduling the clock ID. And once waiting is done, we need to remove the clock ID from the struct again, and in case of ClockReturn::Unscheduled we directly return FlowReturn::Flushing instead of continuing with the buffer.

Similarly, when using other blocking APIs it is important that they are woken up in a similar way when unlock is called. Otherwise the experience for application developers, and in turn for users, will be far from ideal.

Seeking

As a last feature we implement seeking on our source element. In our case that only means that we have to update the sample_offset and sample_stop fields accordingly, other sources might have to do more work than that.

Seeking is implemented in the BaseSrc::do_seek virtual method, and signalling whether we can actually seek in the is_seekable virtual method.

    fn is_seekable(&self, _element: &BaseSrc) -> bool {
        true
    }

    fn do_seek(&self, element: &BaseSrc, segment: &mut gst::Segment) -> bool {
        // Handle seeking here. For Time and Default (sample offset) seeks we can
        // do something and have to update our sample offset and accumulator accordingly.
        //
        // Also we should remember the stop time (so we can stop at that point), and if
        // reverse playback is requested. These values will all be used during buffer creation
        // and for calculating the timestamps, etc.

        if segment.get_rate() < 0.0 {
            gst_error!(self.cat, obj: element, "Reverse playback not supported");
            return false;
        }

        let settings = *self.settings.lock().unwrap();
        let mut state = self.state.lock().unwrap();

        // We store sample_offset and sample_stop in nanoseconds if we
        // don't know any sample rate yet. It will be converted correctly
        // once a sample rate is known.
        let rate = match state.info {
            None => gst::SECOND_VAL,
            Some(ref info) => info.rate() as u64,
        };

        if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
            use std::f64::consts::PI;

            let sample_offset = segment
                .get_start()
                .unwrap()
                .mul_div_floor(rate, gst::SECOND_VAL)
                .unwrap();

            let sample_stop = segment
                .get_stop()
                .map(|v| v.mul_div_floor(rate, gst::SECOND_VAL).unwrap());

            let accumulator =
                (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (rate as f64));

            gst_debug!(
                self.cat,
                obj: element,
                "Seeked to {}-{:?} (accum: {}) for segment {:?}",
                sample_offset,
                sample_stop,
                accumulator,
                segment
            );

            *state = State {
                info: state.info.clone(),
                sample_offset: sample_offset,
                sample_stop: sample_stop,
                accumulator: accumulator,
            };

            true
        } else if let Some(segment) = segment.downcast_ref::<gst::format::Default>() {
            use std::f64::consts::PI;

            if state.info.is_none() {
                gst_error!(
                    self.cat,
                    obj: element,
                    "Can only seek in Default format if sample rate is known"
                );
                return false;
            }

            let sample_offset = segment.get_start().unwrap();
            let sample_stop = segment.get_stop().0;

            let accumulator =
                (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (rate as f64));

            gst_debug!(
                self.cat,
                obj: element,
                "Seeked to {}-{:?} (accum: {}) for segment {:?}",
                sample_offset,
                sample_stop,
                accumulator,
                segment
            );

            *state = State {
                info: state.info.clone(),
                sample_offset: sample_offset,
                sample_stop: sample_stop,
                accumulator: accumulator,
            };

            true
        } else {
            gst_error!(
                self.cat,
                obj: element,
                "Can't seek in format {:?}",
                segment.get_format()
            );

            false
        }
    }

Currently no support for reverse playback is implemented here; that is left as an exercise for the reader. So as a first step we check if the segment has a negative rate, in which case we just fail and return false.

Afterwards we again take a copy of the settings, keep a mutable mutex guard of our state and then start handling the actual seek.

If no caps are known yet, i.e. the AudioInfo is None, we assume a rate of 1 billion. That is, we just store the time in nanoseconds for now and let the set_caps function take care of that (which we already implemented accordingly) once the sample rate is known.

Then, if a Time seek is performed, we convert the segment start and stop position from time to sample offsets and save them. And then update the accumulator in a similar way as in the set_caps function. If a seek is in Default format (i.e. sample offsets for raw audio), we just have to store the values and update the accumulator but only do so if the sample rate is known already. A sample offset seek does not make any sense until the sample rate is known, so we just fail here to prevent unexpected surprises later.
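
As a reminder, that conversion in set_caps looks roughly like the following (a sketch only, with new_rate standing in for the rate of the newly negotiated AudioInfo):

    // If the offsets were stored in nanoseconds (no previous sample rate
    // known), old_rate is gst::SECOND_VAL and this converts them to actual
    // sample offsets now
    let old_rate = match state.info {
        None => gst::SECOND_VAL,
        Some(ref info) => info.rate() as u64,
    };

    state.sample_offset = state
        .sample_offset
        .mul_div_floor(new_rate, old_rate)
        .unwrap();
    state.sample_stop = state
        .sample_stop
        .map(|v| v.mul_div_floor(new_rate, old_rate).unwrap());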

Try the following pipeline for testing seeking. You should see the current time drawn over the video, and with the left/right cursor keys you can seek. This also shows that we create a quite nice sine wave.

gst-launch-1.0 rssinesrc ! audioconvert ! monoscope ! timeoverlay ! navseek ! glimagesink

And with that all features are implemented in our sine wave raw audio source.

Speeding up RGB to grayscale conversion in Rust by a factor of 2.2 – and various other multimedia related processing loops

In the previous blog post I wrote about how to write an RGB to grayscale conversion filter for GStreamer in Rust. In this blog post I’m going to write about how to optimize the processing loop of that filter without resorting to unsafe code or SIMD instructions, staying with plain, safe Rust code.

I also tried to implement the processing loop with faster, a Rust crate for writing safe SIMD code. It looks very promising, but unless I missed something in the documentation it currently is missing some features to be able to express this specific algorithm in a meaningful way. Once it works on stable Rust (waiting for SIMD to be stabilized) and includes runtime CPU feature detection, this could very well be a good replacement for the ORC library used for the same purpose in GStreamer in various places. ORC works by JIT-compiling a minimal “array operation language” to SIMD assembly for your specific CPU (and has support for x86 MMX/SSE, PPC Altivec, ARM NEON, etc.).

If someone wants to prove me wrong and implement this with faster, feel free to do so and I’ll link to your solution and include it in the benchmark results below.

All code below can be found in this GIT repository.

Table of Contents

  1. Baseline Implementation
  2. First Optimization – Assertions
  3. First Optimization – Assertions Try 2
  4. Second Optimization – Iterate a bit more
  5. Third Optimization – Getting rid of the bounds check finally
  6. Summary
  7. Addendum: slice::split_at
  8. Addendum 2: SIMD with faster

Baseline Implementation

This is what the baseline implementation looks like.

pub fn bgrx_to_gray_chunks_no_asserts(
    in_data: &[u8],
    out_data: &mut [u8],
    in_stride: usize,
    out_stride: usize,
    width: usize,
) {
    let in_line_bytes = width * 4;
    let out_line_bytes = width * 4;

    for (in_line, out_line) in in_data
        .chunks(in_stride)
        .zip(out_data.chunks_mut(out_stride))
    {
        for (in_p, out_p) in in_line[..in_line_bytes]
            .chunks(4)
            .zip(out_line[..out_line_bytes].chunks_mut(4))
        {
            let b = u32::from(in_p[0]);
            let g = u32::from(in_p[1]);
            let r = u32::from(in_p[2]);
            let x = u32::from(in_p[3]);

            let grey = ((r * RGB_Y[0]) + (g * RGB_Y[1]) + (b * RGB_Y[2]) + (x * RGB_Y[3])) / 65536;
            let grey = grey as u8;
            out_p[0] = grey;
            out_p[1] = grey;
            out_p[2] = grey;
            out_p[3] = grey;
        }
    }
}

This basically iterates over each line of the input and output frame (outer loop), and then for each BGRx chunk of 4 bytes in each line it converts the values to u32, multiplies with a constant array, converts back to u8 and stores the same value in the whole output BGRx chunk.

Note: This is only doing the actual conversion from linear RGB to grayscale (and in BT.601 colorspace). To do this conversion correctly you need to know your colorspaces and use the correct coefficients for conversion, and also do gamma correction. See this about why it is important.
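
For reference, the RGB_Y coefficient array used above is not repeated in this post. Based on the constants visible in the generated assembly below (and in the SIMD version at the end), it contains the BT.601 luma weights scaled by 65536, with a zero weight for the unused x byte:

const RGB_Y: [u32; 4] = [19595, 38470, 7471, 0];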

So what can be improved on this? For starters, let’s write a small benchmark for this so that we know whether any of our changes actually improve something. This is using the (unfortunately still) unstable benchmark feature of Cargo.

#![feature(test)]
#![feature(exact_chunks)]

extern crate test;

pub fn bgrx_to_gray_chunks_no_asserts(...)
    [...]
}

#[cfg(test)]
mod tests {
    use super::*;
    use test::Bencher;
    use std::iter;

    fn create_vec(w: usize, h: usize) -> Vec<u8> {
        iter::repeat(0).take(w * h * 4).collect::<_>()
    }

    #[bench]
    fn bench_chunks_1920x1080_no_asserts(b: &mut Bencher) {
        let i = test::black_box(create_vec(1920, 1080));
        let mut o = test::black_box(create_vec(1920, 1080));

        b.iter(|| bgrx_to_gray_chunks_no_asserts(&i, &mut o, 1920 * 4, 1920 * 4, 1920));
    }
}

This can be run with cargo bench and then prints the number of nanoseconds each iteration of the closure was taking. To only really measure the processing itself, allocations and initializations of the input/output frame are happening outside of the closure. We’re not interested in times for that.

First Optimization – Assertions

To actually start optimizing this function, let’s take a look at the assembly that the compiler is outputting. The easiest way of doing that is via the Godbolt Compiler Explorer website. Select “rustc nightly” and use “-C opt-level=3” for the compiler flags, and then copy & paste your code in there. Once it compiles, to find the assembly that corresponds to a line, simply right-click on the line and “Scroll to assembly”.

Alternatively you can use cargo rustc --release -- -C opt-level=3 --emit asm and check the assembly file that is output in the target/release/deps directory.

What we see then for our inner loop is something like the following

.LBB4_19:
  cmp r15, r11
  mov r13, r11
  cmova r13, r15
  mov rdx, r8
  sub rdx, r13
  je .LBB4_34
  cmp rdx, 3
  jb .LBB4_35
  inc r9
  movzx edx, byte ptr [rbx - 1]
  movzx ecx, byte ptr [rbx - 2]
  movzx esi, byte ptr [rbx]
  imul esi, esi, 19595
  imul edx, edx, 38470
  imul ecx, ecx, 7471
  add ecx, edx
  add ecx, esi
  shr ecx, 16
  mov byte ptr [r10 - 3], cl
  mov byte ptr [r10 - 2], cl
  mov byte ptr [r10 - 1], cl
  mov byte ptr [r10], cl
  add r10, 4
  add r8, -4
  add r15, -4
  add rbx, 4
  cmp r9, r14
  jb .LBB4_19

This is already quite optimized. For each loop iteration the first few instructions are doing some bounds checking and if they fail jump to the .LBB4_34 or .LBB4_35 labels. How to understand that this is bounds checking? Scroll down in the assembly to where these labels are defined and you’ll see something like the following

.LBB4_34:
  lea rdi, [rip + .Lpanic_bounds_check_loc.D]
  xor esi, esi
  xor edx, edx
  call core::panicking::panic_bounds_check@PLT
  ud2
.LBB4_35:
  cmp r15, r11
  cmova r11, r15
  sub r8, r11
  lea rdi, [rip + .Lpanic_bounds_check_loc.F]
  mov esi, 2
  mov rdx, r8
  call core::panicking::panic_bounds_check@PLT
  ud2

Also if you check (with the colors, or the “scroll to source” feature) which Rust code these correspond to, you’ll see that it’s the first and third access to the 4-byte slice that contains our BGRx values.

Afterwards in the assembly, the following steps are happening: 0) incrementing of the “loop counter” representing the number of iterations we’re going to do (r9), 1) actual reading of the B, G and R value and conversion to u32 (the 3 movzx, note that the reading of the x value is optimized away as the compiler sees that it is always multiplied by 0 later), 2) the multiplications with the array elements (the 3 imul), 3) combining of the results and division (i.e. shift) (the 2 add and the shr), 4) storing of the result in the output (the 4 mov). Afterwards the slice pointers are increased by 4 (rbx and r10) and the lengths (used for bounds checking) are decreased by 4 (r8 and r15). Finally there’s a check (cmp) to see if r9 (our loop) counter is at the end of the slice, and if not we jump back to the beginning and operate on the next BGRx chunk.

Generally what we want to do for optimizations is to get rid of unnecessary checks (bounds checking), memory accesses, conditions (cmp, cmov) and jumps (the instructions starting with j). These are all things that are slowing down our code.

So the first thing that seems useful to optimize here is the bounds checking at the beginning. It definitely seems unnecessary to do two checks instead of one for the two slices (the checks are for both slices at once, but Godbolt does not detect that and believes it’s only the input slice). And ideally we could teach the compiler that no bounds checking is needed at all.

As I wrote in the previous blog post, often this knowledge can be given to the compiler by inserting assertions.

To prevent two checks and just have a single check, you can insert an assert_eq!(in_p.len(), 4) at the beginning of the inner loop, and the same for the output slice. Now we only have a single bounds check left per iteration.
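
In code, the inner loop then starts like this (everything else stays as in the baseline version):

        for (in_p, out_p) in in_line[..in_line_bytes]
            .chunks(4)
            .zip(out_line[..out_line_bytes].chunks_mut(4))
        {
            // Assert (and thereby tell the compiler) that each chunk is
            // exactly 4 bytes large, leaving a single bounds check per iteration
            assert_eq!(in_p.len(), 4);
            assert_eq!(out_p.len(), 4);

            // ... grayscale conversion as in the baseline version ...
        }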

As a next step we might want to try to move this knowledge outside the inner loop so that there is no bounds checking at all in there anymore. We might want to add assertions like the following outside the outer loop then to give all knowledge we have to the compiler

assert_eq!(in_data.len() % 4, 0);
assert_eq!(out_data.len() % 4, 0);
assert_eq!(out_data.len() / out_stride, in_data.len() / in_stride);

assert!(in_line_bytes <= in_stride);
assert!(out_line_bytes <= out_stride);

Unfortunately adding those has no effect at all on the inner loop, but having them outside the outer loop for good measure is not the worst idea so let’s just keep them. At least it can be used as some kind of documentation of the invariants of this code for future readers.

So let’s benchmark these two implementations now. The results on my machine are the following

test tests::bench_chunks_1920x1080_no_asserts ... bench:   4,420,145 ns/iter (+/- 139,051)
test tests::bench_chunks_1920x1080_asserts    ... bench:   4,897,046 ns/iter (+/- 166,555)

This is surprising: the version with the assertions is actually slower by a factor of ~1.1, even though it should have fewer conditions. So let’s take a closer look at the assembly at the top of the loop again, where the bounds checking happens, in the version with assertions

.LBB4_19:
  cmp rbx, r11
  mov r9, r11
  cmova r9, rbx
  mov r14, r12
  sub r14, r9
  lea rax, [r14 - 1]
  mov qword ptr [rbp - 120], rax
  mov qword ptr [rbp - 128], r13
  mov qword ptr [rbp - 136], r10
  cmp r14, 5
  jne .LBB4_33
  inc rcx
  [...]

While this indeed has only one jump as expected for the bounds checking, the number of comparisons is the same, and even worse: 3 memory writes to the stack are happening right before the jump. If we follow the .LBB4_33 label we will see that the assert_eq! macro is going to do something with core::fmt::Debug. This is setting up the information needed for printing the assertion failure, the “expected X equals to Y” output. This is certainly not good and the reason why everything is slower now.

First Optimization – Assertions Try 2

All the additional instructions and memory writes were happening because the assert_eq! macro is outputting something user friendly that actually contains the values of both sides. Let’s try again with the assert! macro instead
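
The only change compared to the previous attempt are these two lines in the inner loop:

            // assert! only has to reference the stringified condition in its
            // failure path, instead of formatting both values via Debug
            assert!(in_p.len() == 4);
            assert!(out_p.len() == 4);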

test tests::bench_chunks_1920x1080_no_asserts ... bench:   4,420,145 ns/iter (+/- 139,051)
test tests::bench_chunks_1920x1080_asserts    ... bench:   4,897,046 ns/iter (+/- 166,555)
test tests::bench_chunks_1920x1080_asserts_2  ... bench:   3,968,976 ns/iter (+/- 97,084)

This already looks more promising. Compared to our baseline version this gives us a speedup of a factor of 1.12, and compared to the version with assert_eq! 1.23. If we look at the assembly for the bounds checks (everything else stays the same), it also looks more like what we would’ve expected

.LBB4_19:
  cmp rbx, r12
  mov r13, r12
  cmova r13, rbx
  add r13, r14
  jne .LBB4_33
  inc r9
  [...]

One cmp less, only one jump left. And no memory writes anymore!

So keep in mind that assert_eq! is more user-friendly but quite a bit more expensive even in the “good case” compared to assert!.

Second Optimization – Iterate a bit more

This is still not very satisfying though. No bounds checking should be needed at all as each chunk is going to be exactly 4 bytes. We’re just not able to convince the compiler that this is the case. While it may be possible (let me know if you find a way!), let’s try something different. The zip iterator is done when the shorter of the two iterators is done, and there are optimizations implemented specifically for zipped slice iterators. Let’s try that and replace the grayscale value calculation with

let grey = in_p.iter()
    .zip(RGB_Y.iter())
    .map(|(i, c)| u32::from(*i) * c)
    .sum::<u32>() / 65536;

If we run that through our benchmark after removing the assert!(in_p.len() == 4) (and the same for the output slice), these are the results

test tests::bench_chunks_1920x1080_asserts_2  ... bench:   3,968,976 ns/iter (+/- 97,084)
test tests::bench_chunks_1920x1080_iter_sum   ... bench:  11,393,600 ns/iter (+/- 347,958)

We’re actually 2.9 times slower! Even when adding back the assert!(in_p.len() == 4) assertion (and the same for the output slice) we’re still slower

test tests::bench_chunks_1920x1080_asserts_2  ... bench:   3,968,976 ns/iter (+/- 97,084)
test tests::bench_chunks_1920x1080_iter_sum   ... bench:  11,393,600 ns/iter (+/- 347,958)
test tests::bench_chunks_1920x1080_iter_sum_2 ... bench:  10,420,442 ns/iter (+/- 242,379)

If we look at the assembly of the assertion-less variant, it’s a complete mess now

.LBB0_19:
  cmp rbx, r13
  mov rcx, r13
  cmova rcx, rbx
  mov rdx, r8
  sub rdx, rcx
  cmp rdx, 4
  mov r11d, 4
  cmovb r11, rdx
  test r11, r11
  je .LBB0_20
  movzx ecx, byte ptr [r15 - 2]
  imul ecx, ecx, 19595
  cmp r11, 1
  jbe .LBB0_22
  movzx esi, byte ptr [r15 - 1]
  imul esi, esi, 38470
  add esi, ecx
  movzx ecx, byte ptr [r15]
  imul ecx, ecx, 7471
  add ecx, esi
  test rdx, rdx
  jne .LBB0_23
  jmp .LBB0_35
.LBB0_20:
  xor ecx, ecx
.LBB0_22:
  test rdx, rdx
  je .LBB0_35
.LBB0_23:
  shr ecx, 16
  mov byte ptr [r10 - 3], cl
  mov byte ptr [r10 - 2], cl
  cmp rdx, 3
  jb .LBB0_36
  inc r9
  mov byte ptr [r10 - 1], cl
  mov byte ptr [r10], cl
  add r10, 4
  add r8, -4
  add rbx, -4
  add r15, 4
  cmp r9, r14
  jb .LBB0_19

In short, there are now various new conditions and jumps for short-circuiting the zip iterator in the various cases. And because of all the noise added, the compiler was not even able to optimize the bounds check for the output slice away anymore (.LBB0_35 cases). While it was able to unroll the iterator (note that the 3 imul multiplications are not interleaved with jumps and are actually 3 multiplications instead of yet another loop), which is quite impressive, it couldn’t do anything meaningful with that information it somehow got (it must’ve understood that each chunk has 4 bytes!). This looks like something going wrong somewhere in the optimizer to me.

If we take a look at the variant with the assertions, things look much better

.LBB3_19:
  cmp r11, r12
  mov r13, r12
  cmova r13, r11
  add r13, r14
  jne .LBB3_33
  inc r9
  movzx ecx, byte ptr [rdx - 2]
  imul r13d, ecx, 19595
  movzx ecx, byte ptr [rdx - 1]
  imul ecx, ecx, 38470
  add ecx, r13d
  movzx ebx, byte ptr [rdx]
  imul ebx, ebx, 7471
  add ebx, ecx
  shr ebx, 16
  mov byte ptr [r10 - 3], bl
  mov byte ptr [r10 - 2], bl
  mov byte ptr [r10 - 1], bl
  mov byte ptr [r10], bl
  add r10, 4
  add r11, -4
  add r14, 4
  add rdx, 4
  cmp r9, r15
  jb .LBB3_19

This is literally the same as the assertion version we had before, except that the reading of the input slice, the multiplications and the additions happen in iterator order instead of being batched together. It’s quite impressive that the compiler was able to completely optimize away the zip iterator here, but unfortunately it’s still considerably slower than the original version. The reason must be the different instruction ordering: the previous version had all memory reads batched and then all operations batched, which is apparently much better for the internal pipelining of the CPU (it can already execute the next instructions that have no dependencies on the previous ones while waiting for the pending instructions to finish).

It’s also not clear to me why the LLVM optimizer is not able to schedule the instructions the same way here. It apparently has all information it needs for that if no iterator is involved, and both versions are leading to exactly the same assembly except for the order of instructions. This also seems like something fishy.

Nonetheless, we still have our manual bounds check (the assertion) left here and we should really try to get rid of that. No progress so far.

Third Optimization – Getting rid of the bounds check finally

Let’s tackle this from a different angle now. Our problem is apparently that the compiler is not able to understand that each chunk is exactly 4 bytes.

So why don’t we write a new chunks iterator that always has exactly the requested number of items, instead of potentially fewer for the very last iteration? And instead of panicking if there are leftover elements, it seems useful to just ignore them. That way we have an API that is functionally different from the existing chunks iterator and provides behaviour that is useful in various cases. It’s basically the slice equivalent of the exact_chunks iterator of the ndarray crate.

By having it functionally different from the existing one, and not just an optimization, I also submitted it for inclusion in Rust’s standard library, and it’s nowadays available as an unstable feature in nightly, like all newly added API. Nonetheless, the same can also be implemented inside your own code with basically the same effect; there are no dependencies on standard library internals.
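
For illustration, a minimal version of such an iterator could look like the following (just a sketch with made-up names, not the actual standard library implementation):

pub struct ExactChunks<'a, T: 'a> {
    slice: &'a [T],
    chunk_size: usize,
}

pub fn exact_chunks<'a, T>(slice: &'a [T], chunk_size: usize) -> ExactChunks<'a, T> {
    assert!(chunk_size != 0);
    ExactChunks { slice, chunk_size }
}

impl<'a, T> Iterator for ExactChunks<'a, T> {
    type Item = &'a [T];

    fn next(&mut self) -> Option<&'a [T]> {
        if self.slice.len() < self.chunk_size {
            // Leftover elements that don't fill a whole chunk are ignored
            None
        } else {
            let (chunk, rest) = self.slice.split_at(self.chunk_size);
            self.slice = rest;
            Some(chunk)
        }
    }
}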

So, let’s use our new exact_chunks iterator that is guaranteed (by API) to always give us exactly 4 bytes. In our case this is exactly equivalent to the normal chunks as by construction our slices always have a length that is a multiple of 4, but the compiler can’t infer that information. The resulting code looks as follows

pub fn bgrx_to_gray_exact_chunks(
    in_data: &[u8],
    out_data: &mut [u8],
    in_stride: usize,
    out_stride: usize,
    width: usize,
) {
    assert_eq!(in_data.len() % 4, 0);
    assert_eq!(out_data.len() % 4, 0);
    assert_eq!(out_data.len() / out_stride, in_data.len() / in_stride);

    let in_line_bytes = width * 4;
    let out_line_bytes = width * 4;

    assert!(in_line_bytes <= in_stride);
    assert!(out_line_bytes <= out_stride);

    for (in_line, out_line) in in_data
        .exact_chunks(in_stride)
        .zip(out_data.exact_chunks_mut(out_stride))
    {
        for (in_p, out_p) in in_line[..in_line_bytes]
            .exact_chunks(4)
            .zip(out_line[..out_line_bytes].exact_chunks_mut(4))
        {
            assert!(in_p.len() == 4);
            assert!(out_p.len() == 4);

            let b = u32::from(in_p[0]);
            let g = u32::from(in_p[1]);
            let r = u32::from(in_p[2]);
            let x = u32::from(in_p[3]);

            let grey = ((r * RGB_Y[0]) + (g * RGB_Y[1]) + (b * RGB_Y[2]) + (x * RGB_Y[3])) / 65536;
            let grey = grey as u8;
            out_p[0] = grey;
            out_p[1] = grey;
            out_p[2] = grey;
            out_p[3] = grey;
        }
    }
}

It’s exactly the same as the previous version with assertions, except for using exact_chunks instead of chunks and the same for the mutable iterator. The resulting benchmark of all our variants now looks as follow

test tests::bench_chunks_1920x1080_no_asserts ... bench:   4,420,145 ns/iter (+/- 139,051)
test tests::bench_chunks_1920x1080_asserts    ... bench:   4,897,046 ns/iter (+/- 166,555)
test tests::bench_chunks_1920x1080_asserts_2  ... bench:   3,968,976 ns/iter (+/- 97,084)
test tests::bench_chunks_1920x1080_iter_sum   ... bench:  11,393,600 ns/iter (+/- 347,958)
test tests::bench_chunks_1920x1080_iter_sum_2 ... bench:  10,420,442 ns/iter (+/- 242,379)
test tests::bench_exact_chunks_1920x1080      ... bench:   2,007,459 ns/iter (+/- 112,287)

Compared to our initial version this is a speedup of a factor of 2.2, compared to our version with assertions a factor of 1.98. This seems like a worthwhile improvement, and if we look at the resulting assembly there are no bounds checks at all anymore

.LBB0_10:
  movzx edx, byte ptr [rsi - 2]
  movzx r15d, byte ptr [rsi - 1]
  movzx r12d, byte ptr [rsi]
  imul r13d, edx, 19595
  imul edx, r15d, 38470
  add edx, r13d
  imul ebx, r12d, 7471
  add ebx, edx
  shr ebx, 16
  mov byte ptr [rcx - 3], bl
  mov byte ptr [rcx - 2], bl
  mov byte ptr [rcx - 1], bl
  mov byte ptr [rcx], bl
  add rcx, 4
  add rsi, 4
  dec r10
  jne .LBB0_10

Also due to this the compiler is able to apply some more optimizations and we only have one loop counter for the number of iterations r10 and the two pointers rcx and rsi that are increased/decreased in each iteration. There is no tracking of the remaining slice lengths anymore, as in the assembly of the original version (and the versions with assertions).

Summary

So overall we got a speedup of a factor of 2.2 while still writing very high-level Rust code with iterators and not falling back to unsafe code or using SIMD. The optimizations the Rust compiler is applying are quite impressive and the Rust marketing line of zero-cost abstractions is really visible in reality here.

The same approach should also work for many similar algorithms, and thus many similar multimedia related algorithms where you iterate over slices and operate on fixed-size chunks.

Also the above shows that as a first step it’s better to write clean and understandable high-level Rust code without worrying too much about performance (assume the compiler can optimize well), and only afterwards take a look at the generated assembly and check which instructions should really go away (like bounds checking). In many cases this can be achieved by adding assertions in strategic places, or like in this case by switching to a slightly different abstraction that is closer to the actual requirements (however I believe the compiler should be able to produce the same code with the help of assertions with the normal chunks iterator, but making that possible requires improvements to the LLVM optimizer probably).

And if all does not help, there’s still the escape hatch of unsafe (for using functions like slice::get_unchecked() or going down to raw pointers) and the possibility of using SIMD instructions (by using faster or stdsimd directly). But in the end this should be a last resort for those little parts of your code where optimizations are needed and the compiler can’t be easily convinced to do it for you.

Addendum: slice::split_at

User newpavlov suggested on Reddit to use repeated slice::split_at in a while loop for similar performance.

This would for example look like the following

pub fn bgrx_to_gray_split_at(
    in_data: &[u8],
    out_data: &mut [u8],
    in_stride: usize,
    out_stride: usize,
    width: usize,
) {
    assert_eq!(in_data.len() % 4, 0);
    assert_eq!(out_data.len() % 4, 0);
    assert_eq!(out_data.len() / out_stride, in_data.len() / in_stride);

    let in_line_bytes = width * 4;
    let out_line_bytes = width * 4;

    assert!(in_line_bytes <= in_stride);
    assert!(out_line_bytes <= out_stride);

    for (in_line, out_line) in in_data
        .exact_chunks(in_stride)
        .zip(out_data.exact_chunks_mut(out_stride))
    {
        let mut in_pp: &[u8] = in_line[..in_line_bytes].as_ref();
        let mut out_pp: &mut [u8] = out_line[..out_line_bytes].as_mut();
        assert!(in_pp.len() == out_pp.len());

        while in_pp.len() >= 4 {
            let (in_p, in_tmp) = in_pp.split_at(4);
            let (out_p, out_tmp) = { out_pp }.split_at_mut(4);
            in_pp = in_tmp;
            out_pp = out_tmp;

            let b = u32::from(in_p[0]);
            let g = u32::from(in_p[1]);
            let r = u32::from(in_p[2]);
            let x = u32::from(in_p[3]);

            let grey = ((r * RGB_Y[0]) + (g * RGB_Y[1]) + (b * RGB_Y[2]) + (x * RGB_Y[3])) / 65536;
            let grey = grey as u8;
            out_p[0] = grey;
            out_p[1] = grey;
            out_p[2] = grey;
            out_p[3] = grey;
        }
    }
}

Performance-wise this brings us very close to the exact_chunks version

test tests::bench_exact_chunks_1920x1080      ... bench: 1,965,631 ns/iter (+/- 58,832)
test tests::bench_split_at_1920x1080          ... bench: 2,046,834 ns/iter (+/- 35,990)

and the assembly is also very similar

.LBB0_10:
  add rbx, -4
  movzx r15d, byte ptr [rsi]
  movzx r12d, byte ptr [rsi + 1]
  movzx edx, byte ptr [rsi + 2]
  imul r13d, edx, 19595
  imul r12d, r12d, 38470
  imul edx, r15d, 7471
  add edx, r12d
  add edx, r13d
  shr edx, 16
  movzx edx, dl
  imul edx, edx, 16843009
  mov dword ptr [rcx], edx
  lea rcx, [rcx + 4]
  add rsi, 4
  cmp rbx, 3
  ja .LBB0_10

Here the compiler even optimizes the storing of the value into a single write operation of 4 bytes, at the cost of an additional multiplication and zero-extend register move.
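
As a side note, 16843009 is 0x01010101, so multiplying the 8-bit grayscale value by it replicates that byte into all four bytes of a 32-bit value, which is what makes the single 4-byte store possible:

// Illustration of the trick in the generated code: replicate one byte into
// all four bytes of a u32 with a single multiplication
let grey: u32 = 0x7f;
let packed = grey * 0x0101_0101; // 0x0101_0101 == 16843009
assert_eq!(packed, 0x7f7f_7f7f);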

Overall this code performs very well too, but in my opinion it looks rather ugly compared to the versions using the different chunks iterators. Also this is basically what the exact_chunks iterator does internally: repeatedly calling slice::split_at. In theory both versions could lead to the very same assembly, but the LLVM optimizer currently handles them slightly differently.

Addendum 2: SIMD with faster

Adam Niederer, author of faster, provided a PR that implements the same algorithm with faster to make explicit use of SIMD instructions if available.

Due to some codegen issues, this currently has to be compiled with the CPU being selected as nehalem, i.e. by running RUSTFLAGS="-C target-cpu=nehalem" cargo +nightly bench, but it provides yet another speedup by a factor of up to 1.27x compared to the fastest previous solution and 2.7x compared to the initial solution:

test tests::bench_chunks_1920x1080_asserts                     ... bench:   4,539,286 ns/iter (+/- 106,265)
test tests::bench_chunks_1920x1080_asserts_2                   ... bench:   3,550,683 ns/iter (+/- 96,917)
test tests::bench_chunks_1920x1080_iter_sum                    ... bench:   5,233,238 ns/iter (+/- 114,671)
test tests::bench_chunks_1920x1080_iter_sum_2                  ... bench:   3,532,059 ns/iter (+/- 94,964)
test tests::bench_chunks_1920x1080_no_asserts                  ... bench:   4,468,269 ns/iter (+/- 89,329)
test tests::bench_chunks_1920x1080_no_asserts_faster           ... bench:   2,476,077 ns/iter (+/- 54,877)
test tests::bench_chunks_1920x1080_no_asserts_faster_unstrided ... bench:   1,642,980 ns/iter (+/- 108,034)
test tests::bench_exact_chunks_1920x1080                       ... bench:   2,078,950 ns/iter (+/- 64,536)
test tests::bench_split_at_1920x1080                           ... bench:   2,096,603 ns/iter (+/- 107,420)

The code in question is very similar to what you would’ve written with ORC, especially the unstrided version. You basically operate on multiple elements at once, doing the same operation on each, but both versions do this in a slightly different way.

pub fn bgrx_to_gray_chunks_no_asserts_faster_unstrided(in_data: &[u8], out_data: &mut [u8]) {
    // Relies on vector width which is a multiple of 4
    assert!(u8s::WIDTH % 4 == 0 && u32s::WIDTH % 4 == 0);

    const RGB_Y: [u32; 16] = [19595, 38470, 7471, 0, 19595, 38470, 7471, 0, 19595, 38470, 7471, 0, 19595, 38470, 7471, 0];
    let rgbvec = u32s::load(&RGB_Y, 0);
    in_data.simd_iter(u8s(0)).simd_map(|v| {
        let (a, b) = v.upcast();
        let (a32, b32) = a.upcast();
        let (c32, d32) = b.upcast();

        let grey32a = a32 * rgbvec / u32s(65536);
        let grey32b = b32 * rgbvec / u32s(65536);
        let grey32c = c32 * rgbvec / u32s(65536);
        let grey32d = d32 * rgbvec / u32s(65536);

        let grey16a = grey32a.saturating_downcast(grey32b);
        let grey16b = grey32c.saturating_downcast(grey32d);

        let grey = grey16a.saturating_downcast(grey16b);
        grey
    }).scalar_fill(out_data);
}

pub fn bgrx_to_gray_chunks_no_asserts_faster(in_data: &[u8], out_data: &mut [u8]) {
    // Sane, but slowed down by faster's current striding implementation.
    in_data.stride_four(tuplify!(4, u8s(0))).zip().simd_map(|(r, g, b, _)| {
        let (r16a, r16b) = r.upcast();
        let (r32a, r32b) = r16a.upcast();
        let (r32c, r32d) = r16b.upcast();

        let (g16a, g16b) = g.upcast();
        let (g32a, g32b) = g16a.upcast();
        let (g32c, g32d) = g16b.upcast();

        let (b16a, b16b) = b.upcast();
        let (b32a, b32b) = b16a.upcast();
        let (b32c, b32d) = b16b.upcast();

        let grey32a = (r32a * u32s(19595) + g32a * u32s(38470) + b32a * u32s(7471)) / u32s(65536);
        let grey32b = (r32b * u32s(19595) + g32b * u32s(38470) + b32b * u32s(7471)) / u32s(65536);
        let grey32c = (r32c * u32s(19595) + g32c * u32s(38470) + b32c * u32s(7471)) / u32s(65536);
        let grey32d = (r32d * u32s(19595) + g32d * u32s(38470) + b32d * u32s(7471)) / u32s(65536);

        let grey16a = grey32a.saturating_downcast(grey32b);
        let grey16b = grey32c.saturating_downcast(grey32d);

        let grey = grey16a.saturating_downcast(grey16b);
        grey
    }).scalar_fill(out_data);
}

How to write GStreamer Elements in Rust Part 1: A Video Filter for converting RGB to grayscale

This is part one of a series of blog posts that I’ll write in the next weeks, as previously announced in the GStreamer Rust bindings 0.10.0 release blog post. Since the last series of blog posts about writing GStreamer plugins in Rust ([1] [2] [3] [4]) a lot has changed, and the content of those blog posts has only historical value now, documenting the journey of experimentation that led to what exists today.

In this first part we’re going to write a plugin that contains a video filter element. The video filter can convert from RGB to grayscale, either output as 8-bit per pixel grayscale or 32-bit per pixel RGB. In addition there’s a property to invert all grayscale values, or to shift them by up to 255 values. In the end this will allow you to watch Big Buck Bunny, or anything else really that can somehow go into a GStreamer pipeline, in grayscale. Or encode the output to a new video file, send it over the network via WebRTC or something else, or basically do anything you want with it.

Big Buck Bunny – Grayscale

This will show the basics of how to write a GStreamer plugin and element in Rust: the basic setup for registering a type and implementing it in Rust, and how to use the various GStreamer API and APIs from the Rust standard library to do the processing.

The final code for this plugin can be found here, and it is based on the 0.1 version of the gst-plugin crate and the 0.10 version of the gstreamer crate. At least Rust 1.20 is required for all this. I’m also assuming that you have GStreamer (at least version 1.8) installed for your platform, see e.g. the GStreamer bindings installation instructions.

Table of Contents

  1. Project Structure
  2. Plugin Initialization
  3. Type Registration
  4. Type Class & Instance Initialization
  5. Caps & Pad Templates
  6. Caps Handling Part 1
  7. Caps Handling Part 2
  8. Conversion of BGRx Video Frames to Grayscale
  9. Testing the new element
  10. Properties
  11. What next?

Project Structure

We’ll create a new cargo project with cargo init --lib --name gst-plugin-tutorial. This will create a basically empty Cargo.toml and a corresponding src/lib.rs. We will use this structure: lib.rs will contain all the plugin related code, and separate modules will contain the actual GStreamer elements that are added.

The empty Cargo.toml has to be updated to list all the dependencies that we need, and to define that the crate should result in a cdylib, i.e. a C library that does not contain any Rust-specific metadata. The final Cargo.toml looks as follows

[package]
name = "gst-plugin-tutorial"
version = "0.1.0"
authors = ["Sebastian Dröge <sebastian@centricular.com>"]
repository = "https://github.com/sdroege/gst-plugin-rs"
license = "MIT/Apache-2.0"

[dependencies]
glib = "0.4"
gstreamer = "0.10"
gstreamer-base = "0.10"
gstreamer-video = "0.10"
gst-plugin = "0.1"

[lib]
name = "gstrstutorial"
crate-type = ["cdylib"]
path = "src/lib.rs"

We’re depending on the gst-plugin crate, which provides all the basic infrastructure for implementing GStreamer plugins and elements. In addition we depend on the gstreamer, gstreamer-base and gstreamer-video crates for various GStreamer API that we’re going to use later, and the glib crate to be able to use some GLib API that we’ll need. GStreamer is building upon GLib, and this leaks through in various places.

With the basic project structure set up, we should be able to compile the project with cargo build now, which will download and build all dependencies and then create a file called target/debug/libgstrstutorial.so (or .dll on Windows, .dylib on macOS). This is going to be our GStreamer plugin.

To allow GStreamer to find our new plugin and make it available in every GStreamer-based application, we could install it into the system- or user-wide GStreamer plugin path or simply point the GST_PLUGIN_PATH environment variable to the directory containing it:

export GST_PLUGIN_PATH=`pwd`/target/debug

If you now run the gst-inspect-1.0 tool on libgstrstutorial.so, it will not yet print all the information it can extract from the plugin, but for now just complain that this is not a valid GStreamer plugin. Which is true, as we didn’t write any code for it yet.

Plugin Initialization

Let’s start editing src/lib.rs to make this an actual GStreamer plugin. First of all, we need to add various extern crate directives to be able to use our dependencies and also mark some of them #[macro_use] because we’re going to use macros defined in some of them. This looks like the following

extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
extern crate gstreamer_base as gst_base;
extern crate gstreamer_video as gst_video;
#[macro_use]
extern crate gst_plugin;

Next we make use of the plugin_define! macro from the gst-plugin crate to set up the static metadata of the plugin (and make the shared library recognizable by GStreamer as a valid plugin), and to define the name of our entry point function (plugin_init) where we will register all the elements that this plugin provides.

plugin_define!(
    b"rstutorial\0",
    b"Rust Tutorial Plugin\0",
    plugin_init,
    b"1.0\0",
    b"MIT/X11\0",
    b"rstutorial\0",
    b"rstutorial\0",
    b"https://github.com/sdroege/gst-plugin-rs\0",
    b"2017-12-30\0"
);

This is unfortunately not very beautiful yet due to a) GStreamer requiring this information to be statically available in the shared library, not returned by a function (starting with GStreamer 1.14 it can be a function), and b) Rust not allowing byte string literals (b"blabla") to be concatenated with a macro like the std::concat macro (so that the b and \0 parts could be hidden away). Expect this to become better in the future.

The static plugin metadata that we provide here is

  1. name of the plugin
  2. short description for the plugin
  3. name of the plugin entry point function
  4. version number of the plugin
  5. license of the plugin (only a fixed set of licenses is allowed here, see)
  6. source package name
  7. binary package name (only really makes sense for e.g. Linux distributions)
  8. origin of the plugin
  9. release date of this version

In addition we’re defining an empty plugin entry point function that just returns true

fn plugin_init(plugin: &gst::Plugin) -> bool {
    true
}

With all that given, gst-inspect-1.0 should print exactly this information when running on the libgstrstutorial.so file (or .dll on Windows, or .dylib on macOS)

gst-inspect-1.0 target/debug/libgstrstutorial.so

Type Registration

As a next step, we’re going to add another module rgb2gray to our project, and call a function called register from our plugin_init function.

mod rgb2gray;

fn plugin_init(plugin: &gst::Plugin) -> bool {
    rgb2gray::register(plugin);
    true
}

With that our src/lib.rs is complete, and all following code is only in src/rgb2gray.rs. At the top of the new file we first need to add various use-directives to import various types and functions we’re going to use into the current module’s scope

use glib;
use gst;
use gst::prelude::*;
use gst_video;

use gst_plugin::properties::*;
use gst_plugin::object::*;
use gst_plugin::element::*;
use gst_plugin::base_transform::*;

use std::i32;
use std::sync::Mutex;

GStreamer is based on the GLib object system (GObject). C (just like Rust) does not have built-in support for object-oriented programming, inheritance, virtual methods and related concepts, and GObject makes these features available in C as a library. Without language support this is a quite verbose endeavour in C, and the gst-plugin crate tries to expose all this in a (as much as possible) Rust-style API while hiding all the details that do not really matter.

So, as a next step we need to register a new type for our RGB to Grayscale converter GStreamer element with the GObject type system, and then register that type with GStreamer to be able to create new instances of it. We do this with the following code

struct Rgb2GrayStatic;

impl ImplTypeStatic<BaseTransform> for Rgb2GrayStatic {
    fn get_name(&self) -> &str {
        "Rgb2Gray"
    }

    fn new(&self, element: &BaseTransform) -> Box<BaseTransformImpl<BaseTransform>> {
        Rgb2Gray::new(element)
    }

    fn class_init(&self, klass: &mut BaseTransformClass) {
        Rgb2Gray::class_init(klass);
    }
}

pub fn register(plugin: &gst::Plugin) {
    let type_ = register_type(Rgb2GrayStatic);
    gst::Element::register(plugin, "rsrgb2gray", 0, type_);
}

This defines a zero-sized struct Rgb2GrayStatic that is used to implement the ImplTypeStatic<BaseTransform> trait on it for providing static information about the type to the type system. In our case this is a zero-sized struct, but in other cases this struct might contain actual data (for example if the same element code is used for multiple elements, e.g. when wrapping a generic codec API that provides support for multiple decoders and then wanting to register one element per decoder). By implementing ImplTypeStatic<BaseTransform> we also declare that our element is going to be based on the GStreamer BaseTransform base class, which provides a relatively simple API for 1:1 transformation elements like ours is going to be.

ImplTypeStatic provides functions that return a name for the type, and functions for initializing/returning a new instance of our element (new) and for initializing the class metadata (class_init, more on that later). We simply let those functions proxy to associated functions on the Rgb2Gray struct that we’re going to define at a later time.

In addition we define a register function (the one that is already called from our plugin_init function). In there we first register the Rgb2GrayStatic type metadata with the GObject type system to retrieve a type ID, and then register this type ID with GStreamer so that new instances of the element can be created with the name “rsrgb2gray” (e.g. when using gst::ElementFactory::make).
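
For illustration, this is roughly how an application would create an instance of our element once the plugin is loaded. This is a hypothetical application-side sketch, not part of the plugin code, and the exact ElementFactory API differs slightly between versions of the bindings.

// Hypothetical application code, not part of the plugin
gst::init().unwrap();

let element = gst::ElementFactory::make("rsrgb2gray", None)
    .expect("rsrgb2gray not found - is GST_PLUGIN_PATH set correctly?");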

Type Class & Instance Initialization

As a next step we declare the Rgb2Gray struct and implement the new and class_init functions on it. In the first version, this struct is almost empty but we will later use it to store all state of our element.

struct Rgb2Gray {
    cat: gst::DebugCategory,
}

impl Rgb2Gray {
    fn new(_transform: &BaseTransform) -> Box<BaseTransformImpl<BaseTransform>> {
        Box::new(Self {
            cat: gst::DebugCategory::new(
                "rsrgb2gray",
                gst::DebugColorFlags::empty(),
                "Rust RGB-GRAY converter",
            ),
        })
    }

    fn class_init(klass: &mut BaseTransformClass) {
        klass.set_metadata(
            "RGB-GRAY Converter",
            "Filter/Effect/Converter/Video",
            "Converts RGB to GRAY or grayscale RGB",
            "Sebastian Dröge <sebastian@centricular.com>",
        );

        klass.configure(BaseTransformMode::NeverInPlace, false, false);
    }
}

In the new function we return a boxed (i.e. heap-allocated) version of our struct, containing a newly created GStreamer debug category of name “rsrgb2gray”. We’re going to use this debug category later for making use of GStreamer’s debug logging system for logging the state and changes of our element.

In the class_init function we, again, set up some metadata for our new element. In this case these are a description, a classification of our element, a longer description and the author. The metadata can later be retrieved and made use of via the Registry and PluginFeature/ElementFactory API. We also configure the BaseTransform class and define that we will never operate in-place (producing our output in the input buffer), and that we don’t want to work in passthrough mode if the input/output formats are the same.

Additionally we need to implement various traits on the Rgb2Gray struct, which will later be used to override virtual methods of the various parent classes of our element. For now we can keep the trait implementations empty. There is one trait implementation required per parent class.

impl ObjectImpl<BaseTransform> for Rgb2Gray {}
impl ElementImpl<BaseTransform> for Rgb2Gray {}
impl BaseTransformImpl<BaseTransform> for Rgb2Gray {}

With all this defined, gst-inspect-1.0 should be able to show some more information about our element already but will still complain that it’s not complete yet.

Caps & Pad Templates

Data flow in GStreamer elements happens via pads, which are the inputs and outputs (or sinks and sources) of an element. Via the pads, buffers containing actual media data, events or queries are transferred. An element can have any number of sink and source pads, but our new element will only have one of each.

To be able to declare what kinds of pads an element can create (they are not necessarily all static but could be created at runtime by the element or the application), it is necessary to install so-called pad templates during the class initialization. These pad templates contain the name (or rather “name template”, it could be something like src_%u for e.g. pad templates that declare multiple possible pads), the direction of the pad (sink or source), the availability of the pad (is it always there, sometimes added/removed by the element or to be requested by the application) and all the possible media types (called caps) that the pad can consume (sink pads) or produce (src pads).

In our case we only have always pads, one sink pad called “sink”, on which we can only accept RGB (BGRx to be exact) data with any width/height/framerate and one source pad called “src”, on which we will produce either RGB (BGRx) data or GRAY8 (8-bit grayscale) data. We do this by adding the following code to the class_init function.

        let caps = gst::Caps::new_simple(
            "video/x-raw",
            &[
                (
                    "format",
                    &gst::List::new(&[
                        &gst_video::VideoFormat::Bgrx.to_string(),
                        &gst_video::VideoFormat::Gray8.to_string(),
                    ]),
                ),
                ("width", &gst::IntRange::<i32>::new(0, i32::MAX)),
                ("height", &gst::IntRange::<i32>::new(0, i32::MAX)),
                (
                    "framerate",
                    &gst::FractionRange::new(
                        gst::Fraction::new(0, 1),
                        gst::Fraction::new(i32::MAX, 1),
                    ),
                ),
            ],
        );

        let src_pad_template = gst::PadTemplate::new(
            "src",
            gst::PadDirection::Src,
            gst::PadPresence::Always,
            &caps,
        );
        klass.add_pad_template(src_pad_template);

        let caps = gst::Caps::new_simple(
            "video/x-raw",
            &[
                ("format", &gst_video::VideoFormat::Bgrx.to_string()),
                ("width", &gst::IntRange::<i32>::new(0, i32::MAX)),
                ("height", &gst::IntRange::<i32>::new(0, i32::MAX)),
                (
                    "framerate",
                    &gst::FractionRange::new(
                        gst::Fraction::new(0, 1),
                        gst::Fraction::new(i32::MAX, 1),
                    ),
                ),
            ],
        );

        let sink_pad_template = gst::PadTemplate::new(
            "sink",
            gst::PadDirection::Sink,
            gst::PadPresence::Always,
            &caps,
        );
        klass.add_pad_template(sink_pad_template);

The names “src” and “sink” are pre-defined by the BaseTransform class and this base-class will also create the actual pads with those names from the templates for us whenever a new element instance is created. Otherwise we would have to do that in our new function but here this is not needed.

If you now run gst-inspect-1.0 on the rsrgb2gray element, these pad templates with their caps should also show up.

Caps Handling Part 1

As a next step we will add caps handling to our new element. This involves overriding 4 virtual methods from the BaseTransformImpl trait, and actually storing the configured input and output caps inside our element struct. Let’s start with the latter

struct State {
    in_info: gst_video::VideoInfo,
    out_info: gst_video::VideoInfo,
}

struct Rgb2Gray {
    cat: gst::DebugCategory,
    state: Mutex<Option<State>>,
}

impl Rgb2Gray {
    fn new(_transform: &BaseTransform) -> Box<BaseTransformImpl<BaseTransform>> {
        Box::new(Self {
            cat: gst::DebugCategory::new(
                "rsrgb2gray",
                gst::DebugColorFlags::empty(),
                "Rust RGB-GRAY converter",
            ),
            state: Mutex::new(None),
        })
    }
}

We define a new struct State that contains the input and output caps, each stored as a VideoInfo. VideoInfo is a struct that contains various fields like width/height, framerate and the video format, and allows to conveniently work with the properties of (raw) video formats. We have to store it inside a Mutex in our Rgb2Gray struct as this can (in theory) be accessed from multiple threads at the same time.
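
As a small, hypothetical illustration (not part of the element code), this is how a VideoInfo can be parsed from some caps describing raw video and then queried for its individual fields

// caps is assumed to be some gst::Caps describing a raw video format
if let Some(info) = gst_video::VideoInfo::from_caps(&caps) {
    println!(
        "format {:?} at {}x{}",
        info.format(),
        info.width(),
        info.height()
    );
}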

Whenever input/output caps are configured on our element, the set_caps virtual method of BaseTransform is called with both caps (i.e. in the very beginning before the data flow and whenever it changes), and all following video frames that pass through our element should be according to those caps. Once the element is shut down, the stop virtual method is called and it would make sense to release the State as it only contains stream-specific information. We’re doing this by adding the following to the BaseTransformImpl trait implementation

impl BaseTransformImpl<BaseTransform> for Rgb2Gray {
    fn set_caps(&self, element: &BaseTransform, incaps: &gst::Caps, outcaps: &gst::Caps) -> bool {
        let in_info = match gst_video::VideoInfo::from_caps(incaps) {
            None => return false,
            Some(info) => info,
        };
        let out_info = match gst_video::VideoInfo::from_caps(outcaps) {
            None => return false,
            Some(info) => info,
        };

        gst_debug!(
            self.cat,
            obj: element,
            "Configured for caps {} to {}",
            incaps,
            outcaps
        );

        *self.state.lock().unwrap() = Some(State {
            in_info: in_info,
            out_info: out_info,
        });

        true
    }

    fn stop(&self, element: &BaseTransform) -> bool {
        // Drop state
        let _ = self.state.lock().unwrap().take();

        gst_info!(self.cat, obj: element, "Stopped");

        true
    }
}

This code should be relatively self-explanatory. In set_caps we’re parsing the two caps into a VideoInfo and then store this in our State, in stop we drop the State and replace it with None. In addition we make use of our debug category here and use the gst_info! and gst_debug! macros to output the current caps configuration to the GStreamer debug logging system. This information can later be useful for debugging any problems once the element is running.

Next we have to provide information to the BaseTransform base class about the size in bytes of a video frame with specific caps. This is needed so that the base class can allocate an appropriately sized output buffer for us, that we can then fill later. This is done with the get_unit_size virtual method, which is required to return the size of one processing unit in specific caps. In our case, one processing unit is one video frame. In the case of raw audio it would be the size of one sample multiplied by the number of channels.

impl BaseTransformImpl<BaseTransform> for Rgb2Gray {
    fn get_unit_size(&self, _element: &BaseTransform, caps: &gst::Caps) -> Option<usize> {
        gst_video::VideoInfo::from_caps(caps).map(|info| info.size())
    }
}

We simply make use of the VideoInfo API here again, which conveniently gives us the size of one video frame already.

Instead of get_unit_size it would also be possible to implement the transform_size virtual method, which gets passed one size together with its corresponding caps plus a second caps, and is supposed to return that size converted to the second caps. Depending on how your element works, one or the other can be easier to implement.

Caps Handling Part 2

We’re not done yet with caps handling though. As a very last step it is required that we implement a function that is converting caps into the corresponding caps in the other direction. For example, if we receive BGRx caps with some width/height on the sinkpad, we are supposed to convert this into new caps with the same width/height but BGRx or GRAY8. That is, we can convert BGRx to BGRx or GRAY8. Similarly, if the element downstream of ours can accept GRAY8 with a specific width/height from our source pad, we have to convert this to BGRx with that very same width/height.

This has to be implemented in the transform_caps virtual method, and looks as following

impl BaseTransformImpl<BaseTransform> for Rgb2Gray {
    fn transform_caps(
        &self,
        element: &BaseTransform,
        direction: gst::PadDirection,
        caps: gst::Caps,
        filter: Option<&gst::Caps>,
    ) -> gst::Caps {
        let other_caps = if direction == gst::PadDirection::Src {
            let mut caps = caps.clone();

            for s in caps.make_mut().iter_mut() {
                s.set("format", &gst_video::VideoFormat::Bgrx.to_string());
            }

            caps
        } else {
            let mut gray_caps = gst::Caps::new_empty();

            {
                let gray_caps = gray_caps.get_mut().unwrap();

                for s in caps.iter() {
                    let mut s_gray = s.to_owned();
                    s_gray.set("format", &gst_video::VideoFormat::Gray8.to_string());
                    gray_caps.append_structure(s_gray);
                }
                gray_caps.append(caps.clone());
            }

            gray_caps
        };

        gst_debug!(
            self.cat,
            obj: element,
            "Transformed caps from {} to {} in direction {:?}",
            caps,
            other_caps,
            direction
        );

        if let Some(filter) = filter {
            filter.intersect_with_mode(&other_caps, gst::CapsIntersectMode::First)
        } else {
            other_caps
        }
    }
}

This caps conversion happens in 3 steps. First we check if we got caps for the source pad. In that case, the caps on the other pad (the sink pad) are going to be exactly the same caps, but no matter whether the caps contained BGRx or GRAY8 they must become BGRx, as that’s the only format that our sink pad can accept. We do this by creating a clone of the input caps, then making sure that those caps are actually writable (i.e. we’re holding the only reference to them, or a copy is going to be created) and then iterating over all the structures inside the caps and setting their “format” field to BGRx. After this, all structures in the new caps will have the format field set to BGRx.

Similarly, if we get caps for the sink pad and are supposed to convert it to caps for the source pad, we create new caps and in there append a copy of each structure of the input caps (which are BGRx) with the format field set to GRAY8. In the end we append the original caps, giving us first all caps as GRAY8 and then the same caps as BGRx. With this ordering we signal to GStreamer that we would prefer to output GRAY8 over BGRx.

In the end the caps we created for the other pad are filtered against optional filter caps to reduce the potential size of the caps. This is done by intersecting the caps with that filter, while keeping the order (and thus preferences) of the filter caps (gst::CapsIntersectMode::First).
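
To make the sink-to-source direction concrete, assume (purely as an example) that the following caps arrive on the sink pad

video/x-raw, format=BGRx, width=800, height=600, framerate=30/1

transform_caps would then (before filtering) return the following caps for the source pad, with the GRAY8 variant first to express our preference for grayscale output

video/x-raw, format=GRAY8, width=800, height=600, framerate=30/1;
video/x-raw, format=BGRx, width=800, height=600, framerate=30/1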

Conversion of BGRx Video Frames to Grayscale

Now that all the caps handling is implemented, we can finally get to the implementation of the actual video frame conversion. For this we start with defining a helper function bgrx_to_gray that converts one BGRx pixel to a grayscale value. The BGRx pixel is passed as a &[u8] slice with 4 elements and the function returns another u8 for the grayscale value.

impl Rgb2Gray {
    #[inline]
    fn bgrx_to_gray(in_p: &[u8]) -> u8 {
        // See https://en.wikipedia.org/wiki/YUV#SDTV_with_BT.601
        const R_Y: u32 = 19595; // 0.299 * 65536
        const G_Y: u32 = 38470; // 0.587 * 65536
        const B_Y: u32 = 7471; // 0.114 * 65536

        assert_eq!(in_p.len(), 4);

        let b = u32::from(in_p[0]);
        let g = u32::from(in_p[1]);
        let r = u32::from(in_p[2]);

        let gray = ((r * R_Y) + (g * G_Y) + (b * B_Y)) / 65536;
        gray as u8
    }
}

This function works by extracting the blue, green and red components from each pixel (remember: we work on BGRx, so the first value will be blue, the second green, the third red and the fourth unused), extending them from 8 to 32 bits for a wider value-range and then converting them to the Y component of the YUV colorspace (basically what your grandparents’ black & white TV would’ve displayed). The coefficients come from the Wikipedia page about YUV and are normalized to unsigned 16 bit integers so we can keep some accuracy, don’t have to work with floating point arithmetic and stay inside the range of 32 bit integers for all our calculations. As you can see, the green component is weighted more than the others, which comes from our eyes being more sensitive to green than to other colors.
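
As a quick sanity check of the fixed-point arithmetic (a hypothetical test, not part of the original code): the three coefficients sum to exactly 65536, so a white pixel stays at 255, and a pure green pixel becomes 149, which matches 0.587 * 255 ≈ 149.7 rounded down by the integer division.

#[test]
fn test_bgrx_to_gray() {
    // 255 * (19595 + 38470 + 7471) / 65536 = 255 * 65536 / 65536 = 255
    assert_eq!(Rgb2Gray::bgrx_to_gray(&[255, 255, 255, 0]), 255);
    // (255 * 38470) / 65536 = 149
    assert_eq!(Rgb2Gray::bgrx_to_gray(&[0, 255, 0, 0]), 149);
}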

Note: This is only doing the actual conversion from linear RGB to grayscale (and in BT.601 colorspace). To do this conversion correctly you need to know your colorspaces and use the correct coefficients for conversion, and also do gamma correction. See this about why it is important.

Afterwards we have to actually call this function on every pixel. For this the transform virtual method is implemented, which gets an input and an output buffer passed, and we’re supposed to read the input buffer and fill the output buffer. The implementation looks as follows, and is going to be our biggest function for this element

impl BaseTransformImpl<BaseTransform> for Rgb2Gray {
    fn transform(
        &self,
        element: &BaseTransform,
        inbuf: &gst::Buffer,
        outbuf: &mut gst::BufferRef,
    ) -> gst::FlowReturn {
        let mut state_guard = self.state.lock().unwrap();
        let state = match *state_guard {
            None => {
                gst_element_error!(element, gst::CoreError::Negotiation, ["Have no state yet"]);
                return gst::FlowReturn::NotNegotiated;
            }
            Some(ref mut state) => state,
        };

        let in_frame = match gst_video::VideoFrameRef::from_buffer_ref_readable(
            inbuf.as_ref(),
            &state.in_info,
        ) {
            None => {
                gst_element_error!(
                    element,
                    gst::CoreError::Failed,
                    ["Failed to map input buffer readable"]
                );
                return gst::FlowReturn::Error;
            }
            Some(in_frame) => in_frame,
        };

        let mut out_frame =
            match gst_video::VideoFrameRef::from_buffer_ref_writable(outbuf, &state.out_info) {
                None => {
                    gst_element_error!(
                        element,
                        gst::CoreError::Failed,
                        ["Failed to map output buffer writable"]
                    );
                    return gst::FlowReturn::Error;
                }
                Some(out_frame) => out_frame,
            };

        let width = in_frame.width() as usize;
        let in_stride = in_frame.plane_stride()[0] as usize;
        let in_data = in_frame.plane_data(0).unwrap();
        let out_stride = out_frame.plane_stride()[0] as usize;
        let out_format = out_frame.format();
        let out_data = out_frame.plane_data_mut(0).unwrap();

        if out_format == gst_video::VideoFormat::Bgrx {
            assert_eq!(in_data.len() % 4, 0);
            assert_eq!(out_data.len() % 4, 0);
            assert_eq!(out_data.len() / out_stride, in_data.len() / in_stride);

            let in_line_bytes = width * 4;
            let out_line_bytes = width * 4;

            assert!(in_line_bytes <= in_stride);
            assert!(out_line_bytes <= out_stride);

            for (in_line, out_line) in in_data
                .chunks(in_stride)
                .zip(out_data.chunks_mut(out_stride))
            {
                for (in_p, out_p) in in_line[..in_line_bytes]
                    .chunks(4)
                    .zip(out_line[..out_line_bytes].chunks_mut(4))
                {
                    assert_eq!(out_p.len(), 4);

                    let gray = Rgb2Gray::bgrx_to_gray(in_p);
                    out_p[0] = gray;
                    out_p[1] = gray;
                    out_p[2] = gray;
                }
            }
        } else if out_format == gst_video::VideoFormat::Gray8 {
            assert_eq!(in_data.len() % 4, 0);
            assert_eq!(out_data.len() / out_stride, in_data.len() / in_stride);

            let in_line_bytes = width * 4;
            let out_line_bytes = width;

            assert!(in_line_bytes <= in_stride);
            assert!(out_line_bytes <= out_stride);

            for (in_line, out_line) in in_data
                .chunks(in_stride)
                .zip(out_data.chunks_mut(out_stride))
            {
                for (in_p, out_p) in in_line[..in_line_bytes]
                    .chunks(4)
                    .zip(out_line[..out_line_bytes].iter_mut())
                {
                    let gray = Rgb2Gray::bgrx_to_gray(in_p);
                    *out_p = gray;
                }
            }
        } else {
            unimplemented!();
        }

        gst::FlowReturn::Ok
    }
}

What happens here is that we first of all lock our state (the input/output VideoInfo) and error out if we don’t have any yet (which can’t really happen unless other elements have a bug, but better safe than sorry). After that we map the input buffer readable and the output buffer writable with the VideoFrameRef API. By mapping the buffers we get access to the underlying bytes of them, and the mapping operation could for example make GPU memory available or just do nothing and give us access to a normally allocated memory area. We have access to the bytes of the buffer until the VideoFrameRef goes out of scope.

Instead of VideoFrameRef we could’ve also used the gst::Buffer::map_readable() and gst::Buffer::map_writable() API, but unlike those the VideoFrameRef API also extracts various metadata from the raw video buffers and makes them available. For example we can directly access the different planes as slices without having to calculate the offsets ourselves, and we get direct access to the width and height of the video frame.

After mapping the buffers, we store various information we’re going to need in local variables to save some typing later. This is the width (same for input and output as we never change the width in transform_caps), the input and output (row-) stride (the number of bytes per row/line, which possibly includes some padding at the end of each line for alignment reasons), the output format (which can be BGRx or GRAY8 because of how we implemented transform_caps) and the pointers to the first plane of the input and output (which in this case also is the only plane, BGRx and GRAY8 both have only a single plane containing all the RGB/gray components).

Then based on whether the output is BGRx or GRAY8, we iterate over all pixels. The code is basically the same in both cases, so I’m only going to explain the case where BGRx is output.

We start by iterating over each line of the input and output, and do so by using the chunks iterator to give us chunks of as many bytes as the (row-) stride of the video frame is, do the same for the other frame and then zip both iterators together. This means that on each iteration we get exactly one line as a slice from each of the frames and can then start accessing the actual pixels in each line.

To access the individual pixels in each line, we again use the chunks iterator the same way, but this time to always give us chunks of 4 bytes from each line. As BGRx uses 4 bytes for each pixel, this gives us exactly one pixel. Instead of iterating over the whole line, we only take the actual sub-slice that contains the pixels, not the whole line with stride number of bytes containing potential padding at the end. Now for each of these pixels we call our previously defined bgrx_to_gray function and then fill the B, G and R components of the output buffer with that value to get grayscale output. And that’s all.

Using Rust high-level abstractions like the chunks iterators and bounds-checking slice accesses might seem like it’s going to cause quite some performance penalty, but if you look at the generated assembly most of the bounds checks are completely optimized away and the resulting assembly code is close to what one would’ve written manually (especially when using the newly-added exact_chunks iterators). Here you’re getting safe and high-level looking code with low-level performance!
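
For illustration, the BGRx inner loop could also be written with chunks_exact, the name under which the exact_chunks iterator was later stabilized. This is only a sketch reusing the in_line/out_line slices and byte counts from the transform function above, not a change to the element

for (in_p, out_p) in in_line[..in_line_bytes]
    .chunks_exact(4)
    .zip(out_line[..out_line_bytes].chunks_exact_mut(4))
{
    // chunks_exact guarantees chunks of exactly 4 bytes, which lets the
    // compiler drop more of the bounds checks in the loop body
    let gray = Rgb2Gray::bgrx_to_gray(in_p);
    out_p[0] = gray;
    out_p[1] = gray;
    out_p[2] = gray;
}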

You might’ve also noticed the various assertions in the processing function. These are there to give further hints to the compiler about properties of the code, thus potentially allowing it to optimize the code better, e.g. by moving bounds checks out of the inner loop and letting a single assertion outside the loop check for the same condition. In Rust adding assertions can often improve performance by allowing further optimizations to be applied, but in the end always check the resulting assembly to see if what you did made any difference.
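
As a minimal, standalone illustration of that effect (not taken from the element), a single up-front length assertion allows the compiler to prove that the individual accesses below are in bounds

fn sum_first_four(data: &[u8]) -> u32 {
    // One check here can replace four separate bounds checks below
    assert!(data.len() >= 4);
    u32::from(data[0]) + u32::from(data[1]) + u32::from(data[2]) + u32::from(data[3])
}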

Testing the new element

Now we have implemented almost all functionality of our new element and can run it on actual video data. This can be done with the gst-launch-1.0 tool, or any application using GStreamer that allows us to insert our new element somewhere in the video part of the pipeline. With gst-launch-1.0 you could run for example the following pipelines

# Run on a test pattern
gst-launch-1.0 videotestsrc ! rsrgb2gray ! videoconvert ! autovideosink

# Run on some video file, also playing the audio
gst-launch-1.0 playbin uri=file:///path/to/some/file video-filter=rsrgb2gray

Note that you will likely want to compile with cargo build --release and add the target/release directory to GST_PLUGIN_PATH instead. The debug build might be too slow, and generally the release builds are multiple orders of magnitude (!) faster.

Properties

The only features missing now are the properties I mentioned in the opening paragraph: one boolean property to invert the grayscale value and one integer property to shift the value by up to 255. Implementing this on top of the previous code is not a lot of work. Let’s start with defining a struct for holding the property values and defining the property metadata.

const DEFAULT_INVERT: bool = false;
const DEFAULT_SHIFT: u32 = 0;

#[derive(Debug, Clone, Copy)]
struct Settings {
    invert: bool,
    shift: u32,
}

impl Default for Settings {
    fn default() -> Self {
        Settings {
            invert: DEFAULT_INVERT,
            shift: DEFAULT_SHIFT,
        }
    }
}

static PROPERTIES: [Property; 2] = [
    Property::Boolean(
        "invert",
        "Invert",
        "Invert grayscale output",
        DEFAULT_INVERT,
        PropertyMutability::ReadWrite,
    ),
    Property::UInt(
        "shift",
        "Shift",
        "Shift grayscale output (wrapping around)",
        (0, 255),
        DEFAULT_SHIFT,
        PropertyMutability::ReadWrite,
    ),
];

struct Rgb2Gray {
    cat: gst::DebugCategory,
    settings: Mutex<Settings>,
    state: Mutex<Option<State>>,
}

impl Rgb2Gray {
    fn new(_transform: &BaseTransform) -> Box<BaseTransformImpl<BaseTransform>> {
        Box::new(Self {
            cat: gst::DebugCategory::new(
                "rsrgb2gray",
                gst::DebugColorFlags::empty(),
                "Rust RGB-GRAY converter",
            ),
            settings: Mutex::new(Default::default()),
            state: Mutex::new(None),
        })
    }
}

This should all be rather straightforward: we define a Settings struct that stores the two values, implement the Default trait for it, then define a two-element array with property metadata (names, description, ranges, default value, writability), and then store the default value of our Settings struct inside another Mutex inside the element struct.

In the next step we have to make use of these: we need to tell the GObject type system about the properties, and we need to implement functions that are called whenever a property value is set or retrieved.

impl Rgb2Gray {
    fn class_init(klass: &mut BaseTransformClass) {
        [...]
        klass.install_properties(&PROPERTIES);
        [...]
    }
}

impl ObjectImpl<BaseTransform> for Rgb2Gray {
    fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) {
        let prop = &PROPERTIES[id as usize];
        let element = obj.clone().downcast::<BaseTransform>().unwrap();

        match *prop {
            Property::Boolean("invert", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let invert = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing invert from {} to {}",
                    settings.invert,
                    invert
                );
                settings.invert = invert;
            }
            Property::UInt("shift", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let shift = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing shift from {} to {}",
                    settings.shift,
                    shift
                );
                settings.shift = shift;
            }
            _ => unimplemented!(),
        }
    }

    fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
        let prop = &PROPERTIES[id as usize];

        match *prop {
            Property::Boolean("invert", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.invert.to_value())
            }
            Property::UInt("shift", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.shift.to_value())
            }
            _ => unimplemented!(),
        }
    }
}

Property values can be changed from any thread at any time, that’s why the Mutex is needed here to protect our struct. And we’re using a separate Mutex to be able to have it locked only for the shortest possible amount of time: we don’t want to keep it locked for the whole duration of the transform function, otherwise applications trying to set/get values would block for up to one frame.

In the property setter/getter functions we are working with a glib::Value. This is a dynamically typed value type that can contain values of any type, together with the type information of the contained value. Here we’re using it to handle an unsigned integer (u32) and a boolean for our two properties. To know which property is currently being set or retrieved, we get an identifier passed which is the index into our PROPERTIES array. We then simply match on the name of that property to decide which one was meant.

With this implemented, we can already compile everything, see the properties and their metadata in gst-inspect-1.0 and can also set them on gst-launch-1.0 like this

# Set invert to true and shift to 128
gst-launch-1.0 videotestsrc ! rsrgb2gray invert=true shift=128 ! videoconvert ! autovideosink

If we set GST_DEBUG=rsrgb2gray:6 in the environment before running that, we can also see the corresponding debug output when the values are changing. The only thing missing now is to actually make use of the property values for the processing. For this we add the following changes to bgrx_to_gray and the transform function

impl Rgb2Gray {
    #[inline]
    fn bgrx_to_gray(in_p: &[u8], shift: u8, invert: bool) -> u8 {
        [...]

        let gray = ((r * R_Y) + (g * G_Y) + (b * B_Y)) / 65536;
        let gray = (gray as u8).wrapping_add(shift);

        if invert {
            255 - gray
        } else {
            gray
        }
    }
}

impl BaseTransformImpl<BaseTransform> for Rgb2Gray {
    fn transform(
        &self,
        element: &BaseTransform,
        inbuf: &gst::Buffer,
        outbuf: &mut gst::BufferRef,
    ) -> gst::FlowReturn {
        let settings = *self.settings.lock().unwrap();
        [...]
                    let gray = Rgb2Gray::bgrx_to_gray(in_p, settings.shift as u8, settings.invert);
        [...]
    }
}

And that’s all. If you run the element in gst-launch-1.0 and change the values of the properties you should also see the corresponding changes in the video output.
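
The same can of course be done from application code. The following is only a hypothetical sketch, and the exact property-setting API differs between versions of the glib bindings

// Hypothetical application code mirroring the gst-launch-1.0 line above
let filter = gst::ElementFactory::make("rsrgb2gray", None).expect("rsrgb2gray not found");
let _ = filter.set_property("invert", &true);
let _ = filter.set_property("shift", &128u32);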

Note that we always take a copy of the Settings struct at the beginning of the transform function. This ensures that we hold the mutex only for the shortest possible amount of time and then have a local snapshot of the settings for each frame.

Also keep in mind that the usage of the property values in the bgrx_to_gray function is far from optimal. It means the addition of another condition to the calculation of each pixel, thus potentially slowing it down a lot. Ideally this condition would be moved outside the inner loops and the bgrx_to_gray function would be made generic over that. See for example this blog post about “branchless Rust” for ideas on how to do that; the actual implementation is left as an exercise for the reader.

What next?

I hope the code walkthrough above was useful to understand how to implement GStreamer plugins and elements in Rust. If you have any questions, feel free to ask them here in the comments.

The same approach also works for audio filters or anything that can be handled in some way with the API of the BaseTransform base class. You can find another filter, an audio echo filter, using the same approach here.

In the next blog post in this series I’ll show how to use another base class to implement another kind of element, but for the time being you can also check the GIT repository for various other element implementations.

GStreamer Rust bindings release 0.10.0 & gst-plugin release 0.1.0

Today I’ve released version 0.10.0 of the Rust GStreamer bindings, and after a journey of more than 1½ years the first release of the GStreamer plugin writing infrastructure crate “gst-plugin”.

Check the repositories of both for more details, the code and various examples.

GStreamer Bindings

Some of the changes since the 0.9.0 release were already outlined in the previous blog post, and most of the other changes were also things I found while writing GStreamer plugins. For the full changelog, take a look at the CHANGELOG.md in the repository.

Other changes include

  • I went over the whole API in the last days, added any missing things I found, simplified API as it made sense, changed functions to take Option<_> if allowed, etc.
  • Bindings for using and writing typefinders. Typefinders are the part of GStreamer that try to guess what kind of media is to be handled based on looking at the bytes. Especially writing those in Rust seems worthwhile, considering that basically all of the GIT log of the existing typefinders consists of fixes for various kinds of memory-safety problems.
  • Bindings for the Registry and PluginFeature were added, as well as fixing the relevant API that works with paths/filenames to actually work on Paths
  • Bindings for the GStreamer Net library were added, allowing to build applications that synchronize their media over the network by using PTP, NTP or a custom GStreamer protocol (for which there also exists a server). This could be used for building video walls, systems recording the same scene from multiple cameras, etc., and provides (depending on network conditions) synchronization down to less than 1ms between devices.

Generally, this is something like a “1.0” release for me now (due to depending on too many pre-1.0 crates this is not going to be 1.0 anytime soon). The basic API is all there and nicely usable now and hopefully without any bugs, the known-missing APIs are not too important for now and can easily be added at a later time when needed. At this point I don’t expect many API changes anymore.

GStreamer Plugins

The other important part of this announcement is the first release of the “gst-plugin” crate. This provides the basic infrastructure for writing GStreamer plugins and elements in Rust, without having to write any unsafe code.

I started experimenting with using Rust for this more than 1½ years ago, and while a lot of things have changed in that time, this release is a nice milestone. In the beginning there were no GStreamer bindings and I was writing everything manually, and there were also still quite a few pieces of code written in C. Nowadays everything is in Rust and using the automatically generated GStreamer bindings.

Unfortunately there is no real documentation for any of this yet, there’s only the autogenerated rustdoc documentation available from here, and various example GStreamer plugins inside the GIT repository that can be used as a starting point. And various people already wrote their GStreamer plugins in Rust based on this.

The basic idea of the API is however that everything is as Rust-y as possible. Which might not be too much due to having to map subtyping, virtual methods and the like to something reasonable in Rust, but I believe it’s nice to use now. You basically only have to implement one or more traits on your structs, and that’s it. There’s still quite some boilerplate required, but it’s far less than what would be required in C. The best example at this point might be the audioecho element.

Over the next days (or weeks?) I’m not going to write any documentation yet, but instead will write a couple of very simple, minimal elements that do basically nothing and can be used as starting points to learn how all this works together. And will write another blog post or two about the different parts of writing a GStreamer plugin and element in Rust, so that all of you can get started with that.

Let’s hope that the number of new GStreamer plugins written in C is going to decrease in the future, and maybe even new people who would’ve never done that in C, with all the footguns everywhere, can get started with writing GStreamer plugins in Rust now.

A GStreamer Plugin like the Rec Button on your Tape Recorder – A Multi-Threaded Plugin written in Rust

As Rust is known for “Fearless Concurrency”, that is being able to write concurrent, multi-threaded code without fear, it seemed like a good fit for a GStreamer element that we had to write at Centricular.

Previous experience with Rust for writing (mostly) single-threaded GStreamer elements and (also multi-threaded) applications was quite successful and promising already. And in the end, this new element was also a pleasure to write and probably faster than doing the equivalent in C. For the impatient, the code, tests and a GTK+ example application (written with the great Rust GTK bindings, but the GStreamer element is also usable from C or any other language) can be found here.

What does it do?

The main idea of the element is that it basically works like the rec button on your tape recorder. There is a single boolean property called “record”, and whenever it is set to true it will pass-through data and whenever it is set to false it will drop all data. But different to the existing valve element, it

  • Outputs a contiguous timeline without gaps, i.e. there are no gaps in the output when not recording. Similar to the recording you get on a tape recorder, you don’t have 10s of silence if you didn’t record for 10s.
  • Handles and synchronizes multiple streams at once. When recording e.g. a video stream and an audio stream, every recorded segment starts and stops with both streams at the same time
  • Is key-frame aware. If you record a compressed video stream, each recorded segment starts at a keyframe and ends right before the next keyframe to make it most likely that all frames can be successfully decoded

The multi-threading aspect here comes from the fact that in GStreamer each stream usually has its own thread, so in this case the video stream and the audio stream(s) would come from different threads but would have to be synchronized between each other.

The GTK+ example application for the plugin is playing a video with the current playback time and a beep every second, and allows to record this as an MP4 file in the current directory.

How did it go?

This new element was again based on the Rust GStreamer bindings and the infrastructure that I was writing over the last year or two for writing GStreamer plugins in Rust.

As written above, it generally went all fine and was quite a pleasure but there were a few things that seem noteworthy. But first of all, writing this in Rust was much more convenient and fun than writing it in C would’ve been, and I’ve written enough similar code in C before. It would’ve taken quite a bit longer, I would’ve had to debug more problems in the new code during development (there were actually surprisingly few things going wrong during development, I expected more!), and probably would’ve written less exhaustive tests because writing tests in C is just so inconvenient.

Rust does not prevent deadlocks

While this should be clear, and was also clear to myself before, this seems like it might need some reiteration. Safe Rust prevents data races, but not all possible bugs that multi-threaded programs can have. Rust is not magic, only a tool that helps you prevent some classes of potential bugs.

For example, you can’t just stop thinking about lock order when multiple mutexes are involved, or carelessly use condition variables without making sure that your conditions actually make sense and are accessed atomically. As a wise man once said, “the safest program is the one that does not run at all”, and a deadlocking program is very close to that.

The condition variable part might be something that can be improved in Rust. Without care here, you can easily end up in situations where you wait forever or your conditions are actually inconsistent. Currently Rust’s condition variables only require a mutex to be passed to the functions for waiting for the condition to be notified, but it would probably also make sense to require passing the same mutex to the constructor and notify functions, to make it absolutely clear that you need to ensure that your conditions are always accessed/modified while this specific mutex is locked. Otherwise you might end up in debugging hell.
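
As a plain standard-library illustration of the pattern described above (unrelated to the element’s code), the condition itself should only ever be read or modified while holding the same Mutex that is handed to wait()

use std::sync::{Condvar, Mutex};

struct Shared {
    // The boolean is the actual condition and lives inside the Mutex,
    // while the Condvar is only the wakeup mechanism
    ready: Mutex<bool>,
    cond: Condvar,
}

impl Shared {
    fn wait_until_ready(&self) {
        let mut ready = self.ready.lock().unwrap();
        // Re-check the condition after every wakeup to handle spurious wakeups
        while !*ready {
            ready = self.cond.wait(ready).unwrap();
        }
    }

    fn set_ready(&self) {
        *self.ready.lock().unwrap() = true;
        self.cond.notify_all();
    }
}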

Fortunately, during development of the plugin I only ran into a simple deadlock, caused by accidentally keeping a mutex locked for too long and then running into conflict with another one. That is probably an easy trap, given that the most common way of unlocking a mutex is to let the mutex lock guard fall out of scope. This makes it impossible to forget to unlock the mutex, but also makes it less explicit when it is unlocked, and sometimes explicit unlocking by manually dropping the mutex lock guard is still necessary.

So in summary, while a big group of potential problems with multi-threaded programs are prevented by Rust, you still have to be careful to not run into any of the many others. Especially if you use lower-level constructs like condition variables, and not just e.g. channels. Everything is however far more convenient than doing the same in C, and with more support by the compiler, so I definitely prefer writing such code in Rust over doing the same in C.

Missing API

As usual, for the first dozen projects using a new library or new bindings to an existing library, you’ll notice some missing bits and pieces. That I missed a relatively core part of GStreamer, the GstRegistry API, was surprising nonetheless. True, you usually don’t use it directly and I only needed it here for loading the new plugin from a non-standard location, but it was still surprising. Let’s hope this was the biggest oversight. If you look at the issues page on GitHub, you’ll find a few other things that are still missing though. But nobody needed them yet, so it’s probably fine for the time being.

Another piece of missing API that I noticed during development was that many manual (i.e. not auto-generated) bindings didn’t have the Debug trait implemented, or not in a very useful way. This is solved now, as otherwise I wouldn’t have been able to properly log what is happening inside the element to allow easier debugging later if something goes wrong.

Apart from that there were also various other smaller things that were missing, or bugs (see below) that I found in the bindings while going through all these. But those seem not very noteworthy – check the commit logs if you’re interested.

Bugs, bugs, bgsu

I also found a couple of bugs in the bindings. They can be broadly categorized in two categories

  • Annotation bugs in GStreamer. The auto-generated parts of the bindings are generated from an XML description of the API, that is generated from the C headers and code and annotations in there. There were a couple of annotations that were wrong (or missing) in GStreamer, which then caused memory leaks in my case. Such mistakes could also easily cause memory-safety issues though. The annotations are fixed now, which will also benefit all the other language bindings for GStreamer (and I’m not sure why nobody noticed the memory leaks there before me).
  • Bugs in the manually written parts of the bindings. Similarly to the above, there was one memory leak and another case where a function could’ve returned NULL but did not have this case covered on the Rust-side by returning an Option<_>.

Generally I was quite happy with the lack of bugs though, the bindings are really ready for production at this point. And especially, all the bugs that I found are things that are unfortunately “normal” and common when writing code in C, while Rust is preventing exactly these classes of bugs. As such, they have to be solved only once at the bindings layer, and then you’re free of them: you don’t have to spend any brain capacity on their existence anymore and can use your brain to solve the actual task at hand.

Inconvenient API

Similar to the missing API, whenever using some rather new API you will find things that are inconvenient and could ideally be done better. The biggest case here was the GstSegment API. A segment represents a (potentially open-ended) playback range and contains all the information to convert timestamps to the different time bases used in GStreamer. I’m not going to get into details here; best check the documentation for them.

A segment can be in different formats, e.g. in time or bytes. In the C API this is handled by storing the format inside the segment and requiring you to pass the format together with the value to every function call, and internally there are some checks that let the function fail if there is a format mismatch. In the previous version of the Rust segment API this was done the same way, which caused lots of unwrap() calls in this element.

But in Rust we can do better, and the new API for the segment now encodes the format in the type system (i.e. there is a Segment<Time>) and only values with the correct type (e.g. ClockTime) can be passed to the corresponding functions of the segment. In addition there is a type for a generic segment (which still has all the runtime checks) and functions to “cast” between the two.

Overall this gives more type-safety (the compiler already checks that you don’t mix calculations between seconds and bytes) and makes the API usage more convenient as various error conditions just can’t happen and thus don’t have to be handled. Or like in C, are simply ignored and not handled, potentially leaving a trap that can cause hard to debug bugs at a later time.

That Rust requires all errors to be handled makes it very obvious how many potential error cases the average C code out there is not handling at all, and also shows that a more expressive language than C can easily prevent many of these error cases at compile-time already.

GStreamer Rust bindings release 0.9

About 3 months, a GStreamer Conference and two bug-fix releases have passed now since the GStreamer Rust bindings release 0.8.0. Today version 0.9.0 (and 0.9.1 with a small bugfix to export some forgotten types) with a couple of API improvements and lots of additions and cleanups was released. This new version depends on the new set of releases of the gtk-rs crates (glib/etc).

The full changelog can be found here, but below is a short overview of the (in my opinion) most interesting changes.

Tutorials

The basic tutorials 1 to 8 were ported from C to Rust by various contributors. The C versions and the corresponding explanatory text can be found here, and it should be relatively easy to follow the text together with the Rust code.

This should make learning to use GStreamer from Rust much easier, in combination with the few example applications that exist in the repository.

Type-safety Improvements

Previously, querying the current playback position from a pipeline (and various other analogous things) was giving you a plain 64-bit integer, just like in C. However in Rust we can easily do better.

The main problem with just getting an integer was that there are “special” values that have the meaning of “no value known”, specifically GST_CLOCK_TIME_NONE for values in time. In C this often causes bugs by code ignoring this special case and then doing calculations with such a value, resulting in completely wrong numbers. In the Rust bindings these are now expressed as an Option<_> so that the special case has to be handled separately. In combination with that, for timed values there is a new type called ClockTime that implements all the arithmetic traits and others, so you can still do normal arithmetic operations on the values while the implementation of those operations takes care of GST_CLOCK_TIME_NONE. Also it was previously easy to get a value in bytes and add it to a value in time. Whenever multiple formats are possible, a new type called FormatValue is now used that combines the value itself with its format to prevent such mistakes.

Error Handling

Various operations in GStreamer can fail with a custom enum type: link pads (PadLinkReturn), pushing a buffer (FlowReturn), changing an element’s state (StateChangeReturn). Previously handling this was not as convenient as the usual Result-based error handling in Rust. With this release, all these types provide a function into_result() that allows to convert into a Result that splits the enum into its good and bad cases, e.g. FlowSuccess and FlowError. Based on this, the usual Rust error handling is possible, including usage of the ?-operator. Once the Try trait is stable, it will also be possible to directly use the ?-operator on FlowReturn and the others before conversion into a Result.
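
As a hypothetical snippet of what this looks like in practice (pad.push() is just one example of a function returning a FlowReturn)

fn forward(pad: &gst::Pad, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
    // push() returns a FlowReturn; into_result() splits it into its success
    // and error halves so the usual Result handling (including ?) works
    pad.push(buffer).into_result()
}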

All these enums are also marked as #[must_use] now, which causes a compiler warning if code is not specifically handling them (which could mean to explicitly ignore them), making it even harder to ignore errors caused by any failures of such operations.

In addition, all the examples and tutorials make use of the above now and many examples were ported to the failure crate and implement proper error handling in all situations now, for example the decodebin example.

Various New API

Apart from all of the above, a lot of new API was added. Both for writing GStreamer-based applications, and making that easier, as well as for writing GStreamer plugins in Rust. For the latter, the gst-plugin-rs repository with various crates (and plugins) was ported to the GStreamer bindings and completely rewritten, but more on that in another blog post in the next couple of days once the gst-plugin crate is released and published on crates.io.

Multi-threaded raw video conversion and scaling in GStreamer

Another new feature that landed in GStreamer already a while ago, and is included in the 1.12 release, is multi-threaded raw video conversion and scaling. The short story is that it led to e.g. a 3.2x speed-up converting 1080p video to 4k with 4 cores.

I had a few cases where a single core was not able to do rescaling in real-time anymore, even on a quite fast machine. One of the cases was 60fps 4k video in the v210 (10 bit YUV) color format, which is a lot of bytes per second in a not very processing-friendly format. GStreamer’s video converter and scaler is already quite optimized and using SIMD instructions like SSE or Neon, so there was not much potential for further optimizations in that direction.
However basically every machine nowadays has multiple CPU cores that could be used and raw video conversion/scaling is an almost perfectly parallelizable problem, and the way how the conversion code was already written it was relatively easy to add.

The way it works now is similar to the processing model of libraries like OpenMP or Rayon. The whole work is divided into smaller, equal sub-problems that are then handled in parallel; afterwards it waits until all parts are done and the result is combined. In our specific case that means that each plane of the video frame is cut into 2, 4, or more slices of full rows, which are then converted separately. The “combining” step does not exist; all sub-conversions are directly written to the correct place in the output already.

As a small helper object for this kind of processing model, I wrote GstParallelizedTaskRunner which might also be useful for other pieces of code that want to do the same.

In the end it was not much work, but the results were satisfying. For example the conversion of 1080p to 4k video in the v210 color format with 4 threads gave a speedup of 3.2x. At that point it looks like the main bottleneck was memory bandwidth, but I didn’t look closer as this is already more than enough for the use cases I was interested in.