Porting EBU R128 audio loudness analysis from C to Rust

Over the last few weeks I ported the libebur128 C library to Rust, both with a proper Rust API as well as a 100% compatible C API.

This blog post will be split into 4 parts that will be published over the next weeks

  1. Overview and motivation
  2. Porting approach with various details, examples and problems I ran into along the way
  3. Performance optimizations
  4. Building Rust code into a C library as drop-in replacement

If you’re only interested in the code, that can be found on GitHub and in the ebur128 crate on crates.io.

The initial versions of the ebur128 crate was built around the libebur128 C library (and included its code for ease of building), version 0.1.2 and newer is the pure Rust implementation.

EBU R128

libebur128 implements the EBU R128 loudness standard. The Wikipedia page gives a good summary of the standard, but in short it describes how to measure loudness of an audio signal and how to use this for loudness normalization.

While this intuitively doesn’t sound very complicated, there are lots of little details (like how human ears are actually working) that make this not as easy as one might expect. This results in there being many different ways for measuring loudness and is one of the reasons why this standard was introduced. Of course it is also not the only standard for this.

libebur128 is also the library that I used in the GStreamer loudness normalization plugin, about which I wrote a few weeks ago already. By porting the underlying loudness measurement code to Rust, the only remaining C dependency of that plugin is GStreamer itself.

Apart from that it is used by FFmpeg, but they include their own modified copy, as well as many other projects that need some kind of loudness measurement and don’t use ReplayGain, another older but widely used standard for the same problem.

Why?

Before going over the details of what I did, let me first explain why I did this work at all. libebur128 is a perfectly well working library, in wide use for a long time and probably rather bug-free at this point and it was already possible to use the C implementation from Rust just fine. That’s what the initial versions of the ebur128 crate were doing.

My main reason for doing this was simply because it seemed like a fun little project. It isn’t a lot of code that is changing often so once ported it should be more or less finished and it shouldn’t be much work to stay in sync with the C version. I started thinking about doing this already after the initial release of the C-based ebur128 release, but after reading Joe Neeman’s blog post about porting another C audio library (RNNoise) to Rust this gave me the final push to actually start with porting the code and to follow through until it’s done.

However, don’t go around and ask other people to rewrite their projects in Rust (don’t be rude) or think that your own rewrite is magically going to be much faster and less buggy than the existing implementation. While Rust saves you from a big class of possible bugs, it doesn’t save you from yourself and usually rewrites contain bugs that didn’t exist in the original implementation. Also getting good performance in Rust requires, like in every other language, some effort. Before rewriting any software, think about the goals of this rewrite realistically as well as the effort required to actually get it finished.

Apart from fun there were also a few technical and non-technical reasons for me to look into this. I’m going to just list two here (curiosity and portability). I will skip the usual Rust memory-safety argument as that seems less important with this code: the C code is widely used for a long time, not changing a lot and has easy to follow memory access patterns. While it definitely had a memory safety bug (see above), it was rather difficult to trigger and it was fixed in the meantime.

Curiosity

Personally and at my company Centricular we try to do any new projects where it makes sense in Rust. While this worked very well in the past and we got great results, there were some questions for future projects that I wanted to get some answers, hard data and personal experience for

  • How difficult is it to port a C codebase function by function to Rust while keeping everything working along the way?
  • How difficult is it to get the same or better performance with idiomatic Rust code for low-level media processing code?
  • How much bigger or smaller is the resulting code and do Rust’s higher-level concepts like iterators help to keep code concise?
  • How difficult is it to create a C-compatible library in Rust with the same API and ABI?

I have some answers to all these questions already but previous work on this was not well structured and the results were also not documented, which I’m trying to change here now. Both to have a reference for myself in the future as well as for convincing other people that Rust is a reasonable technology choice for such projects.

As you can see the general pattern of these questions are introducing Rust into an existing codebase, replacing existing components with Rust and writing new components in Rust, which is also relates to my work on the Rust GStreamer bindings.

Portability

C is a very old language and while there is a standard, each compiler has its own quirks and each platform different APIs on top of the bare minimum that the C standard defines. C itself is very portable, but it is not easy to write portable C code, especially when not using a library like GLib that hides these differences and provides basic data structures and algorithms.

This seems to be something that is often forgotten when the portability of C is given as an argument against Rust, and that’s the reason why I wanted to mention this here specifically. While you can get a C compiler basically everywhere, writing C code that also runs well everywhere is another story and C doesn’t make this easy by design. Rust on the other hand makes writing portable code quite easy in my experience.

In practice there were three specific issues I had for this codebase. Most of the advantages of Rust here are because it is a new language and doesn’t have to carry a lot of historical baggage.

Mathematical Constants and Functions

Mathematical constants are not actually part of any C standard. While most compilers just define M_PI (for π), M_E (for 𝖾) and others in math.h nonetheless as they’re defined by POSIX and UNIX98.

Microsoft’s MSVC doesn’t, but instead you have to #define _USE_MATH_DEFINES before including math.h.

While not a big problem per-se, it is annoying and indeed caused the initial version of the ebur128 Rust crate to not compile with MSVC because I forgot about it.

Similarly, which mathematical functions are available depends a lot on the target platform and which version of the C standard is supported. An example of this is the log10 function to calculate the base-10 logarithm. For portability reasons, libebur128 didn’t use it but instead calculated it via the natural logarithm (ln(x) / ln(10) = log10(x)) because it’s only available in POSIX and since C99. While C99 is from 1999, there are still many compilers out there that don’t fully support it, again most prominently MSVC until very recently.

Using log10 instead of going via the natural logarithm is faster and more precise due to floating point number reasons, which is why the Rust implementation uses it but in C it would be required to check at build-time if the function is available or not, which complicates the build process and can easily be forgotten. libebur128 decided to not bother with these complications and simply not use it. Because of that, some conditional code in the Rust implementation is necessary for ensuring that both implementations return the same results in the tests.

Data Structures

libebur128 uses a linked-list-based queue data structure. As the C standard library is very minimal, no collection data structures are included. However on the BSDs and also on Linux with the GNU C library there is one available in sys/queue.h.

Of course MSVC does not have this and other compilers/platforms probably won’t have it either, so libebur128 included a local copy of that queue implementation. Now when building, one has to decide whether there is a system implementation available or otherwise use the internal version. Or simply always use the internal version.

Copying implementations of basic data structures and algorithms into every single project is ugly and error-prone, so let’s maybe not do that. C not having a standardized mechanism for dependency handling doesn’t help with this, which is unfortunately why this is very common in C projects.

One-time Initialization

Thread-safe one-time initialization is another thing that is not defined by the C standard, and depending on your platform there are different APIs available for it or none at all. POSIX again defines one that is widely available, but you can’t really depend on it unconditionally.

This complicates the code and build procedure, so libebur128 simply did not do that and did its one-time initializations of some global arrays every time a new instance was created. Which is probably fine, but a bit wasteful and probably strictly-speaking according to the C standard not actually thread-safe.

The initial version of the ebur128 Rust crate side-stepped this problem by simply doing this initialization once with the API provided by the Rust standard library. See part 2 and part 3 of this blog post for some more details about this.

Easier to Compile and Integrate

A Rust port only requires a Rust compiler, a mixed C/Rust codebase requires at least a C compiler in addition and some kind of build system for the C code.

libebur128 uses CMake, which would be an additional dependency so in the initial version of the ebur128 crate I went via cargo‘s build.rs build scripts and the cc crate as building libebur128 is easy enough. This works but build scripts are problematic for integration of the Rust code into other build systems than cargo.

The Rust port also makes use of conditional compilation in various places. Unlike in C with the preprocessor, non-standardized and inconsistent platform #defines and it being necessary to integrate everything in a custom way into the build system, Rust has a principled and well-designed approach to this problem. This makes it easier to keep the code clean, easier to maintain and more portable.

In addition to build system related simplifications, by not having any C code it is also much easier to compile the code to other targets like WebAssembly, which is natively supported by Rust. It is also possible to compile C to WebAssembly but getting both toolchains to agree with each other and produce compatible code seems not very easy.

Overview

As mentioned above, the code can be found on GitHub and in the ebur128 crate on crates.io.

The current version of the code produces the exact same results as the C version. This is enforced by the quickcheck tests that are running randomized inputs through both versions and check that the results are the same. The code also succeeds all the tests in the EBU loudness test set, so should hopefully be standards compliant as long as the test implementation is not wrong.

Performance-wise the Rust implementation is at least as fast as the C implementation. In some configurations it’s a few percent faster but probably not enough that it actually matters in practice. There are various benchmarks for both versions in different configurations available. The benchmarks are based on the criterion crate, which uses statistical methods to give as accurate as possible results. criterion also generates nice results with graphs for making analysis of the results more pleasant. See part 3 of this blog post for more details.

Writing tests and benchmarks for Rust is so much easier and feels more natural then doing it in C, so the Rust implementation has quite good coverage of the different code paths now. Especially no struggling with build systems was necessary like it would have been in C thanks to cargo and Rust having built-in support. This alone seems to have the potential to cause Rust code having, on average, better quality than similar code written in C.

It is also possible to compile the Rust implementation into a C library with the great cargo-c tool. This easily builds the code as a static/dynamic C library and installs the library, a C header file and also a pkg-config file. With this the Rust implementation is a 100% drop-in replacement of the C libebur128. It is not even necessary to recompile existing code. See part 4 of this blog post for more details.

Dependencies

Apart from the Rust standard library the Rust implementation depends on two other, small and widely used crates. Unlike with C, depending on external dependencies is rather simple with Rust and cargo. The two crates in question are

  • smallvec for a dynamically sized vectors/arrays that can be stored on the stack up to a certain size and only then fall back to heap allocations. This allows to avoid a couple of heap allocations under normal usage.
  • bitflags, which provides a macro for implementing properly typed bitflags. This is used in the constructor of the main type for selecting the features and modes that should be enabled, which directly maps to how the C API works (just with less type-safety).

Unsafe Code

A common question when announcing a Rust port of some C library is how much unsafe code was necessary to reach the same performance as the C code. In this case there are two uses of unsafe code outside the FFI code to call the C implementation in the tests/benchmarks and the C API.

Resampler

The True Peak measurement is using a resampler to upsample the audio signal to a higher sample rate. As part of the most inner loop of the resampler a statically sized ringbuffer is used.

As part of that ringbuffer, explicit indexing of a slice is needed. While the indexes are already manually checked to wrap around when needed, the Rust compiler and LLVM can’t figure that out so additional bounds checks plus panic handling is present in the compiled code. Apart from slowing down the loop with the additional condition, the panic code also causes the whole loop to be optimized less well.

So to get around that, unsafe indexing into the slice is used for performance reasons. While it requires a human now to check the memory safety of the code instead of relying on the compiler, the code in question is simple and small enough that it shouldn’t be a problem in practice.

More on this in part 2 and part 3 of this blog post.

Flushing Denormals to Zero

The other use of unsafe code is in the filter that is applied to the incoming audio signal. On x86/x86-64 the MXCSR register temporarily gets the _MM_FLUSH_ZERO_ON bit set to flush denormal floating point number to zero. That is, denormals (i.e. very small numbers close to zero) as result of any floating point operation are considered as zero.

This happens both for performance reasons as well as correctness reasons. Operations on denormals are generally much slower than on normalized floating point numbers. This has a measurable impact on the performance in this case.

Also as the C library does the same and not flushing denormals to zero would lead to slightly different results. While this difference doesn’t matter in practice as it’s very very small, it would make it harder to compare the results of both implementations as they wouldn’t be as close to each other anymore.

Doing this affects every floating point operation that happens while that bit is set, but because these are only the floating point operations performed by this crate and it’s guaranteed that the bit is unset again (even in case of panics) before leaving the filter, this shouldn’t cause any problems for other code.

Additional Features

Once the C library was ported and performance was comparable to the C implementation, I shortly checked the issues reported on the C library to check if there’s any useful feature requests or bug reports that I could implement / fix in the Rust implementation. There were three, one of which I also wanted for a future project.

None of the new features are available via the C API at this point for compatibility reasons.

Resetting the State

For this one there was a PR already for the C library. Previously the only way to reset all measurements was to create a new instance, which involves new memory allocations, filter initialization, etc..

It’s easy enough to provide a reset method to do only the minimal work required to reset all measurements and restart with a fresh state so I’ve added that to the Rust implementation.

Fix set_max_window() to actually work

This was a bug introduced in the C implementation a while ago in an attempt to prevent integer overflows when calculating sizes of memory allocations, which then would cause memory safety bugs because less memory was allocated than expected. Accidentally this fix restricted the allowed values for the maximum window size too much. There is a PR for fixing this in the C implementation.

On the Rust side this bug also existed because I simply ported over the checks. If I hadn’t ported over the checks, or ported an earlier version without the checks, there fortunately wouldn’t have been any memory safety bug on the Rust side though but instead one of two situations would have happened instead

  1. In debug builds integer overflows cause a panic, so instead of allocating less memory than expected during the setting of the parameters there would’ve been a panic immediately instead of invalid memory accesses later.
  2. In release builds integer overflows simply wrap around for performance reasons. This would’ve caused less memory than expected to be allocated, but later when trying to access the memory there would’ve been a panic when trying to access memory outside the allocated area.

While a panic is also not nice, it at least leads to no undefined behaviour and prevents worse things from happening.

The proper fix in this case was to not restrict the maximum window size statically but to instead check for overflows during the calculations. This is the same the PR for the C implementation does, but on the Rust side this is much easier because of built-in operations like checked_mul for doing an overflow-checking multiplication. In C this requires some rather convoluted code (check the PR for details).

Support for Planar Audio Input

The last additional feature that I implemented was support for planar audio input, for which also a PR to the C implementation exists already.

Most of the time audio signals have the samples of each channel interleaved with each other, so for example for stereo you have an array of samples with the first sample for the left channel, the first sample for the right channel, the second sample for the left channel, etc.. While this representation has some advantages, in other situations it is easier or faster to work with planar audio: the samples of each channel are contiguous one after another, so you have e.g. first all the samples of the left channel one after another and only then all samples of the right channel.

The PR for the C implementation does this with some code duplication of existing macro code (which can be prevented by making the macros more complicated), on the Rust side I implemented this without any code duplication by adding an internal abstraction for interleaved/planar audio and iterating over the samples and then working with that in normal, generic Rust code. This required some minor refactoring and code reorganization but in the end was rather painless. Note that most of the change is addition of new tests and moving some code around.

When looking at the Samples trait, the main part of this refactoring, one might wonder why I used closures instead of Rust iterators for iterating over the samples and the reason is unfortunately performance. More on this in part 3 of this blog post.

Next Part

In the next part of this blog post I will describe the porting approach in detail and also give various examples for how to port C code to idiomatic Rust, and some examples of problems I was running into.

Automatic retry on error and fallback stream handling for GStreamer sources

A very common problem in GStreamer, especially when working with live network streams, is that the source might just fail at some point. Your own network might have problems, the source of the stream might have problems, …

Without any special handling of such situations, the default behaviour in GStreamer is to simply report an error and let the application worry about handling it. The application might for example want to restart the stream, or it might simply want to show an error to the user, or it might want to show a fallback stream instead, telling the user that the stream is currently not available and then seamlessly switch back to the stream once it comes back.

Implementing all of the aforementioned is quite some effort, especially to do it in a robust way. To make it easier for applications I implemented a new plugin called fallbackswitch that contains two elements to automate this.

It is part of the GStreamer Rust plugins and also included in the recent 0.6.0 release, which can also be found on the Rust package (“crate”) repository crates.io.

Installation

For using the plugin you most likely first need to compile it yourself, unless you’re lucky enough that e.g. your Linux distribution includes it already.

Compiling it requires a Rust toolchain and GStreamer 1.14 or newer. The former you can get via rustup for example, if you don’t have it yet, the latter either from your Linux distribution or by using the macOS, Windows, etc binaries that are provided by the GStreamer project. Once that is done, compiling is mostly a matter of running cargo build in the utils/fallbackswitch directory and copying the resulting libgstfallbackswitch.so (or .dll or .dylib) into one of the GStreamer plugin directories, for example ~/.local/share/gstreamer-1.0/plugins.

fallbackswitch

The first of the two elements is fallbackswitch. It acts as a filter that can be placed into any kind of live stream. It consumes one main stream (which must be live) and outputs this stream as-is if everything works well. Based on the timeout property it detects if this main stream didn’t have any activity for the configured amount of time, or everything arrived too late for that long, and then seamlessly switches to a fallback stream. The fallback stream is the second input of the element and does not have to be live (but it can be).

Switching between main stream and fallback stream doesn’t only work for raw audio and video streams but also works for compressed formats. The element will take constraints like keyframes into account when switching, and if necessary/possible also request new keyframes from the sources.

For example to play the Sintel trailer over the network and displaying a test pattern if it doesn’t produce any data, the following pipeline can be constructed:

gst-launch-1.0 souphttpsrc location=https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm ! \
    decodebin ! identity sync=true ! fallbackswitch name=s ! videoconvert ! autovideosink \
    videotestsrc ! s.fallback_sink

Note the identity sync=true in the main stream here as we have to convert it to an actual live stream.

Now when running the above command and disconnecting from the network, the video should freeze at some point and after 5 seconds a test pattern should be displayed.

However, when using fallbackswitch the application will still have to take care of handling actual errors from the main source and possibly restarting it. Waiting a bit longer after disconnecting the network with the above command will report an error, which then stops the pipeline.

To make that part easier there is the second element.

fallbacksrc

The second element is fallbacksrc and as the name suggests it is an actual source element. When using it, the main source can be configured via an URI or by providing a custom source element. Internally it then takes care of buffering the source, converting non-live streams into live streams and restarting the source transparently on errors. The various timeouts for this can be configured via properties.

Different to fallbackswitch it also handles audio and video at the same time and demuxes/decodes the streams.

Currently the only fallback streams that can be configured are still images for video. For audio the element will always output silence for now, and if no fallback image is configured for video it outputs black instead. In the future I would like to add support for arbitrary fallback streams, which hopefully shouldn’t be too hard. The basic infrastructure for it is already there.

To use it again in our previous example and having a JPEG image displayed whenever the source does not produce any new data, the following can be done:

gst-launch-1.0 fallbacksrc uri=https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm \
    fallback-uri=file:///path/to/some/jpg ! videoconvert ! autovideosink

Now when disconnecting the network, after a while (longer than before because fallbacksrc does additional buffering for non-live network streams) the fallback image should be shown. Different to before, waiting longer will not lead to an error and reconnecting the network causes the video to reappear. However as this is not an actual live-stream, right now playback would again start from the beginning. Seeking back to the previous position would be another potential feature that could be added in the future.

Overall these two elements should make it easier for applications to handle errors in live network sources. While the two elements are still relatively minimal feature-wise, they should already be usable in various real scenarios and are already used in production.

As usual, if you run into any problems or are missing some features, please create an issue in the GStreamer bug tracker.

GStreamer Rust Bindings & Plugins New Releases

It has been quite a while since the last status update for the GStreamer Rust bindings and the GStreamer Rust plugins, so the new releases last week make for a good opportunity to do so now.

Bindings

I won’t write too much about the bindings this time. The latest version as of now is 0.16.1, which means that since I started working on the bindings there were 8 major releases. In that same time there were 45 contributors working on the bindings, which seems quite a lot and really makes me happy.

Just as before, I don’t think any major APIs are missing from the bindings anymore, even for implementing subclasses of the various GStreamer types. The wide usage of the bindings in Free Software projects and commercial products also shows both the interest in writing GStreamer applications and plugins in Rust as well as that the bindings are complete enough and production-ready.

Most of the changes since the last status update involve API cleanups, usability improvements, various bugfixes and addition of minor API that was not included before. The details of all changes can be read in the changelog.

The bindings work with any GStreamer version since 1.8 (released more than 4 years ago), support APIs up to GStreamer 1.18 (to be released soon) and work with Rust 1.40 or newer.

Plugins

The biggest progress probably happened with the GStreamer Rust plugins.

There also was a new release last week, 0.6.0, which was the first release where selected plugins were also uploaded to the Rust package (“crate”) database crates.io. This makes it easy for Rust applications to embed any of these plugins statically instead of depending on them to be available on the system.

Overall there are now 40 GStreamer elements in 18 plugins by 28 contributors available as part of the gst-plugins-rs repository, one tutorial plugin with 4 elements and various plugins in external locations.

These 40 GStreamer elements are the following:

Audio
  • rsaudioecho: Port of the audioecho element from gst-plugins-good
  • rsaudioloudnorm: Live audio loudness normalization element based on the FFmpeg af_loudnorm filter
  • claxondec: FLAC lossless audio codec decoder element based on the pure-Rust claxon implementation
  • csoundfilter: Audio filter that can use any filter defined via the Csound audio programming language
  • lewtondec: Vorbis audio decoder element based on the pure-Rust lewton implementation
Video
  • cdgdec/cdgparse: Decoder and parser for the CD+G video codec based on a pure-Rust CD+G implementation, used for example by karaoke CDs
  • cea608overlay: CEA-608 Closed Captions overlay element
  • cea608tott: CEA-608 Closed Captions to timed-text (e.g. VTT or SRT subtitles) converter
  • tttocea608: CEA-608 Closed Captions from timed-text converter
  • mccenc/mccparse: MacCaption Closed Caption format encoder and parser
  • sccenc/sccparse: Scenarist Closed Caption format encoder and parser
  • dav1dec: AV1 video decoder based on the dav1d decoder implementation by the VLC project
  • rav1enc: AV1 video encoder based on the fast and pure-Rust rav1e encoder implementation
  • rsflvdemux: Alternative to the flvdemux FLV demuxer element from gst-plugins-good, not feature-equivalent yet
  • rsgifenc/rspngenc: GIF/PNG encoder elements based on the pure-Rust implementations by the image-rs project
Text
  • textwrap: Element for line-wrapping timed text (e.g. subtitles) for better screen-fitting, including hyphenation support for some languages
Network
  • reqwesthttpsrc: HTTP(S) source element based on the Rust reqwest/hyper HTTP implementations and almost feature-equivalent with the main GStreamer HTTP source souphttpsrc
  • s3src/s3sink: Source/sink element for the Amazon S3 cloud storage
  • awstranscriber: Live audio to timed text transcription element using the Amazon AWS Transcribe API
Generic
  • sodiumencrypter/sodiumdecrypter: Encryption/decryption element based on libsodium/NaCl
  • togglerecord: Recording element that allows to pause/resume recordings easily and considers keyframe boundaries
  • fallbackswitch/fallbacksrc: Elements for handling potentially failing (network) sources, restarting them on errors/timeout and showing a fallback stream instead
  • threadshare: Set of elements that provide alternatives for various existing GStreamer elements but allow to share the streaming threads between each other to reduce the number of threads
  • rsfilesrc/rsfilesink: File source/sink elements as replacements for the existing filesrc/filesink elements

Live loudness normalization in GStreamer & experiences with porting a C audio filter to Rust

A few months ago I wrote a new GStreamer plugin: an audio filter for live loudness normalization and automatic gain control.

The plugin can be found as part of the GStreamer Rust plugin in the audiofx plugin. It’s also included in the recent 0.6.0 release of the GStreamer Rust plugins and available from crates.io.

Its code is based on Kyle Swanson’s great FFmpeg filter af_loudnorm, about which he wrote some more technical details on his blog a few years back. I’m not going to repeat all that here, if you’re interested in those details and further links please read Kyle’s blog post.

From a very high-level, the filter works by measuring the loudness of the input following the EBU R128 standard with a 3s lookahead, adjusts the gain to reach the target loudness and then applies a true peak limiter with 10ms to prevent any too high peaks to get passed through. Both the target loudness and the maximum peak can be configured via the loudness-target and max-true-peak properties, same as in the FFmpeg filter. Different to the FFmpeg filter I only implemented the “live” mode and not the two-pass mode that is implemented in FFmpeg, which first measures the loudness of the whole stream and then in a second pass adjusts it.

Below I’ll describe the usage of the filter in GStreamer a bit and also some information about the development process, and the porting of the C code to Rust.

Usage

For using the filter you most likely first need to compile it yourself, unless you’re lucky enough that e.g. your Linux distribution includes it already.

Compiling it requires a Rust toolchain and GStreamer 1.8 or newer. The former you can get via rustup for example, if you don’t have it yet, the latter either from your Linux distribution or by using the macOS, Windows, etc binaries that are provided by the GStreamer project. Once that is done, compiling is mostly a matter of running cargo build in the audio/audiofx directory and copying the resulting libgstrsaudiofx.so (or .dll or .dylib) into one of the GStreamer plugin directories, for example ~/.local/share/gstreamer-1.0/plugins.

After that boring part is done, you can use it for example as follows to run loudness normalization on the Sintel trailer:

gst-launch-1.0 playbin \
    uri=https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm \
    audio-filter="audioresample ! rsaudioloudnorm ! audioresample ! capsfilter caps=audio/x-raw,rate=48000"

As can be seen above, it is necessary to put audioresample elements around the filter. The reason for that is that the filter currently only works on 192kHz input. This is a simplification for now to make it easier inside the filter to detect true peaks. You would first upsample your audio to 192kHz and then, if needed, later downsample it again to your target sample rate (48kHz in the example above). See the link mentioned before for details about true peaks and why this is generally a good idea to do. In the future the resampling could be implemented internally and maybe optionally the filter could also work with “normal” peak detection on the non-upsampled input.

Apart from that caveat the filter element works like any other GStreamer audio filter and can be placed accordingly in any GStreamer pipeline.

If you run into any problems using the code or it doesn’t work well for your use-case, please create an issue in the GStreamer bugtracker.

The process

As I wrote above, the GStreamer plugin is part of the GStreamer Rust plugins so the first step was to port the FFmpeg C code to Rust. I expected that to be the biggest part of the work, but as writing Rust is simply so much more enjoyable than writing C and I would have to adjust big parts of the code to fit the GStreamer infrastructure anyway, I took this approach nonetheless. The alternative of working based on the C code and writing the plugin in C didn’t seem very appealing to me. In the end, as usual when developing in Rust, this also allowed me to be more confident about the robustness of the result and probably reduced the amount of time spent debugging. Surprisingly, the translation was actually not the biggest part of the work, but instead I had to debug a couple of issues that were already present in the original FFmpeg code and find solutions for them. But more on that later.

The first step for porting the code was to get an implementation of the EBU R128 loudness analysis. In FFmpeg they’re using a fork of the libebur128 C library. I checked if there was anything similar for Rust already, maybe even a pure-Rust implementation of it, but couldn’t find anything. As I didn’t want to write one myself or port the code of the libebur128 C library to Rust, I wrote safe Rust bindings for that library instead. The end result of that can be found on crates.io as an independent crate, in case someone else also needs it for other purposes at some point. The crate also includes the code of the C library, making it as easy as possible to build and include into other projects.

The next step was to actually port the FFmpeg C code to Rust. In the end that was a rather straightforward translation fortunately. The latest version of that code can be found here.

The biggest difference to the C code is the usage of Rust iterators and iterator combinators like zip and chunks_exact. In my opinion this makes the code quite a bit easier to read compared to the manual iteration in the C code together with array indexing, and as a side effect it should also make the code run faster in Rust as it allows to get rid of a lot of array bounds checks.

Apart from that, one part that was a bit inconvenient during that translation and still required manual array indexing is the usage of ringbuffers everywhere in the code. For now I wrote those like I would in C and used a few unsafe operations like get_unchecked to avoid redundant bounds checks, but at a later time I might refactor this into a proper ringbuffer abstraction for such audio processing use-cases. It’s not going to be the last time I need such a data structure. A short search on crates.io gave various results for ringbuffers but none of them seem to provide an API that fits the use-case here. Once that’s abstracted away into a nice data structure, I believe the Rust code of this filter is really nice to read and follow.

Now to the less pleasant parts, and also a small warning to all the people asking for Rust rewrites of everything: of course I introduced a couple of new bugs while translating the code although this was a rather straightforward translation and I tried to be very careful. I’m sure there is also still a bug or two left that I didn’t find while debugging. So always keep in mind that rewriting a project will also involve adding new bugs that didn’t exist in the original code. Or maybe you’re just a better programmer than me and don’t make such mistakes.

Debugging these issues that showed up while testing the code was a good opportunity to also add extensive code comments everywhere so I don’t have to remind myself every time again what this block of code is doing exactly, and it’s something I was missing a bit from the FFmpeg code (it doesn’t have a single comment currently). While writing those comments and explaining the code to myself, I found the majority of these bugs that I introduced and as a side-effect I now have documentation for my future self or other readers of the code.

Fixing these issues I introduced myself wasn’t that time-consuming neither in the end fortunately, but while writing those code comments and also while doing more testing on various audio streams, I found a couple of bugs that already existed in the original FFmpeg C code. Further testing also showed that they caused quite audible distortions on various test streams. These are the bugs that unfortunately took most of the time in the whole process, but at least to my knowledge there are no known bugs left in the code now.

For these bugs in the FFmpeg code I also provided a fix that is merged already, and reported the other two in their bug tracker.

The first one I’d be happy to provide a fix for if my approach is considered correct, but the second one I’ll leave for someone else. Porting over my Rust solution for that one will take some time and getting all the array indexing involved correct in C would require some serious focusing, for which I currently don’t have the time.

Or maybe my solutions to these problems are actually wrong, or my understanding of the original code was wrong and I actually introduced them in my translation, which also would be useful to know.

Overall, while porting the C code to Rust introduced a few new problems that had to be fixed, I would definitely do this again for similar projects in the future. It’s more fun to write and in my opinion the resulting code is easier readable, and better to maintain and extend.

The GTK Rust bindings are not ready yet? Yes they are!

When talking to various people at conferences in the last year or at conferences, a recurring topic was that they believed that the GTK Rust bindings are not ready for use yet.

I don’t know where that perception comes from but if it was true, there wouldn’t have been applications like Fractal, Podcasts or Shortwave using GTK from Rust, or I wouldn’t be able to do a workshop about desktop application development in Rust with GTK and GStreamer at the Linux Application Summit in Barcelona this Friday (code can be found here already) or earlier this year at GUADEC.

One reason I sometimes hear is that there is not support for creating subclasses of GTK types in Rust yet. While that was true, it is not true anymore nowadays. But even more important: unless you want to create your own special widgets, you don’t need that. Many examples and tutorials in other languages make use of inheritance/subclassing for the applications’ architecture, but that’s because it is the idiomatic pattern in those languages. However, in Rust other patterns are more idiomatic and even for those examples and tutorials in other languages it wouldn’t be the one and only option to design applications.

Almost everything is included in the bindings at this point, so seriously consider writing your next GTK UI application in Rust. While some minor features are still missing from the bindings, none of those should prevent you from successfully writing your application.

And if something is actually missing for your use-case or something is not working as expected, please let us know. We’d be happy to make your life easier!

P.S.

Some people are already experimenting with new UI development patterns on top of the GTK Rust bindings. So if you want to try developing an UI application but want to try something different than the usual signal/callback spaghetti code, also take a look at those.

GStreamer Rust bindings 0.12 and GStreamer Plugin 0.3 release

After almost 6 months, a new release of the GStreamer Rust bindings and the GStreamer plugin writing infrastructure for Rust is out. As usual this was coinciding with the release of all the gtk-rs crates to make use of all the new features they contain.

Thanks to all the contributors of both gtk-rs and the GStreamer bindings for all the nice changes that happened over the last 6 months!

And as usual, if you find any bugs please report them and if you have any questions let me know.

GStreamer Bindings

For the full changelog check here.

Most changes this time were internally, especially because many user-facing changes (like Debug impls for various types) were already backported to the minor releases in the 0.11 release series.

WebRTC

The biggest change this time is probably the inclusion of bindings for the GStreamer WebRTC library.

This allows using building all kinds of WebRTC applications outside the browser (or providing a WebRTC implementation for a browser), and while not as full-featured as Google’s own implementation, this interoperates well with the various browsers and generally works much better on embedded devices.

A small example application in Rust is available here.

Serde

Optionally, serde trait implementations for the Serialize and Deserialize trait can be enabled for various fundamental GStreamer types, including caps, buffers, events, messages and tag lists. This allows serializing them into any format that can be handled by serde (which are many!), and deserializing them back to normal Rust structs.

Generic Tag API

Previously only a strongly-typed tag API was exposed that made it impossible to use the wrong data type for a specific tag, e.g. code that tries to store a string for the track number or an integer for the title would simply not compile:

let mut tags = gst::TagList::new();
{
    let tags = tags.get_mut().unwrap();
    tags.add::<Title>(&"some title", gst::TagMergeMode::Append);
    tags.add::<TrackNumber>(&12, gst::TagMergeMode::Append);
}

While this is convenient, it made it rather complicated to work with tag lists if you only wanted to handle them in a generic way. For example by iterating over the tag list and simply checking what kind of tags are available. To solve that, a new generic API was added in addition. This works on glib::Values, which can store any kind of type, and using the wrong type for a specific tag would simply cause an error at runtime instead of compile-time.

let mut tags = gst::TagList::new();
{
    let tags = tags.get_mut().unwrap();
    tags.add_generic(&gst::tags::TAG_TITLE, &"some title", gst::TagMergeMode::Append)
.expect("wrong type for title tag");
    tags.add_generic(&gst::tags::TAG_TRACK_NUMBER, &12, gst::TagMergeMode::Append)
.expect("wrong type for track number tag");
}

This also greatly simplified the serde serialization/deserialization for tag lists.

GStreamer Plugins

For the full changelog check here.

gobject-subclass

The main change this time is that all the generic GObject subclassing infrastructure was moved out of the gst-plugin crate and moved to its own gobject-subclass crate as part of the gtk-rs organization.

As part of this, some major refactoring has happened that allows subclassing more different types but also makes it simpler to add new types. There are also experimental crates for adding some subclassing support to gio and gtk, and a PR for autogenerating part of the code via the gir code generator.

More classes!

The other big addition this time is that it’s now possible to subclass GStreamer Pads and GhostPads, to implement the ChildProxy interface and to subclass the Aggregator and AggregatorPad class.

This now allows to write custom mixer/muxer-style elements (or generally elements that have multiple sink pads) in Rust via the Aggregator base class, and to have custom pad types for elements to allow for setting custom properties on the pads (e.g. to control the opacity of a single video mixer input).

There is currently no example for such an element, but I’ll add a very simple video mixer to the repository some time in the next weeks and will also write a blog post about it for explaining all the steps.

Improving GStreamer performance on a high number of network streams by sharing threads between elements with Rust’s tokio crate

For one of our customers at Centricular we were working on a quite interesting project. Their use-case was basically to receive an as-high-as-possible number of audio RTP streams over UDP, transcode them, and then send them out via UDP again. Due to how GStreamer usually works, they were running into some performance issues.

This blog post will describe the first set of improvements that were implemented for this use-case, together with a minimal benchmark and the results. My colleague Mathieu will follow up with one or two other blog posts with the other improvements and a more full-featured benchmark.

The short version is that CPU usage decreased by about 65-75%, i.e. allowing 3-4x more streams with the same CPU usage. Also parallelization works better and usage of different CPU cores is more controllable, allowing for better scalability. And a fixed, but configurable number of threads is used, which is independent of the number of streams.

The code for this blog post can be found here.

Table of Contents

  1. GStreamer & Threads
  2. Thread-Sharing GStreamer Elements
  3. Available Elements
  4. Little Benchmark
  5. Conclusion

GStreamer & Threads

In GStreamer, by default each source is running from its own OS thread. Additionally, for receiving/sending RTP, there will be another thread in the RTP jitterbuffer, yet another thread for receiving RTCP (another source) and a last thread for sending RTCP at the right times. And RTCP has to be received and sent for the receiver and sender side part of the pipeline, so the number of threads doubles. In the sum this gives at least 1 + 1 + (1 + 1) * 2 = 6 threads per RTP stream in this scenario. In a normal audio scenario, there will be one packet received/sent e.g. every 20ms on each stream, and every now and then an RTCP packet. So most of the time all these threads are only waiting.

Apart from the obvious waste of OS resources (1000 streams would be 6000 threads), this also brings down performance as all the time threads are being woken up. This means that context switches have to happen basically all the time.

To solve this we implemented a mechanism to share threads, and in the end as a result we have a fixed, but configurable number of threads that is independent from the number of streams. And can run e.g. 500 streams just fine on a single thread with a single core, which was completely impossible before. In addition we also did some work to reduce the number of allocations for each packet, so that after startup no additional allocations happen per packet anymore for buffers. See Mathieu’s upcoming blog post for details.

In this blog post, I’m going to write about a generic mechanism for sources, queues and similar elements to share their threads between each other. For the RTP related bits (RTP jitterbuffer and RTCP timer) this was not used due to reuse of existing C codebases.

Thread-Sharing GStreamer Elements

The code in question can be found here, a small benchmark is in the examples directory and it is going to be used for the results later. A full-featured benchmark will come in Mathieu’s blog post.

This is a new GStreamer plugin, written in Rust and around the Tokio crate for asynchronous IO and generally a “task scheduler”.

While this could certainly also have been written in C around something like libuv, doing this kind of work in Rust is simply more productive and fun due to its safety guarantees and the strong type system, which definitely reduced the amount of debugging a lot. And in addition “modern” language features like closures, which make working with futures much more ergonomic.

When using these elements it is important to have full control over the pipeline and its elements, and the dataflow inside the pipeline has to be carefully considered to properly configure how to share threads. For example the following two restrictions should be kept in mind all the time:

  1. Downstream of such an element, the streaming thread must never ever block for considerable amounts of time. Otherwise all other elements inside the same thread-group would be blocked too, even if they could do any work now
  2. This generally all works better in live pipelines, where media is produced in real-time and not as fast as possible

Available Elements

So this repository currently contains the generic infrastructure (see the src/iocontext.rs source file) and a couple of elements:

  • an UDP source: ts-udpsrc, a replacement for udpsrc
  • an app source: ts-appsrc, a replacement for appsrc to inject packets into the pipeline from the application
  • a queue: ts-queue, a replacement for queue that is useful for adding buffering to a pipeline part. The upstream side of the queue will block if not called from another thread-sharing element, but if called from another thread-sharing element it will pause the current task asynchronously. That is, stop the upstream task from producing more data.
  • a proxysink/src element: ts-proxysrc, ts-proxysink, replacements for proxysink/proxysrc for connecting two pipelines with each other. This basically works like the queue, but split into two elements.
  • a tone generator source around spandsp: ts-tonesrc, a replacement for tonegeneratesrc. This also contains some minimal FFI bindings for that part of the spandsp C library.

All these elements have more or less the same API as their non-thread-sharing counterparts.

API-wise, each of these elements has a set of properties for controlling how it is sharing threads with other elements, and with which elements:

  • context: A string that defines in which group this element is. All elements with the same context are running on the same thread or group of threads,
  • context-threads: Number of threads to use in this context. -1 means exactly one thread, 1 and above used N+1 threads (1 thread for polling fds, N worker threads) and 0 sets N to the number of available CPU cores. As long as no considerable work is done in these threads, -1 has shown to be the most efficient. See also this tokio GitHub issue
  • context-wait: Number of milliseconds that the threads will wait on each iteration. This allows to reduce CPU usage even further by handling all events/packets that arrived during that timespan to be handled all at once instead of waking up the thread every time a little event happens, thus reducing context switches again

The elements are all pushing data downstream from a tokio thread whenever data is available, assuming that downstream does not block. If downstream is another thread-sharing element and it would have to block (e.g. a full queue), it instead returns a new future to upstream so that upstream can asynchronously wait on that future before producing more output. By this, back-pressure is implemented between different GStreamer elements without ever blocking any of the tokio threads. All this is implemented around the normal GStreamer data-flow mechanisms, there is no “tokio fast-path” between elements.

Little Benchmark

As mentioned above, there’s a small benchmark application in the examples directory. This basically sets up a configurable number of streams and directly connects them to a fakesink, throwing away all packets. Additionally there is another thread that is sending all these packets. As such, this is really the most basic benchmark and not very realistic but nonetheless it shows the same performance improvement as the real application. Again, see Mathieu’s upcoming blog post for a more realistic and complete benchmark.

When running it, make sure that your user can create enough fds. The benchmark will just abort if not enough fds can be allocated. You can control this with ulimit -n SOME_NUMBER, and allowing a couple of thousands is generally a good idea. The benchmarks below were running with 10000.

After running cargo build –release to build the plugin itself, you can run the benchmark with:

cargo run --release --example udpsrc-benchmark -- 1000 ts-udpsrc -1 1 20

and in another shell the UDP sender with

cargo run --release --example udpsrc-benchmark-sender -- 1000

This runs 1000 streams, uses ts-udpsrc (alternative would be udpsrc), configures exactly one thread -1, 1 context, and a wait time of 20ms. See above for what these settings mean. You can check CPU usage with e.g. top. Testing was done on an Intel i7-4790K, with Rust 1.25 and GStreamer 1.14. One packet is sent every 20ms for each stream.

Source Streams Threads Contexts Wait CPU
udpsrc 1000 1000 x x 44%
ts-udpsrc 1000 -1 1 0 18%
ts-udpsrc 1000 -1 1 20 13%
ts-udpsrc 1000 -1 2 20 15%
ts-udpsrc 1000 2 1 20 16%
ts-udpsrc 1000 2 2 20 27%
Source Streams Threads Contexts Wait CPU
udpsrc 2000 2000 x x 95%
ts-udpsrc 2000 -1 1 20 29%
ts-udpsrc 2000 -1 2 20 31%
Source Streams Threads Contexts Wait CPU
ts-udpsrc 3000 -1 1 20 36%
ts-udpsrc 3000 -1 2 20 47%

Results for 3000 streams for the old udpsrc are not included as starting up that many threads needs too long.

The best configuration is apparently a single thread per context (see this tokio GitHub issue) and waiting 20ms for every iterations. Compared to the old udpsrc, CPU usage is about one third in that setting, and generally it seems to parallelize well. It’s not clear to me why the last test has 11% more CPU with two contexts, while in every other test the number of contexts does not really make a difference, and also not for that many streams in the real test-case.

The waiting does not reduce CPU usage a lot in this benchmark, but on the real test-case it does. The reason is most likely that this benchmark basically sends all packets at once, then waits for the remaining time, then sends the next packets.

Take these numbers with caution, the real test-case in Mathieu’s blog post will show the improvements in the bigger picture, where it was generally a quarter of CPU usage and almost perfect parallelization when increasing the number of contexts.

Conclusion

Generally this was a fun exercise and we’re quite happy with the results, especially the real results. It took me some time to understand how tokio works internally so that I can implement all kinds of customizations on top of it, but for normal usage of tokio that should not be required and the overall design makes a lot of sense to me, as well as the way how futures are implemented in Rust. It requires some learning and understanding how exactly the API can be used and behaves, but once that point is reached it seems like a very productive and performant solution for asynchronous IO. And modelling asynchronous IO problems based on the Rust-style futures seems a nice and intuitive fit.

The performance measurements also showed that GStreamer’s default usage of threads is not always optimal, and a model like in upipe or pipewire (or rather SPA) can provide better performance. But as this also shows, it is possible to implement something like this on top of GStreamer and for the common case, using threads like in GStreamer reduces the cognitive load on the developer a lot.

For a future version of GStreamer, I don’t think we should make the threading “manual” like in these two other projects, but instead provide some API additions that make it nicer to implement thread-sharing elements and to add ways in the GStreamer core to make streaming threads non-blocking. All this can be implemented already, but it could be nicer.

All this “only” improved the number of threads, and thus the threading and context switching overhead. Many other optimizations in other areas are still possible on top of this, for example optimizing receive performance and reducing the number of memory copies inside the pipeline even further. If that’s something you would be interested in, feel free to get in touch.

And with that: Read Mathieu’s upcoming blog posts about the other parts, RTP jitterbuffer / RTCP timer thread sharing, and no allocations, and the full benchmark.

GStreamer Rust bindings 0.11 / plugin writing infrastructure 0.2 release

Following the GStreamer 1.14 release and the new round of gtk-rs releases, there are also new releases for the GStreamer Rust bindings (0.11) and the plugin writing infrastructure (0.2).

Thanks also to all the contributors for making these releases happen and adding lots of valuable changes and API additions.

GStreamer Rust Bindings

The main changes in the Rust bindings were the update to GStreamer 1.14 (which brings in quite some new API, like GstPromise), a couple of API additions (GstBufferPool specifically) and the addition of the GstRtspServer and GstPbutils crates. The former allows writing a full RTSP server in a couple of lines of code (with lots of potential for customizations), the latter provides access to the GstDiscoverer helper object that allows inspecting files and streams for their container format, codecs, tags and all kinds of other metadata.

The GstPbutils crate will also get other features added in the near future, like encoding profile bindings to allow using the encodebin GStreamer element (a helper element for automatically selecting/configuring encoders and muxers) from Rust.

But the biggest changes in my opinion is some refactoring that was done to the Event, Message and Query APIs. Previously you would have to use a view on a newly created query to be able to use the type-specific functions on it

let mut q = gst::Query::new_position(gst::Format::Time);
if pipeline.query(q.get_mut().unwrap()) {
    match q.view() {
        QueryView::Position(ref p) => Some(p.get_result()),
        _ => None,
    }
} else {
    None
}

Now you can directly use the type-specific functions on a newly created query

let mut q = gst::Query::new_position(gst::Format::Time);
if pipeline.query(&mut q) {
    Some(q.get_result())
} else {
    None
}

In addition, the views can now dereference directly to the event/message/query itself and provide access to their API, which simplifies some code even more.

Plugin Writing Infrastructure

While the plugin writing infrastructure did not see that many changes apart from a couple of bugfixes and updating to the new versions of everything else, this does not mean that development on it stalled. Quite the opposite. The existing code works very well already and there was just no need for adding anything new for the projects I and others did on top of it, most of the required API additions were in the GStreamer bindings.

So the status here is the same as last time, get started writing GStreamer plugins in Rust. It works well!

How to write GStreamer Elements in Rust Part 2: A raw audio sine wave source

A bit later than anticipated, this is now part two of the blog post series about writing GStreamer elements in Rust. Part one can be found here, and I’ll assume that everything written there is known already.

In this part, a raw audio sine wave source element is going to be written. It will be similar to the one Mathieu was writing in his blog post about writing such a GStreamer element in Python. Various details will be different though, but more about that later.

The final code can be found here.

Table of Contents

  1. Boilerplate
  2. Caps Negotiation
  3. Query Handling
  4. Buffer Creation
  5. (Pseudo) Live Mode
  6. Unlocking
  7. Seeking

Boilerplate

The first part here will be all the boilerplate required to set up the element. You can safely skip this if you remember all this from the previous blog post.

Our sine wave element is going to produce raw audio, with a number of channels and any possible sample rate with both 32 bit and 64 bit floating point samples. It will produce a simple sine wave with a configurable frequency, volume/mute and number of samples per audio buffer. In addition it will be possible to configure the element in (pseudo) live mode, meaning that it will only produce data in real-time according to the pipeline clock. And it will be possible to seek to any time/sample position on our source element. It will basically be a more simply version of the audiotestsrc element from gst-plugins-base.

So let’s get started with all the boilerplate. This time our element will be based on the BaseSrc base class instead of BaseTransform.

use glib;
use gst;
use gst::prelude::*;
use gst_base::prelude::*;
use gst_audio;

use byte_slice_cast::*;

use gst_plugin::properties::*;
use gst_plugin::object::*;
use gst_plugin::element::*;
use gst_plugin::base_src::*;

use std::{i32, u32};
use std::sync::Mutex;
use std::ops::Rem;

use num_traits::float::Float;
use num_traits::cast::NumCast;

// Default values of properties
const DEFAULT_SAMPLES_PER_BUFFER: u32 = 1024;
const DEFAULT_FREQ: u32 = 440;
const DEFAULT_VOLUME: f64 = 0.8;
const DEFAULT_MUTE: bool = false;
const DEFAULT_IS_LIVE: bool = false;

// Property value storage
#[derive(Debug, Clone, Copy)]
struct Settings {
    samples_per_buffer: u32,
    freq: u32,
    volume: f64,
    mute: bool,
    is_live: bool,
}

impl Default for Settings {
    fn default() -> Self {
        Settings {
            samples_per_buffer: DEFAULT_SAMPLES_PER_BUFFER,
            freq: DEFAULT_FREQ,
            volume: DEFAULT_VOLUME,
            mute: DEFAULT_MUTE,
            is_live: DEFAULT_IS_LIVE,
        }
    }
}

// Metadata for the properties
static PROPERTIES: [Property; 5] = [
    Property::UInt(
        "samples-per-buffer",
        "Samples Per Buffer",
        "Number of samples per output buffer",
        (1, u32::MAX),
        DEFAULT_SAMPLES_PER_BUFFER,
        PropertyMutability::ReadWrite,
    ),
    Property::UInt(
        "freq",
        "Frequency",
        "Frequency",
        (1, u32::MAX),
        DEFAULT_FREQ,
        PropertyMutability::ReadWrite,
    ),
    Property::Double(
        "volume",
        "Volume",
        "Output volume",
        (0.0, 10.0),
        DEFAULT_VOLUME,
        PropertyMutability::ReadWrite,
    ),
    Property::Boolean(
        "mute",
        "Mute",
        "Mute",
        DEFAULT_MUTE,
        PropertyMutability::ReadWrite,
    ),
    Property::Boolean(
        "is-live",
        "Is Live",
        "(Pseudo) live output",
        DEFAULT_IS_LIVE,
        PropertyMutability::ReadWrite,
    ),
];

// Stream-specific state, i.e. audio format configuration
// and sample offset
struct State {
    info: Option<gst_audio::AudioInfo>,
    sample_offset: u64,
    sample_stop: Option<u64>,
    accumulator: f64,
}

impl Default for State {
    fn default() -> State {
        State {
            info: None,
            sample_offset: 0,
            sample_stop: None,
            accumulator: 0.0,
        }
    }
}

// Struct containing all the element data
struct SineSrc {
    cat: gst::DebugCategory,
    settings: Mutex<Settings>,
    state: Mutex<State>,
}

impl SineSrc {
    // Called when a new instance is to be created
    fn new(element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
        // Initialize live-ness and notify the base class that
        // we'd like to operate in Time format
        element.set_live(DEFAULT_IS_LIVE);
        element.set_format(gst::Format::Time);

        Box::new(Self {
            cat: gst::DebugCategory::new(
                "rssinesrc",
                gst::DebugColorFlags::empty(),
                "Rust Sine Wave Source",
            ),
            settings: Mutex::new(Default::default()),
            state: Mutex::new(Default::default()),
        })
    }

    // Called exactly once when registering the type. Used for
    // setting up metadata for all instances, e.g. the name and
    // classification and the pad templates with their caps.
    //
    // Actual instances can create pads based on those pad templates
    // with a subset of the caps given here. In case of basesrc,
    // a "src" and "sink" pad template are required here and the base class
    // will automatically instantiate pads for them.
    //
    // Our element here can output f32 and f64
    fn class_init(klass: &mut BaseSrcClass) {
        klass.set_metadata(
            "Sine Wave Source",
            "Source/Audio",
            "Creates a sine wave",
            "Sebastian Dröge <sebastian@centricular.com>",
        );

        // On the src pad, we can produce F32/F64 with any sample rate
        // and any number of channels
        let caps = gst::Caps::new_simple(
            "audio/x-raw",
            &[
                (
                    "format",
                    &gst::List::new(&[
                        &gst_audio::AUDIO_FORMAT_F32.to_string(),
                        &gst_audio::AUDIO_FORMAT_F64.to_string(),
                    ]),
                ),
                ("layout", &"interleaved"),
                ("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
                ("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
            ],
        );
        // The src pad template must be named "src" for basesrc
        // and specific a pad that is always there
        let src_pad_template = gst::PadTemplate::new(
            "src",
            gst::PadDirection::Src,
            gst::PadPresence::Always,
            &caps,
        );
        klass.add_pad_template(src_pad_template);

        // Install all our properties
        klass.install_properties(&PROPERTIES);
    }
}

impl ObjectImpl<BaseSrc> for SineSrc {
    // Called whenever a value of a property is changed. It can be called
    // at any time from any thread.
    fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) {
        let prop = &PROPERTIES[id as usize];
        let element = obj.clone().downcast::<BaseSrc>().unwrap();

        match *prop {
            Property::UInt("samples-per-buffer", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let samples_per_buffer = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing samples-per-buffer from {} to {}",
                    settings.samples_per_buffer,
                    samples_per_buffer
                );
                settings.samples_per_buffer = samples_per_buffer;
                drop(settings);

                let _ =
                    element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
            }
            Property::UInt("freq", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let freq = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing freq from {} to {}",
                    settings.freq,
                    freq
                );
                settings.freq = freq;
            }
            Property::Double("volume", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let volume = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing volume from {} to {}",
                    settings.volume,
                    volume
                );
                settings.volume = volume;
            }
            Property::Boolean("mute", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let mute = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing mute from {} to {}",
                    settings.mute,
                    mute
                );
                settings.mute = mute;
            }
            Property::Boolean("is-live", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let is_live = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing is-live from {} to {}",
                    settings.is_live,
                    is_live
                );
                settings.is_live = is_live;
            }
            _ => unimplemented!(),
        }
    }

    // Called whenever a value of a property is read. It can be called
    // at any time from any thread.
    fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
        let prop = &PROPERTIES[id as usize];

        match *prop {
            Property::UInt("samples-per-buffer", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.samples_per_buffer.to_value())
            }
            Property::UInt("freq", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.freq.to_value())
            }
            Property::Double("volume", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.volume.to_value())
            }
            Property::Boolean("mute", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.mute.to_value())
            }
            Property::Boolean("is-live", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.is_live.to_value())
            }
            _ => unimplemented!(),
        }
    }
}

// Virtual methods of gst::Element. We override none
impl ElementImpl<BaseSrc> for SineSrc { }

impl BaseSrcImpl<BaseSrc> for SineSrc {
    // Called when starting, so we can initialize all stream-related state to its defaults
    fn start(&self, element: &BaseSrc) -> bool {
        // Reset state
        *self.state.lock().unwrap() = Default::default();

        gst_info!(self.cat, obj: element, "Started");

        true
    }

    // Called when shutting down the element so we can release all stream-related state
    fn stop(&self, element: &BaseSrc) -> bool {
        // Reset state
        *self.state.lock().unwrap() = Default::default();

        gst_info!(self.cat, obj: element, "Stopped");

        true
    }
}

struct SineSrcStatic;

// The basic trait for registering the type: This returns a name for the type and registers the
// instance and class initializations functions with the type system, thus hooking everything
// together.
impl ImplTypeStatic<BaseSrc> for SineSrcStatic {
    fn get_name(&self) -> &str {
        "SineSrc"
    }

    fn new(&self, element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
        SineSrc::new(element)
    }

    fn class_init(&self, klass: &mut BaseSrcClass) {
        SineSrc::class_init(klass);
    }
}

// Registers the type for our element, and then registers in GStreamer under
// the name "sinesrc" for being able to instantiate it via e.g.
// gst::ElementFactory::make().
pub fn register(plugin: &gst::Plugin) {
    let type_ = register_type(SineSrcStatic);
    gst::Element::register(plugin, "rssinesrc", 0, type_);
}

If any of this needs explanation, please see the previous blog post and the comments in the code. The explanation for all the structs fields and what they’re good for will follow in the next sections.

With all of the above and a small addition to src/lib.rs this should compile now.

mod sinesrc;
[...]

fn plugin_init(plugin: &gst::Plugin) -> bool {
    [...]
    sinesrc::register(plugin);
    true
}

Also a couple of new crates have to be added to Cargo.toml and src/lib.rs, but you best check the code in the repository for details.

Caps Negotiation

The first part that we have to implement, just like last time, is caps negotiation. We already notified the base class about any caps that we can potentially handle via the caps in the pad template in class_init but there are still two more steps of behaviour left that we have to implement.

First of all, we need to get notified whenever the caps that our source is configured for are changing. This will happen once in the very beginning and then whenever the pipeline topology or state changes and new caps would be more optimal for the new situation. This notification happens via the BaseTransform::set_caps virtual method.

    fn set_caps(&self, element: &BaseSrc, caps: &gst::CapsRef) -> bool {
        use std::f64::consts::PI;

        let info = match gst_audio::AudioInfo::from_caps(caps) {
            None => return false,
            Some(info) => info,
        };

        gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);

        element.set_blocksize(info.bpf() * (*self.settings.lock().unwrap()).samples_per_buffer);

        let settings = *self.settings.lock().unwrap();
        let mut state = self.state.lock().unwrap();

        // If we have no caps yet, any old sample_offset and sample_stop will be
        // in nanoseconds
        let old_rate = match state.info {
            Some(ref info) => info.rate() as u64,
            None => gst::SECOND_VAL,
        };

        // Update sample offset and accumulator based on the previous values and the
        // sample rate change, if any
        let old_sample_offset = state.sample_offset;
        let sample_offset = old_sample_offset
            .mul_div_floor(info.rate() as u64, old_rate)
            .unwrap();

        let old_sample_stop = state.sample_stop;
        let sample_stop =
            old_sample_stop.map(|v| v.mul_div_floor(info.rate() as u64, old_rate).unwrap());

        let accumulator =
            (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (info.rate() as f64));

        *state = State {
            info: Some(info),
            sample_offset: sample_offset,
            sample_stop: sample_stop,
            accumulator: accumulator,
        };

        drop(state);

        let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());

        true
    }

In here we parse the caps into a AudioInfo and then store that in our internal state, while updating various fields. We tell the base class about the number of bytes each buffer is usually going to hold, and update our current sample position, the stop sample position (when a seek with stop position happens, we need to know when to stop) and our accumulator. This happens by scaling both positions by the old and new sample rate. If we don’t have an old sample rate, we assume nanoseconds (this will make more sense once seeking is implemented). The scaling is done with the help of the muldiv crate, which implements scaling of integer types by a fraction with protection against overflows by doing up to 128 bit integer arithmetic for intermediate values.

The accumulator is the updated based on the current phase of the sine wave at the current sample position.

As a last step we post a new LATENCY message on the bus whenever the sample rate has changed. Our latency (in live mode) is going to be the duration of a single buffer, but more about that later.

BaseSrc is by default already selecting possible caps for us, if there are multiple options. However these defaults might not be (and often are not) ideal and we should override the default behaviour slightly. This is done in the BaseSrc::fixate virtual method.

    fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps {
        // Fixate the caps. BaseSrc will do some fixation for us, but
        // as we allow any rate between 1 and MAX it would fixate to 1. 1Hz
        // is generally not a useful sample rate.
        //
        // We fixate to the closest integer value to 48kHz that is possible
        // here, and for good measure also decide that the closest value to 1
        // channel is good.
        let mut caps = gst::Caps::truncate(caps);
        {
            let caps = caps.make_mut();
            let s = caps.get_mut_structure(0).unwrap();
            s.fixate_field_nearest_int("rate", 48_000);
            s.fixate_field_nearest_int("channels", 1);
        }

        // Let BaseSrc fixate anything else for us. We could've alternatively have
        // called Caps::fixate() here
        element.parent_fixate(caps)
    }

Here we take the caps that are passed in, truncate them (i.e. remove all but the very first Structure) and then manually fixate the sample rate to the closest value to 48kHz. By default, caps fixation would result in the lowest possible sample rate but this is usually not desired.

For good measure, we also fixate the number of channels to the closest value to 1, but this would already be the default behaviour anyway. And then chain up to the parent class’ implementation of fixate, which for now basically does the same as Caps::fixate(). After this, the caps are fixated, i.e. there is only a single Structure left and all fields have concrete values (no ranges or sets).

Query Handling

As our source element will work by generating a new audio buffer from a specific offset, and especially works in Time format, we want to notify downstream elements that we don’t want to run in Pull mode, only in Push mode. In addition would prefer sequential reading. However we still allow seeking later. For a source that does not know about Time, e.g. a file source, the format would be configured as Bytes. Other values than Time and Bytes generally don’t make any sense.

The main difference here is that otherwise the base class would ask us to produce data for arbitrary Byte offsets, and we would have to produce data for that. While possible in our case, it’s a bit annoying and for other audio sources it’s not easily possible at all.

Downstream elements will try to query this very information from us, so we now have to override the default query handling of BaseSrc and handle the SCHEDULING query differently. Later we will also handle other queries differently.

    fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
        use gst::QueryView;

        match query.view_mut() {
            // We only work in Push mode. In Pull mode, create() could be called with
            // arbitrary offsets and we would have to produce for that specific offset
            QueryView::Scheduling(ref mut q) => {
                q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
                q.add_scheduling_modes(&[gst::PadMode::Push]);
                return true;
            }
            _ => (),
        }
        BaseSrcBase::parent_query(element, query)
    }

To handle the SCHEDULING query specifically, we first have to match on a view (mutable because we want to modify the view) of the query check the type of the query. If it indeed is a scheduling query, we can set the SEQUENTIAL flag and specify that we handle only Push mode, then return true directly as we handled the query already.

In all other cases we fall back to the parent class’ implementation of the query virtual method.

Buffer Creation

Now we have everything in place for a working element, apart from the virtual method to actually generate the raw audio buffers with the sine wave. From a high-level BaseSrc works by calling the create virtual method over and over again to let the subclass produce a buffer until it returns an error or signals the end of the stream.

Let’s first talk about how to generate the sine wave samples themselves. As we want to operate on 32 bit and 64 bit floating point numbers, we implement a generic function for generating samples and storing them in a mutable byte slice. This is done with the help of the num_traits crate, which provides all kinds of useful traits for abstracting over numeric types. In our case we only need the Float and NumCast traits.

Instead of writing a generic implementation with those traits, it would also be possible to do the same with a simple macro that generates a function for both types. Which approach is nicer is a matter of taste in the end, the compiler output should be equivalent for both cases.

    fn process<F: Float + FromByteSlice>(
        data: &mut [u8],
        accumulator_ref: &mut f64,
        freq: u32,
        rate: u32,
        channels: u32,
        vol: f64,
    ) {
        use std::f64::consts::PI;

        // Reinterpret our byte-slice as a slice containing elements of the type
        // we're interested in. GStreamer requires for raw audio that the alignment
        // of memory is correct, so this will never ever fail unless there is an
        // actual bug elsewhere.
        let data = data.as_mut_slice_of::<F>().unwrap();

        // Convert all our parameters to the target type for calculations
        let vol: F = NumCast::from(vol).unwrap();
        let freq = freq as f64;
        let rate = rate as f64;
        let two_pi = 2.0 * PI;

        // We're carrying a accumulator with up to 2pi around instead of working
        // on the sample offset. High sample offsets cause too much inaccuracy when
        // converted to floating point numbers and then iterated over in 1-steps
        let mut accumulator = *accumulator_ref;
        let step = two_pi * freq / rate;

        for chunk in data.chunks_mut(channels as usize) {
            let value = vol * F::sin(NumCast::from(accumulator).unwrap());
            for sample in chunk {
                *sample = value;
            }

            accumulator += step;
            if accumulator >= two_pi {
                accumulator -= two_pi;
            }
        }

        *accumulator_ref = accumulator;
    }

This function takes the mutable byte slice from our buffer as argument, as well as the current value of the accumulator and the relevant settings for generating the sine wave.

As a first step, we “cast” the byte slice to one of the target type (f32 or f64) with the help of the byte_slice_cast crate. This ensures that alignment and sizes are all matching and returns a mutable slice of our target type if successful. In case of GStreamer, the buffer alignment is guaranteed to be big enough for our types here and we allocate the buffer of a correct size later.

Now we convert all the parameters to the types we will use later, and store them together with the current accumulator value in local variables. Then we iterate over the whole floating point number slice in chunks with all channels, and fill each channel with the current value of our sine wave.

The sine wave itself is calculated by val = volume * sin(2 * PI * frequency * (i + accumulator) / rate), but we actually calculate it by simply increasing the accumulator by 2 * PI * frequency / rate for every sample instead of doing the multiplication for each sample. We also make sure that the accumulator always stays between 0 and 2 * PI to prevent any inaccuracies from floating point numbers to affect our produced samples.

Now that this is done, we need to implement the BaseSrc::create virtual method for actually allocating the buffer, setting timestamps and other metadata and it and calling our above function.

    fn create(
        &self,
        element: &BaseSrc,
        _offset: u64,
        _length: u32,
    ) -> Result<gst::Buffer, gst::FlowReturn> {
        // Keep a local copy of the values of all our properties at this very moment. This
        // ensures that the mutex is never locked for long and the application wouldn't
        // have to block until this function returns when getting/setting property values
        let settings = *self.settings.lock().unwrap();

        // Get a locked reference to our state, i.e. the input and output AudioInfo
        let mut state = self.state.lock().unwrap();
        let info = match state.info {
            None => {
                gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
                return Err(gst::FlowReturn::NotNegotiated);
            }
            Some(ref info) => info.clone(),
        };

        // If a stop position is set (from a seek), only produce samples up to that
        // point but at most samples_per_buffer samples per buffer
        let n_samples = if let Some(sample_stop) = state.sample_stop {
            if sample_stop <= state.sample_offset {
                gst_log!(self.cat, obj: element, "At EOS");
                return Err(gst::FlowReturn::Eos);
            }

            sample_stop - state.sample_offset
        } else {
            settings.samples_per_buffer as u64
        };

        // Allocate a new buffer of the required size, update the metadata with the
        // current timestamp and duration and then fill it according to the current
        // caps
        let mut buffer =
            gst::Buffer::with_size((n_samples as usize) * (info.bpf() as usize)).unwrap();
        {
            let buffer = buffer.get_mut().unwrap();

            // Calculate the current timestamp (PTS) and the next one,
            // and calculate the duration from the difference instead of
            // simply the number of samples to prevent rounding errors
            let pts = state
                .sample_offset
                .mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
                .unwrap()
                .into();
            let next_pts: gst::ClockTime = (state.sample_offset + n_samples)
                .mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
                .unwrap()
                .into();
            buffer.set_pts(pts);
            buffer.set_duration(next_pts - pts);

            // Map the buffer writable and create the actual samples
            let mut map = buffer.map_writable().unwrap();
            let data = map.as_mut_slice();

            if info.format() == gst_audio::AUDIO_FORMAT_F32 {
                Self::process::<f32>(
                    data,
                    &mut state.accumulator,
                    settings.freq,
                    info.rate(),
                    info.channels(),
                    settings.volume,
                );
            } else {
                Self::process::<f64>(
                    data,
                    &mut state.accumulator,
                    settings.freq,
                    info.rate(),
                    info.channels(),
                    settings.volume,
                );
            }
        }
        state.sample_offset += n_samples;
        drop(state);

        gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);

        Ok(buffer)
    }

Just like last time, we start with creating a copy of our properties (settings) and keeping a mutex guard of the internal state around. If the internal state has no AudioInfo yet, we error out. This would mean that no caps were negotiated yet, which is something we can’t handle and is not really possible in our case.

Next we calculate how many samples we have to generate. If a sample stop position was set by a seek event, we have to generate samples up to at most that point. Otherwise we create at most the number of samples per buffer that were set via the property. Then we allocate a buffer of the corresponding size, with the help of the bpf field of the AudioInfo, and then set its metadata and fill the samples.

The metadata that is set is the timestamp (PTS), and the duration. The duration is calculated from the difference of the following buffer’s timestamp and the current buffer’s. By this we ensure that rounding errors are not causing the next buffer’s timestamp to have a different timestamp than the sum of the current’s and its duration. While this would not be much of a problem in GStreamer (inaccurate and jitterish timestamps are handled just fine), we can prevent it here and do so.

Afterwards we call our previously defined function on the writably mapped buffer and fill it with the sample values.

With all this, the element should already work just fine in any GStreamer-based application, for example gst-launch-1.0. Don’t forget to set the GST_PLUGIN_PATH environment variable correctly like last time. Before running this, make sure to turn down the volume of your speakers/headphones a bit.

export GST_PLUGIN_PATH={{EJS37}}/target/debug
gst-launch-1.0 rssinesrc freq=440 volume=0.9 ! audioconvert ! autoaudiosink

You should hear a 440Hz sine wave now.

(Pseudo) Live Mode

Many audio (and video) sources can actually only produce data in real-time and data is produced according to some clock. So far our source element can produce data as fast as downstream is consuming data, but we optionally can change that. We simulate a live source here now by waiting on the pipeline clock, but with a real live source you would only ever be able to have the data in real-time without any need to wait on a clock. And usually that data is produced according to a different clock than the pipeline clock, in which case translation between the two clocks is needed but we ignore this aspect for now. For details check the GStreamer documentation.

For working in live mode, we have to add a few different parts in various places. First of all, we implement waiting on the clock in the create function.

    fn create(...
        [...]
        state.sample_offset += n_samples;
        drop(state);

        // If we're live, we are waiting until the time of the last sample in our buffer has
        // arrived. This is the very reason why we have to report that much latency.
        // A real live-source would of course only allow us to have the data available after
        // that latency, e.g. when capturing from a microphone, and no waiting from our side
        // would be necessary..
        //
        // Waiting happens based on the pipeline clock, which means that a real live source
        // with its own clock would require various translations between the two clocks.
        // This is out of scope for the tutorial though.
        if element.is_live() {
            let clock = match element.get_clock() {
                None => return Ok(buffer),
                Some(clock) => clock,
            };

            let segment = element
                .get_segment()
                .downcast::<gst::format::Time>()
                .unwrap();
            let base_time = element.get_base_time();
            let running_time = segment.to_running_time(buffer.get_pts() + buffer.get_duration());

            // The last sample's clock time is the base time of the element plus the
            // running time of the last sample
            let wait_until = running_time + base_time;
            if wait_until.is_none() {
                return Ok(buffer);
            }

            let id = clock.new_single_shot_id(wait_until).unwrap();

            gst_log!(
                self.cat,
                obj: element,
                "Waiting until {}, now {}",
                wait_until,
                clock.get_time()
            );
            let (res, jitter) = id.wait();
            gst_log!(
                self.cat,
                obj: element,
                "Waited res {:?} jitter {}",
                res,
                jitter
            );
        }

        gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);

        Ok(buffer)
    }

To be able to wait on the clock, we first of all need to calculate the clock time until when we want to wait. In our case that will be the clock time right after the end of the last sample in the buffer we just produced. Simply because you can’t capture a sample before it was produced.

We calculate the running time from the PTS and duration of the buffer with the help of the currently configured segment and then add the base time of the element on this to get the clock time as result. Please check the GStreamer documentation for details, but in short the running time of a pipeline is the time since the start of the pipeline (or the last reset of the running time) and the running time of a buffer can be calculated from its PTS and the segment, which provides the information to translate between the two. The base time is the clock time when the pipeline went to the Playing state, so just an offset.

Next we wait and then return the buffer as before.

Now we also have to tell the base class that we’re running in live mode now. This is done by calling set_live(true) on the base class before changing the element state from Ready to Paused. For this we override the Element::change_state virtual method.

impl ElementImpl<BaseSrc> for SineSrc {
    fn change_state(
        &self,
        element: &BaseSrc,
        transition: gst::StateChange,
    ) -> gst::StateChangeReturn {
        // Configure live'ness once here just before starting the source
        match transition {
            gst::StateChange::ReadyToPaused => {
                element.set_live(self.settings.lock().unwrap().is_live);
            }
            _ => (),
        }

        element.parent_change_state(transition)
    }
}

And as a last step, we also need to notify downstream elements about our latency. Live elements always have to report their latency so that synchronization can work correctly. As the clock time of each buffer is equal to the time when it was created, all buffers would otherwise arrive late in the sinks (they would appear as if they should’ve been played already at the time when they were created). So all the sinks will have to compensate for the latency that it took from capturing to the sink, and they have to do that in a coordinated way (otherwise audio and video would be out of sync if both have different latencies). For this the pipeline is querying each sink for the latency on its own branch, and then configures a global latency on all sinks according to that.

This querying is done with the LATENCY query, which we will now also have to handle.

    fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
        use gst::QueryView;

        match query.view_mut() {
            // We only work in Push mode. In Pull mode, create() could be called with
            // arbitrary offsets and we would have to produce for that specific offset
            QueryView::Scheduling(ref mut q) => {
                [...]
            }
            // In Live mode we will have a latency equal to the number of samples in each buffer.
            // We can't output samples before they were produced, and the last sample of a buffer
            // is produced that much after the beginning, leading to this latency calculation
            QueryView::Latency(ref mut q) => {
                let settings = *self.settings.lock().unwrap();
                let state = self.state.lock().unwrap();

                if let Some(ref info) = state.info {
                    let latency = gst::SECOND
                        .mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64)
                        .unwrap();
                    gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
                    q.set(settings.is_live, latency, gst::CLOCK_TIME_NONE);
                    return true;
                } else {
                    return false;
                }
            }
            _ => (),
        }
        BaseSrcBase::parent_query(element, query)
    }

The latency that we report is the duration of a single audio buffer, because we’re simulating a real live source here. A real live source won’t be able to output the buffer before the last sample of it is captured, and the difference between when the first and last sample were captured is exactly the latency that we add here. Other elements further downstream that introduce further latency would then add their own latency on top of this.

Inside the latency query we also signal that we are indeed a live source, and additionally how much buffering we can do (in our case, infinite) until data would be lost. The last part is important if e.g. the video branch has a higher latency, causing the audio sink to have to wait some additional time (so that audio and video stay in sync), which would then require the whole audio branch to buffer some data. As we have an artificial live source, we can always generate data for the next time but a real live source would only have a limited buffer and if no data is read and forwarded once that runs full, data would get lost.

You can test this again with e.g. gst-launch-1.0 by setting the is-live property to true. It should write in the output now that the pipeline is live.

In Mathieu’s blog post this was implemented without explicit waiting and the usage of the get_times virtual method, but as this is only really useful for pseudo live sources like this one I decided to explain how waiting on the clock can be achieved correctly and even more important how that relates to the next section.

Unlocking

With the addition of the live mode, the create function is now blocking and waiting on the clock for some time. This is suboptimal as for example a (flushing) seek would have to wait now until the clock waiting is done, or when shutting down the application would have to wait.

To prevent this, all waiting/blocking in GStreamer streaming threads should be interruptible/cancellable when requested. And for example the ClockID that we got from the clock for waiting can be cancelled by calling unschedule() on it. We only have to do it from the right place and keep it accessible. The right place is the BaseSrc::unlock virtual method.

struct ClockWait {
    clock_id: Option<gst::ClockId>,
    flushing: bool,
}

struct SineSrc {
    cat: gst::DebugCategory,
    settings: Mutex<Settings>,
    state: Mutex<State>,
    clock_wait: Mutex<ClockWait>,
}

[...]

    fn unlock(&self, element: &BaseSrc) -> bool {
        // This should unblock the create() function ASAP, so we
        // just unschedule the clock it here, if any.
        gst_debug!(self.cat, obj: element, "Unlocking");
        let mut clock_wait = self.clock_wait.lock().unwrap();
        if let Some(clock_id) = clock_wait.clock_id.take() {
            clock_id.unschedule();
        }
        clock_wait.flushing = true;

        true
    }

We store the clock ID in our struct, together with a boolean to signal whether we’re supposed to flush already or not. And then inside unlock unschedule the clock ID and set this boolean flag to true.

Once everything is unlocked, we need to reset things again so that data flow can happen in the future. This is done in the unlock_stop virtual method.

    fn unlock_stop(&self, element: &BaseSrc) -> bool {
        // This signals that unlocking is done, so we can reset
        // all values again.
        gst_debug!(self.cat, obj: element, "Unlock stop");
        let mut clock_wait = self.clock_wait.lock().unwrap();
        clock_wait.flushing = false;

        true
    }

To make sure that this struct is always initialized correctly, we also call unlock from stop, and unlock_stop from start.

Now as a last step, we need to actually make use of the new struct we added around the code where we wait for the clock.

            // Store the clock ID in our struct unless we're flushing anyway.
            // This allows to asynchronously cancel the waiting from unlock()
            // so that we immediately stop waiting on e.g. shutdown.
            let mut clock_wait = self.clock_wait.lock().unwrap();
            if clock_wait.flushing {
                gst_debug!(self.cat, obj: element, "Flushing");
                return Err(gst::FlowReturn::Flushing);
            }

            let id = clock.new_single_shot_id(wait_until).unwrap();
            clock_wait.clock_id = Some(id.clone());
            drop(clock_wait);

            gst_log!(
                self.cat,
                obj: element,
                "Waiting until {}, now {}",
                wait_until,
                clock.get_time()
            );
            let (res, jitter) = id.wait();
            gst_log!(
                self.cat,
                obj: element,
                "Waited res {:?} jitter {}",
                res,
                jitter
            );
            self.clock_wait.lock().unwrap().clock_id.take();

            // If the clock ID was unscheduled, unlock() was called
            // and we should return Flushing immediately.
            if res == gst::ClockReturn::Unscheduled {
                gst_debug!(self.cat, obj: element, "Flushing");
                return Err(gst::FlowReturn::Flushing);
            }

The important part in this code is that we first have to check if we are already supposed to unlock, before even starting to wait. Otherwise we would start waiting without anybody ever being able to unlock. Then we need to store the clock id in the struct and make sure to drop the mutex guard so that the unlock function can take it again for unscheduling the clock ID. And once waiting is done, we need to remove the clock id from the struct again and in case of ClockReturn::Unscheduled we directly return FlowReturn::Flushing instead of the error.

Similarly when using other blocking APIs it is important that they are woken up in a similar way when unlock is called. Otherwise the application developer’s and thus user experience will be far from ideal.

Seeking

As a last feature we implement seeking on our source element. In our case that only means that we have to update the sample_offset and sample_stop fields accordingly, other sources might have to do more work than that.

Seeking is implemented in the BaseSrc::do_seek virtual method, and signalling whether we can actually seek in the is_seekable virtual method.

    fn is_seekable(&self, _element: &BaseSrc) -> bool {
        true
    }

    fn do_seek(&self, element: &BaseSrc, segment: &mut gst::Segment) -> bool {
        // Handle seeking here. For Time and Default (sample offset) seeks we can
        // do something and have to update our sample offset and accumulator accordingly.
        //
        // Also we should remember the stop time (so we can stop at that point), and if
        // reverse playback is requested. These values will all be used during buffer creation
        // and for calculating the timestamps, etc.

        if segment.get_rate() < 0.0 {
            gst_error!(self.cat, obj: element, "Reverse playback not supported");
            return false;
        }

        let settings = *self.settings.lock().unwrap();
        let mut state = self.state.lock().unwrap();

        // We store sample_offset and sample_stop in nanoseconds if we
        // don't know any sample rate yet. It will be converted correctly
        // once a sample rate is known.
        let rate = match state.info {
            None => gst::SECOND_VAL,
            Some(ref info) => info.rate() as u64,
        };

        if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
            use std::f64::consts::PI;

            let sample_offset = segment
                .get_start()
                .unwrap()
                .mul_div_floor(rate, gst::SECOND_VAL)
                .unwrap();

            let sample_stop = segment
                .get_stop()
                .map(|v| v.mul_div_floor(rate, gst::SECOND_VAL).unwrap());

            let accumulator =
                (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (rate as f64));

            gst_debug!(
                self.cat,
                obj: element,
                "Seeked to {}-{:?} (accum: {}) for segment {:?}",
                sample_offset,
                sample_stop,
                accumulator,
                segment
            );

            *state = State {
                info: state.info.clone(),
                sample_offset: sample_offset,
                sample_stop: sample_stop,
                accumulator: accumulator,
            };

            true
        } else if let Some(segment) = segment.downcast_ref::<gst::format::Default>() {
            use std::f64::consts::PI;

            if state.info.is_none() {
                gst_error!(
                    self.cat,
                    obj: element,
                    "Can only seek in Default format if sample rate is known"
                );
                return false;
            }

            let sample_offset = segment.get_start().unwrap();
            let sample_stop = segment.get_stop().0;

            let accumulator =
                (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (rate as f64));

            gst_debug!(
                self.cat,
                obj: element,
                "Seeked to {}-{:?} (accum: {}) for segment {:?}",
                sample_offset,
                sample_stop,
                accumulator,
                segment
            );

            *state = State {
                info: state.info.clone(),
                sample_offset: sample_offset,
                sample_stop: sample_stop,
                accumulator: accumulator,
            };

            true
        } else {
            gst_error!(
                self.cat,
                obj: element,
                "Can't seek in format {:?}",
                segment.get_format()
            );

            false
        }
    }

Currently no support for reverse playback is implemented here, that is left as an exercise for the reader. So as a first step we check if the segment has a negative rate, in which case we just fail and return false.

Afterwards we again take a copy of the settings, keep a mutable mutex guard of our state and then start handling the actual seek.

If no caps are known yet, i.e. the AudioInfo is None, we assume a rate of 1 billion. That is, we just store the time in nanoseconds for now and let the set_caps function take care of that (which we already implemented accordingly) once the sample rate is known.

Then, if a Time seek is performed, we convert the segment start and stop position from time to sample offsets and save them. And then update the accumulator in a similar way as in the set_caps function. If a seek is in Default format (i.e. sample offsets for raw audio), we just have to store the values and update the accumulator but only do so if the sample rate is known already. A sample offset seek does not make any sense until the sample rate is known, so we just fail here to prevent unexpected surprises later.

Try the following pipeline for testing seeking. You should be able to seek the current time drawn over the video, and with the left/right cursor key you can seek. Also this shows that we create a quite nice sine wave.

gst-launch-1.0 rssinesrc ! audioconvert ! monoscope ! timeoverlay ! navseek ! glimagesink

And with that all features are implemented in our sine wave raw audio source.

Speeding up RGB to grayscale conversion in Rust by a factor of 2.2 – and various other multimedia related processing loops

In the previous blog post I wrote about how to write a RGB to grayscale conversion filter for GStreamer in Rust. In this blog post I’m going to write about how to optimize the processing loop of that filter, without resorting to unsafe code or SIMD instructions by staying with plain, safe Rust code.

I also tried to implement the processing loop with faster, a Rust crate for writing safe SIMD code. It looks very promising, but unless I missed something in the documentation it currently is missing some features to be able to express this specific algorithm in a meaningful way. Once it works on stable Rust (waiting for SIMD to be stabilized) and includes runtime CPU feature detection, this could very well be a good replacement for the ORC library used for the same purpose in GStreamer in various places. ORC works by JIT-compiling a minimal “array operation language” to SIMD assembly for your specific CPU (and has support for x86 MMX/SSE, PPC Altivec, ARM NEON, etc.).

If someone wants to prove me wrong and implement this with faster, feel free to do so and I’ll link to your solution and include it in the benchmark results below.

All code below can be found in this GIT repository.

Table of Contents

  1. Baseline Implementation
  2. First Optimization – Assertions
  3. First Optimization – Assertions Try 2
  4. Second Optimization – Iterate a bit more
  5. Third Optimization – Getting rid of the bounds check finally
  6. Summary
  7. Addendum: slice::split_at
  8. Addendum 2: SIMD with faster

Baseline Implementation

This is how the baseline implementation looks like.

pub fn bgrx_to_gray_chunks_no_asserts(
    in_data: &[u8],
    out_data: &mut [u8],
    in_stride: usize,
    out_stride: usize,
    width: usize,
) {
    let in_line_bytes = width * 4;
    let out_line_bytes = width * 4;

    for (in_line, out_line) in in_data
        .chunks(in_stride)
        .zip(out_data.chunks_mut(out_stride))
    {
        for (in_p, out_p) in in_line[..in_line_bytes]
            .chunks(4)
            .zip(out_line[..out_line_bytes].chunks_mut(4))
        {
            let b = u32::from(in_p[0]);
            let g = u32::from(in_p[1]);
            let r = u32::from(in_p[2]);
            let x = u32::from(in_p[3]);

            let grey = ((r * RGB_Y[0]) + (g * RGB_Y[1]) + (b * RGB_Y[2]) + (x * RGB_Y[3])) / 65536;
            let grey = grey as u8;
            out_p[0] = grey;
            out_p[1] = grey;
            out_p[2] = grey;
            out_p[3] = grey;
        }
    }
}

This basically iterates over each line of the input and output frame (outer loop), and then for each BGRx chunk of 4 bytes in each line it converts the values to u32, multiplies with a constant array, converts back to u8 and stores the same value in the whole output BGRx chunk.

Note: This is only doing the actual conversion from linear RGB to grayscale (and in BT.601 colorspace). To do this conversion correctly you need to know your colorspaces and use the correct coefficients for conversion, and also do gamma correction. See this about why it is important.

So what can be improved on this? For starters, let’s write a small benchmark for this so that we know whether any of our changes actually improve something. This is using the (unfortunately still) unstable benchmark feature of Cargo.

#![feature(test)]
#![feature(exact_chunks)]

extern crate test;

pub fn bgrx_to_gray_chunks_no_asserts(...)
    [...]
}

#[cfg(test)]
mod tests {
    use super::*;
    use test::Bencher;
    use std::iter;

    fn create_vec(w: usize, h: usize) -> Vec<u8> {
        iter::repeat(0).take(w * h * 4).collect::<_>()
    }

    #[bench]
    fn bench_chunks_1920x1080_no_asserts(b: &mut Bencher) {
        let i = test::black_box(create_vec(1920, 1080));
        let mut o = test::black_box(create_vec(1920, 1080));

        b.iter(|| bgrx_to_gray_chunks_no_asserts(&i, &mut o, 1920 * 4, 1920 * 4, 1920));
    }
}

This can be run with cargo bench and then prints the amount of nanoseconds each iterator of the closure was taking. To only really measure the processing itself, allocations and initializations of the input/output frame are happening outside of the closure. We’re not interested in times for that.

First Optimization – Assertions

To actually start optimizing this function, let’s take a look at the assembly that the compiler is outputting. The easiest way of doing that is via the Godbolt Compiler Explorer website. Select “rustc nightly” and use “-C opt-level=3” for the compiler flags, and then copy & paste your code in there. Once it compiles, to find the assembly that corresponds to a line, simply right-click on the line and “Scroll to assembly”.

Alternatively you can use cargo rustc –release — -C opt-level=3 –emit asm and check the assembly file that is output in the target/release/deps directory.

What we see then for our inner loop is something like the following

.LBB4_19:
  cmp r15, r11
  mov r13, r11
  cmova r13, r15
  mov rdx, r8
  sub rdx, r13
  je .LBB4_34
  cmp rdx, 3
  jb .LBB4_35
  inc r9
  movzx edx, byte ptr [rbx - 1]
  movzx ecx, byte ptr [rbx - 2]
  movzx esi, byte ptr [rbx]
  imul esi, esi, 19595
  imul edx, edx, 38470
  imul ecx, ecx, 7471
  add ecx, edx
  add ecx, esi
  shr ecx, 16
  mov byte ptr [r10 - 3], cl
  mov byte ptr [r10 - 2], cl
  mov byte ptr [r10 - 1], cl
  mov byte ptr [r10], cl
  add r10, 4
  add r8, -4
  add r15, -4
  add rbx, 4
  cmp r9, r14
  jb .LBB4_19

This is already quite optimized. For each loop iteration the first few instructions are doing some bounds checking and if they fail jump to the .LBB4_34 or .LBB4_35 labels. How to understand that this is bounds checking? Scroll down in the assembly to where these labels are defined and you’ll see something like the following

.LBB4_34:
  lea rdi, [rip + .Lpanic_bounds_check_loc.D]
  xor esi, esi
  xor edx, edx
  call core::panicking::panic_bounds_check@PLT
  ud2
.LBB4_35:
  cmp r15, r11
  cmova r11, r15
  sub r8, r11
  lea rdi, [rip + .Lpanic_bounds_check_loc.F]
  mov esi, 2
  mov rdx, r8
  call core::panicking::panic_bounds_check@PLT
  ud2

Also if you check (with the colors, or the “scroll to source” feature) which Rust code these correspond to, you’ll see that it’s the first and third access to the 4-byte slice that contains our BGRx values.

Afterwards in the assembly, the following steps are happening: 0) incrementing of the “loop counter” representing the number of iterations we’re going to do (r9), 1) actual reading of the B, G and R value and conversion to u32 (the 3 movzx, note that the reading of the x value is optimized away as the compiler sees that it is always multiplied by 0 later), 2) the multiplications with the array elements (the 3 imul), 3) combining of the results and division (i.e. shift) (the 2 add and the shr), 4) storing of the result in the output (the 4 mov). Afterwards the slice pointers are increased by 4 (rbx and r10) and the lengths (used for bounds checking) are decreased by 4 (r8 and r15). Finally there’s a check (cmp) to see if r9 (our loop) counter is at the end of the slice, and if not we jump back to the beginning and operate on the next BGRx chunk.

Generally what we want to do for optimizations is to get rid of unnecessary checks (bounds checking), memory accesses, conditions (cmp, cmov) and jumps (the instructions starting with j). These are all things that are slowing down our code.

So the first thing that seems useful to optimize here is the bounds checking at the beginning. It definitely seems not useful to do two checks instead of one for the two slices (the checks are for the both slices at once but Godbolt does not detect that and believes it’s only the input slice). And ideally we could teach the compiler that no bounds checking is needed at all.

As I wrote in the previous blog post, often this knowledge can be given to the compiler by inserting assertions.

To prevent two checks and just have a single check, you can insert a assert_eq!(in_p.len(), 4) at the beginning of the inner loop and the same for the output slice. Now we only have a single bounds check left per iteration.

As a next step we might want to try to move this knowledge outside the inner loop so that there is no bounds checking at all in there anymore. We might want to add assertions like the following outside the outer loop then to give all knowledge we have to the compiler

assert_eq!(in_data.len() % 4, 0);
assert_eq!(out_data.len() % 4, 0);
assert_eq!(out_data.len() / out_stride, in_data.len() / in_stride);

assert!(in_line_bytes <= in_stride);
assert!(out_line_bytes <= out_stride);

Unfortunately adding those has no effect at all on the inner loop, but having them outside the outer loop for good measure is not the worst idea so let’s just keep them. At least it can be used as some kind of documentation of the invariants of this code for future readers.

So let’s benchmark these two implementations now. The results on my machine are the following

test tests::bench_chunks_1920x1080_no_asserts ... bench:   4,420,145 ns/iter (+/- 139,051)
test tests::bench_chunks_1920x1080_asserts    ... bench:   4,897,046 ns/iter (+/- 166,555)

This is surprising, our version without the assertions is actually faster by a factor of ~1.1 although it had fewer conditions. So let’s take a closer look at the assembly at the top of the loop again, where the bounds checking happens, in the version with assertions

.LBB4_19:
  cmp rbx, r11
  mov r9, r11
  cmova r9, rbx
  mov r14, r12
  sub r14, r9
  lea rax, [r14 - 1]
  mov qword ptr [rbp - 120], rax
  mov qword ptr [rbp - 128], r13
  mov qword ptr [rbp - 136], r10
  cmp r14, 5
  jne .LBB4_33
  inc rcx
  [...]

While this indeed has only one jump as expected for the bounds checking, the number of comparisons is the same and even worse: 3 memory writes to the stack are happening right before the jump. If we follow to the .LBB4_33 label we will see that the assert_eq! macro is going to do something with core::fmt::Debug. This is setting up the information needed for printing the assertion failure, the “expected X equals to Y” output. This is certainly not good and the reason why everything is slower now.

First Optimization – Assertions Try 2

All the additional instructions and memory writes were happening because the assert_eq! macro is outputting something user friendly that actually contains the values of both sides. Let’s try again with the assert! macro instead

test tests::bench_chunks_1920x1080_no_asserts ... bench:   4,420,145 ns/iter (+/- 139,051)
test tests::bench_chunks_1920x1080_asserts    ... bench:   4,897,046 ns/iter (+/- 166,555)
test tests::bench_chunks_1920x1080_asserts_2  ... bench:   3,968,976 ns/iter (+/- 97,084)

This already looks more promising. Compared to our baseline version this gives us a speedup of a factor of 1.12, and compared to the version with assert_eq! 1.23. If we look at the assembly for the bounds checks (everything else stays the same), it also looks more like what we would’ve expected

.LBB4_19:
  cmp rbx, r12
  mov r13, r12
  cmova r13, rbx
  add r13, r14
  jne .LBB4_33
  inc r9
  [...]

One cmp less, only one jump left. And no memory writes anymore!

So keep in mind that assert_eq! is more user-friendly but quite a bit more expensive even in the “good case” compared to assert!.

Second Optimization – Iterate a bit more

This is still not very satisfying though. No bounds checking should be needed at all as each chunk is going to be exactly 4 bytes. We’re just not able to convince the compiler that this is the case. While it may be possible (let me know if you find a way!), let’s try something different. The zip iterator is done when the shortest iterator of both is done, and there are optimizations specifically for zipped slice iterators implemented. Let’s try that and replace the grayscale value calculation with

let grey = in_p.iter()
    .zip(RGB_Y.iter())
    .map(|(i, c)| u32::from(*i) * c)
    .sum::<u32>() / 65536;

If we run that through our benchmark after removing the assert!(in_p.len() == 4) (and the same for the output slice), these are the results

test tests::bench_chunks_1920x1080_asserts_2  ... bench:   3,968,976 ns/iter (+/- 97,084)
test tests::bench_chunks_1920x1080_iter_sum   ... bench:  11,393,600 ns/iter (+/- 347,958)

We’re actually 2.9 times slower! Even when adding back the assert!(in_p.len() == 4) assertion (and the same for the output slice) we’re still slower

test tests::bench_chunks_1920x1080_asserts_2  ... bench:   3,968,976 ns/iter (+/- 97,084)
test tests::bench_chunks_1920x1080_iter_sum   ... bench:  11,393,600 ns/iter (+/- 347,958)
test tests::bench_chunks_1920x1080_iter_sum_2 ... bench:  10,420,442 ns/iter (+/- 242,379)

If we look at the assembly of the assertion-less variant, it’s a complete mess now

.LBB0_19:
  cmp rbx, r13
  mov rcx, r13
  cmova rcx, rbx
  mov rdx, r8
  sub rdx, rcx
  cmp rdx, 4
  mov r11d, 4
  cmovb r11, rdx
  test r11, r11
  je .LBB0_20
  movzx ecx, byte ptr [r15 - 2]
  imul ecx, ecx, 19595
  cmp r11, 1
  jbe .LBB0_22
  movzx esi, byte ptr [r15 - 1]
  imul esi, esi, 38470
  add esi, ecx
  movzx ecx, byte ptr [r15]
  imul ecx, ecx, 7471
  add ecx, esi
  test rdx, rdx
  jne .LBB0_23
  jmp .LBB0_35
.LBB0_20:
  xor ecx, ecx
.LBB0_22:
  test rdx, rdx
  je .LBB0_35
.LBB0_23:
  shr ecx, 16
  mov byte ptr [r10 - 3], cl
  mov byte ptr [r10 - 2], cl
  cmp rdx, 3
  jb .LBB0_36
  inc r9
  mov byte ptr [r10 - 1], cl
  mov byte ptr [r10], cl
  add r10, 4
  add r8, -4
  add rbx, -4
  add r15, 4
  cmp r9, r14
  jb .LBB0_19

In short, there are now various new conditions and jumps for short-circuiting the zip iterator in the various cases. And because of all the noise added, the compiler was not even able to optimize the bounds check for the output slice away anymore (.LBB0_35 cases). While it was able to unroll the iterator (note that the 3 imul multiplications are not interleaved with jumps and are actually 3 multiplications instead of yet another loop), which is quite impressive, it couldn’t do anything meaningful with that information it somehow got (it must’ve understood that each chunk has 4 bytes!). This looks like something going wrong somewhere in the optimizer to me.

If we take a look at the variant with the assertions, things look much better

.LBB3_19:
  cmp r11, r12
  mov r13, r12
  cmova r13, r11
  add r13, r14
  jne .LBB3_33
  inc r9
  movzx ecx, byte ptr [rdx - 2]
  imul r13d, ecx, 19595
  movzx ecx, byte ptr [rdx - 1]
  imul ecx, ecx, 38470
  add ecx, r13d
  movzx ebx, byte ptr [rdx]
  imul ebx, ebx, 7471
  add ebx, ecx
  shr ebx, 16
  mov byte ptr [r10 - 3], bl
  mov byte ptr [r10 - 2], bl
  mov byte ptr [r10 - 1], bl
  mov byte ptr [r10], bl
  add r10, 4
  add r11, -4
  add r14, 4
  add rdx, 4
  cmp r9, r15
  jb .LBB3_19

This is literally the same as the assertion version we had before, except that the reading of the input slice, the multiplications and the additions are happening in iterator order instead of being batched all together. It’s quite impressive that the compiler was able to completely optimize away the zip iterator here, but unfortunately it’s still many times slower than the original version. The reason must be the instruction-reordering. The previous version had all memory reads batched and then the operations batched, which is apparently much better for the internal pipelining of the CPU (it is going to perform the next instructions without dependencies on the previous ones already while waiting for the pending instructions to finish).

It’s also not clear to me why the LLVM optimizer is not able to schedule the instructions the same way here. It apparently has all information it needs for that if no iterator is involved, and both versions are leading to exactly the same assembly except for the order of instructions. This also seems like something fishy.

Nonetheless, we still have our manual bounds check (the assertion) left here and we should really try to get rid of that. No progress so far.

Third Optimization – Getting rid of the bounds check finally

Let’s tackle this from a different angle now. Our problem is apparently that the compiler is not able to understand that each chunk is exactly 4 bytes.

So why don’t we write a new chunks iterator that has always exactly the requested amount of items, instead of potentially less for the very last iteration. And instead of panicking if there are leftover elements, it seems useful to just ignore them. That way we have API that is functionally different from the existing chunks iterator and provides behaviour that is useful in various cases. It’s basically the slice equivalent of the exact_chunks iterator of the ndarray crate.

By having it functionally different from the existing one, and not just an optimization, I also submitted it for inclusion in Rust’s standard library and it’s nowadays available as an unstable feature in nightly. Like all newly added API. Nonetheless, the same can also be implemented inside your code with basically the same effect, there are no dependencies on standard library internals.

So, let’s use our new exact_chunks iterator that is guaranteed (by API) to always give us exactly 4 bytes. In our case this is exactly equivalent to the normal chunks as by construction our slices always have a length that is a multiple of 4, but the compiler can’t infer that information. The resulting code looks as follows

pub fn bgrx_to_gray_exact_chunks(
    in_data: &[u8],
    out_data: &mut [u8],
    in_stride: usize,
    out_stride: usize,
    width: usize,
) {
    assert_eq!(in_data.len() % 4, 0);
    assert_eq!(out_data.len() % 4, 0);
    assert_eq!(out_data.len() / out_stride, in_data.len() / in_stride);

    let in_line_bytes = width * 4;
    let out_line_bytes = width * 4;

    assert!(in_line_bytes <= in_stride);
    assert!(out_line_bytes <= out_stride);

    for (in_line, out_line) in in_data
        .exact_chunks(in_stride)
        .zip(out_data.exact_chunks_mut(out_stride))
    {
        for (in_p, out_p) in in_line[..in_line_bytes]
            .exact_chunks(4)
            .zip(out_line[..out_line_bytes].exact_chunks_mut(4))
        {
            assert!(in_p.len() == 4);
            assert!(out_p.len() == 4);

            let b = u32::from(in_p[0]);
            let g = u32::from(in_p[1]);
            let r = u32::from(in_p[2]);
            let x = u32::from(in_p[3]);

            let grey = ((r * RGB_Y[0]) + (g * RGB_Y[1]) + (b * RGB_Y[2]) + (x * RGB_Y[3])) / 65536;
            let grey = grey as u8;
            out_p[0] = grey;
            out_p[1] = grey;
            out_p[2] = grey;
            out_p[3] = grey;
        }
    }
}

It’s exactly the same as the previous version with assertions, except for using exact_chunks instead of chunks and the same for the mutable iterator. The resulting benchmark of all our variants now looks as follow

test tests::bench_chunks_1920x1080_no_asserts ... bench:   4,420,145 ns/iter (+/- 139,051)
test tests::bench_chunks_1920x1080_asserts    ... bench:   4,897,046 ns/iter (+/- 166,555)
test tests::bench_chunks_1920x1080_asserts_2  ... bench:   3,968,976 ns/iter (+/- 97,084)
test tests::bench_chunks_1920x1080_iter_sum   ... bench:  11,393,600 ns/iter (+/- 347,958)
test tests::bench_chunks_1920x1080_iter_sum_2 ... bench:  10,420,442 ns/iter (+/- 242,379)
test tests::bench_exact_chunks_1920x1080      ... bench:   2,007,459 ns/iter (+/- 112,287)

Compared to our initial version this is a speedup of a factor of 2.2, compared to our version with assertions a factor of 1.98. This seems like a worthwhile improvement, and if we look at the resulting assembly there are no bounds checks at all anymore

.LBB0_10:
  movzx edx, byte ptr [rsi - 2]
  movzx r15d, byte ptr [rsi - 1]
  movzx r12d, byte ptr [rsi]
  imul r13d, edx, 19595
  imul edx, r15d, 38470
  add edx, r13d
  imul ebx, r12d, 7471
  add ebx, edx
  shr ebx, 16
  mov byte ptr [rcx - 3], bl
  mov byte ptr [rcx - 2], bl
  mov byte ptr [rcx - 1], bl
  mov byte ptr [rcx], bl
  add rcx, 4
  add rsi, 4
  dec r10
  jne .LBB0_10

Also due to this the compiler is able to apply some more optimizations and we only have one loop counter for the number of iterations r10 and the two pointers rcx and rsi that are increased/decreased in each iteration. There is no tracking of the remaining slice lengths anymore, as in the assembly of the original version (and the versions with assertions).

Summary

So overall we got a speedup of a factor of 2.2 while still writing very high-level Rust code with iterators and not falling back to unsafe code or using SIMD. The optimizations the Rust compiler is applying are quite impressive and the Rust marketing line of zero-cost abstractions is really visible in reality here.

The same approach should also work for many similar algorithms, and thus many similar multimedia related algorithms where you iterate over slices and operate on fixed-size chunks.

Also the above shows that as a first step it’s better to write clean and understandable high-level Rust code without worrying too much about performance (assume the compiler can optimize well), and only afterwards take a look at the generated assembly and check which instructions should really go away (like bounds checking). In many cases this can be achieved by adding assertions in strategic places, or like in this case by switching to a slightly different abstraction that is closer to the actual requirements (however I believe the compiler should be able to produce the same code with the help of assertions with the normal chunks iterator, but making that possible requires improvements to the LLVM optimizer probably).

And if all does not help, there’s still the escape hatch of unsafe (for using functions like slice::get_unchecked() or going down to raw pointers) and the possibility of using SIMD instructions (by using faster or stdsimd directly). But in the end this should be a last resort for those little parts of your code where optimizations are needed and the compiler can’t be easily convinced to do it for you.

Addendum: slice::split_at

User newpavlov suggested on Reddit to use repeated slice::split_at in a while loop for similar performance.

This would for example like

pub fn bgrx_to_gray_split_at(
    in_data: &[u8],
    out_data: &mut [u8],
    in_stride: usize,
    out_stride: usize,
    width: usize,
) {
    assert_eq!(in_data.len() % 4, 0);
    assert_eq!(out_data.len() % 4, 0);
    assert_eq!(out_data.len() / out_stride, in_data.len() / in_stride);

    let in_line_bytes = width * 4;
    let out_line_bytes = width * 4;

    assert!(in_line_bytes <= in_stride);
    assert!(out_line_bytes <= out_stride);

    for (in_line, out_line) in in_data
        .exact_chunks(in_stride)
        .zip(out_data.exact_chunks_mut(out_stride))
    {
        let mut in_pp: &[u8] = in_line[..in_line_bytes].as_ref();
        let mut out_pp: &mut [u8] = out_line[..out_line_bytes].as_mut();
        assert!(in_pp.len() == out_pp.len());

        while in_pp.len() >= 4 {
            let (in_p, in_tmp) = in_pp.split_at(4);
            let (out_p, out_tmp) = { out_pp }.split_at_mut(4);
            in_pp = in_tmp;
            out_pp = out_tmp;

            let b = u32::from(in_p[0]);
            let g = u32::from(in_p[1]);
            let r = u32::from(in_p[2]);
            let x = u32::from(in_p[3]);

            let grey = ((r * RGB_Y[0]) + (g * RGB_Y[1]) + (b * RGB_Y[2]) + (x * RGB_Y[3])) / 65536;
            let grey = grey as u8;
            out_p[0] = grey;
            out_p[1] = grey;
            out_p[2] = grey;
            out_p[3] = grey;
        }
    }
}

Performance-wise this brings us very close to the exact_chunks version

test tests::bench_exact_chunks_1920x1080      ... bench: 1,965,631 ns/iter (+/- 58,832)
test tests::bench_split_at_1920x1080          ... bench: 2,046,834 ns/iter (+/- 35,990)

and the assembly is also very similar

.LBB0_10:
  add rbx, -4
  movzx r15d, byte ptr [rsi]
  movzx r12d, byte ptr [rsi + 1]
  movzx edx, byte ptr [rsi + 2]
  imul r13d, edx, 19595
  imul r12d, r12d, 38470
  imul edx, r15d, 7471
  add edx, r12d
  add edx, r13d
  shr edx, 16
  movzx edx, dl
  imul edx, edx, 16843009
  mov dword ptr [rcx], edx
  lea rcx, [rcx + 4]
  add rsi, 4
  cmp rbx, 3
  ja .LBB0_10

Here the compiler even optimizes the storing of the value into a single write operation of 4 bytes, at the cost of an additional multiplication and zero-extend register move.

Overall this code performs very well too, but in my opinion it looks rather ugly compared to the versions using the different chunks iterators. Also this is basically what the exact_chunks iterator does internally: repeatedly calling slice::split_at. In theory both versions could lead to the very same assembly, but the LLVM optimizer is currently handling both slightly different.

Addendum 2: SIMD with faster

Adam Niederer, author of faster, provided a PR that implements the same algorithm with faster to make explicit use of SIMD instructions if available.

Due to some codegen issues, this currently has to be compiled with the CPU being selected as nehalem, i.e. by running RUSTFLAGS=”-C target-cpu=nehalem” cargo +nightly bench, but it provides yet another speedup by a factor of up to 1.27x compared to the fastest previous solution and 2.7x compared to the initial solution:

test tests::bench_chunks_1920x1080_asserts                     ... bench:   4,539,286 ns/iter (+/- 106,265)
test tests::bench_chunks_1920x1080_asserts_2                   ... bench:   3,550,683 ns/iter (+/- 96,917)
test tests::bench_chunks_1920x1080_iter_sum                    ... bench:   5,233,238 ns/iter (+/- 114,671)
test tests::bench_chunks_1920x1080_iter_sum_2                  ... bench:   3,532,059 ns/iter (+/- 94,964)
test tests::bench_chunks_1920x1080_no_asserts                  ... bench:   4,468,269 ns/iter (+/- 89,329)
test tests::bench_chunks_1920x1080_no_asserts_faster           ... bench:   2,476,077 ns/iter (+/- 54,877)
test tests::bench_chunks_1920x1080_no_asserts_faster_unstrided ... bench:   1,642,980 ns/iter (+/- 108,034)
test tests::bench_exact_chunks_1920x1080                       ... bench:   2,078,950 ns/iter (+/- 64,536)
test tests::bench_split_at_1920x1080                           ... bench:   2,096,603 ns/iter (+/- 107,420)

The code in question is very similar to what you would’ve written with ORC, especially the unstrided version. You basically operate on multiple elements at once, doing the same operation on each, but both versions do this in a slightly different way.

pub fn bgrx_to_gray_chunks_no_asserts_faster_unstrided(in_data: &[u8], out_data: &mut [u8]) {
    // Relies on vector width which is a multiple of 4
    assert!(u8s::WIDTH % 4 == 0 && u32s::WIDTH % 4 == 0);

    const RGB_Y: [u32; 16] = [19595, 38470, 7471, 0, 19595, 38470, 7471, 0, 19595, 38470, 7471, 0, 19595, 38470, 7471, 0];
    let rgbvec = u32s::load(&RGB_Y, 0);
    in_data.simd_iter(u8s(0)).simd_map(|v| {
        let (a, b) = v.upcast();
        let (a32, b32) = a.upcast();
        let (c32, d32) = b.upcast();

        let grey32a = a32 * rgbvec / u32s(65536);
        let grey32b = b32 * rgbvec / u32s(65536);
        let grey32c = c32 * rgbvec / u32s(65536);
        let grey32d = d32 * rgbvec / u32s(65536);

        let grey16a = grey32a.saturating_downcast(grey32b);
        let grey16b = grey32c.saturating_downcast(grey32d);

        let grey = grey16a.saturating_downcast(grey16b);
        grey
    }).scalar_fill(out_data);
}

pub fn bgrx_to_gray_chunks_no_asserts_faster(in_data: &[u8], out_data: &mut [u8]) {
    // Sane, but slowed down by faster's current striding implementation.
    in_data.stride_four(tuplify!(4, u8s(0))).zip().simd_map(|(r, g, b, _)| {
        let (r16a, r16b) = r.upcast();
        let (r32a, r32b) = r16a.upcast();
        let (r32c, r32d) = r16b.upcast();

        let (g16a, g16b) = g.upcast();
        let (g32a, g32b) = g16a.upcast();
        let (g32c, g32d) = g16b.upcast();

        let (b16a, b16b) = b.upcast();
        let (b32a, b32b) = b16a.upcast();
        let (b32c, b32d) = b16b.upcast();

        let grey32a = (r32a * u32s(19595) + g32a * u32s(38470) + b32a * u32s(7471)) / u32s(65536);
        let grey32b = (r32b * u32s(19595) + g32b * u32s(38470) + b32b * u32s(7471)) / u32s(65536);
        let grey32c = (r32c * u32s(19595) + g32c * u32s(38470) + b32c * u32s(7471)) / u32s(65536);
        let grey32d = (r32d * u32s(19595) + g32d * u32s(38470) + b32d * u32s(7471)) / u32s(65536);

        let grey16a = grey32a.saturating_downcast(grey32b);
        let grey16b = grey32c.saturating_downcast(grey32d);

        let grey = grey16a.saturating_downcast(grey16b);
        grey
    }).scalar_fill(out_data);
}