Porting EBU R128 audio loudness analysis from C to Rust – Porting Details

This blog post is part two of a four-part series

  1. Overview, summary and motivation
  2. Porting approach with various details, examples and problems I ran into along the way
  3. Performance optimizations
  4. Building Rust code into a C library as drop-in replacement

In this part I’ll go through the actual porting process of the libebur128 C code to Rust, the approach I’ve chosen with various examples, and a few problems I ran into.

It will be rather technical. I won’t explain details about how the C code works but will only focus on the aspects that are relevant for porting to Rust, otherwise this blog post would become even longer than it already is.

Porting

With the warnings out of the way, let’s get started. As a reminder, the code can be found on GitHub and you can also follow the actual chronological porting process by going through the git history there. It’s not very different from what follows here, but it’s there in case you prefer looking at diffs instead.

Approach

The approach I’ve taken is basically the same that Federico took for librsvg or Joe Neeman took for nnnoiseless:

  1. Start with the C code and safe Rust bindings around the C API
  2. Look for a function or component very low in the call graph without dependencies on (too much) other C code
  3. Rewrite that code in Rust and add an internal C API for it
  4. Call the new internal C API for that new Rust code from the C code and get rid of the C implementation of the component
  5. Make sure the tests are still passing
  6. Go to 2. and repeat

Compared to what I did when porting the FFmpeg loudness normalization filter this has the advantage that at every step there is a working version of the code and you don’t only notice at the very end that somewhere along the way you made a mistake. At each step you can validate that what you did was correct and the amount of code to debug if something went wrong is limited.

Thanks to Rust having a good FFI story for interoperability with C in either direction, writing the parts of the code that are called from C or calling into C is not that much of a headache and not worse than actually writing C.

Rust Bindings around C Library

This step could’ve been skipped if all I cared about was having a C API for the ported code later, or if I wanted to work with the tests of the C library for validation and worry about calling it from Rust at a later point. In this case I had already done safe Rust bindings around the C library before, and having a Rust API made it much easier to write tests that could be used during the porting and that could be automatically run at each step.

bindgen

As a first step for creating the Rust bindings there needs to be a way to actually call into the C code. In C there are the header files with the type definitions and function declarations, but Rust can’t directly work from those. The solution to this was in this case bindgen, which basically converts the C header files into something that Rust can understand. The resulting API is completely unsafe still but can be used in a next step to write safe Rust bindings around it.

I would recommend using bindgen for any non-trivial C API for which there is no better translation tool available, or for which there is no machine-readable description of the API that could be used instead by another tool. Parsing C headers is no fun and there is very little information available in C for generating safe bindings. For example for GObject-based libraries, using gir would be a better idea as it works from a rich XML description of the API that contains information about e.g. ownership transfer and allows autogenerating safe Rust bindings in many cases.

Also the dependency on clang makes it hard to run bindgen as part of every build, so instead I’ve made sure that the code generated by bindgen is platform independent and included it inside the repository. If you use bindgen, please try to do the same. Requiring clang for building your crate makes everything more complicated for your users, especially if they’re unfortunate enough to use Windows.

But back to the topic. What bindgen generates is basically a translation of the C header into Rust: type definitions and function declarations. This looks for example as follows

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ebur128_state {
    pub mode: ::std::os::raw::c_int,
    pub channels: ::std::os::raw::c_uint,
    pub samplerate: ::std::os::raw::c_ulong,
    pub d: *mut ebur128_state_internal,
}

extern "C" {
    pub fn ebur128_init(
        channels: ::std::os::raw::c_uint,
        samplerate: ::std::os::raw::c_ulong,
        mode: ::std::os::raw::c_int,
    ) -> *mut ebur128_state;

    pub fn ebur128_destroy(st: *mut *mut ebur128_state);

    pub fn ebur128_add_frames_int(
        st: *mut ebur128_state,
        src: *const ::std::os::raw::c_int,
        frames: usize,
    ) -> ::std::os::raw::c_int;
}

Based on this it is possible to call the C functions directly from unsafe Rust code and access the members of all the structs. It requires working with raw pointers and ensuring that everything is done correctly at any point to not cause memory corruption or worse. It’s just like using the API from C with a slightly different syntax.

Build System

To be able to call into the C API its implementation somehow has to be linked into your crate. As the C code later also has to be modified to call into the already ported Rust functions instead of the original C code, it makes most sense to build it as part of the crate instead of linking to an external version of it.

This can be done with the cc crate. It is called from cargo’s build.rs, where it is configured with, for example, which C files to compile and how. Once done it is possible to call any exported C function from the Rust code. The build.rs is not really complicated in this case

fn main() {
    cc::Build::new()
        .file("src/c/ebur128.c")
        .compile("ebur128");
}

Safe Rust API

With all that in place a safe Rust API around the unsafe C functions can be written now. How this looks in practice differs from API to API and might require some more thought in case of a more complex API to ensure everything is still safe and sound from a Rust point of view. In this case it was fortunately rather simple.

For example the struct definition, the constructor and the destructor (Drop impl) look as follows based on what bindgen generated above

pub struct EbuR128(ptr::NonNull<ffi::ebur128_state>);

The struct is a simple wrapper around std::ptr::NonNull, which itself is a zero-cost wrapper around raw pointers that additionally ensures that the stored pointer is never NULL and allows additional optimizations to take place based on that.

In other words: the Rust struct is just a raw pointer but with additional safety guarantees.
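The zero-cost claim can be checked with a few size comparisons. The following is only a small sketch: `State` and the two helper functions are hypothetical stand-ins and not part of the crate.

```rust
use std::mem::size_of;
use std::ptr::NonNull;

/// Hypothetical stand-in for the C state struct.
#[repr(C)]
pub struct State {
    pub channels: u32,
}

/// Returns true if `NonNull<State>` is exactly as large as a raw pointer.
pub fn non_null_is_pointer_sized() -> bool {
    size_of::<NonNull<State>>() == size_of::<*mut State>()
}

/// Returns true if wrapping it in an `Option` needs no extra space: the NULL
/// value that `NonNull` rules out is reused to represent `None`.
pub fn option_is_free() -> bool {
    size_of::<Option<NonNull<State>>>() == size_of::<*mut State>()
}
```

Both functions should return `true`, and `NonNull::new()` on a NULL pointer gives `None` instead of a wrapper around an invalid pointer.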

impl EbuR128 {
    pub fn new(channels: u32, samplerate: u32, mode: Mode) -> Result<Self, Error> {
        static ONCE: std::sync::Once = std::sync::Once::new();

        ONCE.call_once(|| unsafe { ffi::ebur128_libinit() });

        unsafe {
            let ptr = ffi::ebur128_init(channels, samplerate as _, mode.bits() as i32);
            let ptr = ptr::NonNull::new(ptr).ok_or(Error::NoMem)?;
            Ok(EbuR128(ptr))
        }
    }
}

The constructor is slightly more complicated as it also has to ensure that the one-time initialization function is called exactly once. This requires using std::sync::Once as above.

After that it calls the C constructor with the given parameters. This can return NULL in various cases when not enough memory could be allocated as described in the documentation of the C library. This needs to be handled gracefully here and instead of panicking an error is returned to the caller. ptr::NonNull::new() returns an Option, and if NULL is passed it returns None. If this happens it is transformed into an error together with an early return via the ? operator.

In the end the pointer then only has to be wrapped in the struct and be returned.
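To try out the constructor pattern without the C library, here is a minimal sketch with the C side faked in Rust. `raw_init`, `Handle`, `State` and `INIT_CALLS` are made-up names for illustration; the stand-in constructor returns NULL for zero channels to simulate a failure.

```rust
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Once;

#[derive(Debug, PartialEq)]
pub enum Error {
    NoMem,
}

pub struct State {
    pub channels: u32,
}

/// Counts how often the (hypothetical) one-time initialization ran.
pub static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);

/// Stand-in for a C constructor: returns NULL on failure.
fn raw_init(channels: u32) -> *mut State {
    if channels == 0 {
        std::ptr::null_mut()
    } else {
        Box::into_raw(Box::new(State { channels }))
    }
}

pub struct Handle(NonNull<State>);

impl Handle {
    pub fn new(channels: u32) -> Result<Self, Error> {
        // Runs the closure only on the first call, also across threads.
        static ONCE: Once = Once::new();
        ONCE.call_once(|| {
            INIT_CALLS.fetch_add(1, Ordering::SeqCst);
        });

        // NULL becomes `None`, which becomes an early `Err` return.
        let ptr = NonNull::new(raw_init(channels)).ok_or(Error::NoMem)?;
        Ok(Handle(ptr))
    }

    pub fn channels(&self) -> u32 {
        // SAFETY: the pointer is non-NULL and owned by this handle.
        unsafe { self.0.as_ref().channels }
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        // SAFETY: the pointer came from `Box::into_raw` in `raw_init`.
        unsafe { drop(Box::from_raw(self.0.as_ptr())) };
    }
}
```

Creating several handles runs the initialization only once, and `Handle::new(0)` returns `Err(Error::NoMem)` instead of panicking.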

impl Drop for EbuR128 {
    fn drop(&mut self) {
        unsafe {
            let mut state = self.0.as_ptr();
            ffi::ebur128_destroy(&mut state);
        }
    }
}

The Drop trait is used for defining what should happen if a value of the struct goes out of scope and what should be done to clean up after it. In this case this means calling the destroy function of the C library. It takes a pointer to a pointer to its state, which is then set to NULL. As such it is necessary to store the raw pointer in a local variable and pass a mutable reference to it. Otherwise the ptr::NonNull would end up with a NULL pointer inside it, which would result in undefined behaviour.
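The pointer-to-pointer detail is easy to get wrong, so here is a small sketch of it in isolation. `raw_destroy` is a hypothetical Rust stand-in for a C destructor with the same "free and set to NULL" behaviour; it is not the real `ebur128_destroy()`.

```rust
use std::ptr::{self, NonNull};

pub struct State {
    pub channels: u32,
}

/// Stand-in for a C-style destructor: frees `*st` and sets it to NULL.
///
/// # Safety
/// `st` must point to a pointer that was created by `Box::into_raw`.
unsafe fn raw_destroy(st: *mut *mut State) {
    unsafe {
        drop(Box::from_raw(*st));
        *st = ptr::null_mut();
    }
}

pub struct Handle(NonNull<State>);

impl Handle {
    pub fn new(channels: u32) -> Self {
        let raw = Box::into_raw(Box::new(State { channels }));
        Handle(NonNull::new(raw).expect("Box pointers are never NULL"))
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        // Copy the pointer into a local variable so that the destructor
        // overwrites the copy and not the `NonNull` field itself.
        let mut raw = self.0.as_ptr();
        unsafe { raw_destroy(&mut raw) };
        debug_assert!(raw.is_null());
    }
}

/// Shows the effect of the destructor on a plain raw pointer.
pub fn destroy_nulls_pointer() -> bool {
    let mut raw = Box::into_raw(Box::new(State { channels: 2 }));
    unsafe { raw_destroy(&mut raw) };
    raw.is_null()
}
```

Passing `&mut self.0` reinterpreted as a raw pointer would instead let the destructor write NULL into the `NonNull`, which is exactly the situation the local variable avoids.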

The last function that I want to mention here is the one that takes a slice of audio samples for processing

    pub fn add_frames_i32(&mut self, frames: &[i32]) -> Result<(), Error> {
        unsafe {
            if frames.len() % self.0.as_ref().channels as usize != 0 {
                return Err(Error::NoMem);
            }

            let res = ffi::ebur128_add_frames_int(
                self.0.as_ptr(),
                frames.as_ptr(),
                frames.len() / self.0.as_ref().channels as usize,
            );
            Error::from_ffi(res as ffi::error, || ())
        }
    }

Apart from calling the C function it is again necessary to check various pre-conditions before doing so. The C function will cause out of bounds reads if passed a slice that doesn’t contain a sample for each channel, so this must be checked beforehand or otherwise the caller (safe Rust code!) could cause out of bounds memory accesses.

In the end after calling the function its return value is converted into a Result, converting any errors into the crate’s own Error enum.

As can be seen here, writing safe Rust bindings around the C API requires reading of the documentation of the C code and keeping all the safety guarantees of Rust in mind to ensure that it is impossible to violate those safety guarantees, no matter what the caller passes into the functions.
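The kind of pre-condition check described above can be factored into a small helper. This is only a sketch: `frame_count` and the `InvalidLength` variant are hypothetical names and not part of the crate, which reuses its existing error type instead.

```rust
#[derive(Debug, PartialEq)]
pub enum Error {
    InvalidLength,
}

/// Returns the number of complete frames in an interleaved sample buffer,
/// or an error if the buffer does not hold one sample per channel for
/// every frame.
pub fn frame_count<T>(samples: &[T], channels: usize) -> Result<usize, Error> {
    if channels == 0 || samples.len() % channels != 0 {
        return Err(Error::InvalidLength);
    }
    Ok(samples.len() / channels)
}
```

Only the value returned in the `Ok` case would be handed to the C function, so a caller of the safe API can no longer trigger an out of bounds read with a slice of the wrong length.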

Not having to read the documentation and still being guaranteed that the code can’t cause memory corruption is one of the big advantages of Rust. If there even is documentation or it mentions such details.

Replacing first function: Resampler

Once the safe Rust API is done, it is possible to write safe Rust code that makes use of the C code. Among other things, that allows writing tests to ensure that the ported code still does the same as the previous C code. But this will be the topic of the next section. Writing tests is boring, porting code is more interesting and that’s what I will start with.

To find the first function to port, I first read the C code to get a general idea of how the different pieces fit together and what the overall structure of the code is. Based on this I selected the resampler that is used in the true peak measurement. It is one of the leaf functions of the call-graph, that is, it is not calling into any other C code and is relatively independent from everything else. Unlike many other parts of the code it is also already factored out into separate structs and functions.

In the C code this can be found in the interp_create(), interp_destroy() and interp_process() functions. The resulting Rust code can be found in the interp module, which provides basically the same API in Rust except for the destroy() function which Rust provides for free, and corresponding extern "C" functions that can be called from C.

The create() function is not very interesting from a porting point of view: it simply allocates some memory and initializes it. The C and Rust versions of it look basically the same. The Rust version is only missing the checks for allocation failures as those can’t currently be handled in Rust, and handling allocation failures like in the C code is almost useless with modern operating systems and overcommitting allocators.

Also the struct definition is not too interesting, it is approximately the same as the one in C just that pointers to arrays are replaced by Vecs and lengths of them are directly taken from the Vec instead of storing them separately. In a later version the Vecs were replaced by boxed slices and SmallVec but that shouldn’t be a concern here for now.

The main interesting part here is the processing function and how to provide all the functions to the C code.

Processing Function: Using Iterators

The processing function, interp_process(), is basically 4 nested for loops over each frame in the input data, each channel in each frame, the interpolator factors and finally the filter coefficients.

unsigned int out_stride = interp->channels * interp->factor;

for (frame = 0; frame < frames; frame++) {
  for (chan = 0; chan < interp->channels; chan++) {
    interp->z[chan][interp->zi] = *in++;
    outp = out + chan;
    for (f = 0; f < interp->factor; f++) {
      acc = 0.0;
      for (t = 0; t < interp->filter[f].count; t++) {
        int i = (int) interp->zi - (int) interp->filter[f].index[t];
        if (i < 0) {
          i += (int) interp->delay;
        }
        c = interp->filter[f].coeff[t];
        acc += (double) interp->z[chan][i] * c;
      }
      *outp = (float) acc;
      outp += interp->channels;
    }
  }
  out += out_stride;
  interp->zi++;
  if (interp->zi == interp->delay) {
    interp->zi = 0;
  }
}

This could be written the same way in Rust but slice indexing is not very idiomatic in Rust and using Iterators is preferred as it leads to more declarative code. Also all the indexing would lead to suboptimal performance due to the required bounds checks. So the main task here is to translate the code to iterators as much as possible.

When looking at the C code, the outer loop iterates over chunks of channels samples from the input and chunks of channels * factor samples from the output. This is exactly what the chunks_exact iterator on slices does in Rust. Similarly the second outer loop iterates over all samples of the input chunks and the two-dimensional array z, which has delay items per channel. On the Rust side I represented z as a flat, one-dimensional array for simplicity, so instead of indexing, the chunks_exact() iterator is used again for iterating over it.

This leads to the following for the two outer loops

for (src, dst) in src
    .chunks_exact(self.channels)
    .zip(dst.chunks_exact_mut(self.channels * self.factor)) {
    for (channel_idx, (src, z)) in src
        .iter()
        .zip(self.z.chunks_exact_mut(delay))
        .enumerate() {
        // insert more code here
    }
}

Apart from making it more clear what the data access patterns are, this is also less error prone and gives more information to the compiler for optimizations.
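To see the chunks_exact() / zip() combination in isolation, here is a self-contained sketch with the same input and output shapes as the two outer loops. `repeat_frames` is a made-up function that simply repeats every input frame `factor` times; it does no filtering at all.

```rust
/// Repeats every interleaved input frame `factor` times in the output.
pub fn repeat_frames(src: &[f32], dst: &mut [f32], channels: usize, factor: usize) {
    assert!(channels > 0 && factor > 0);
    assert_eq!(src.len() % channels, 0);
    assert_eq!(dst.len(), src.len() * factor);

    // One input frame of `channels` samples corresponds to
    // `channels * factor` output samples.
    for (src, dst) in src
        .chunks_exact(channels)
        .zip(dst.chunks_exact_mut(channels * factor))
    {
        // Each output chunk again consists of `factor` frames.
        for dst in dst.chunks_exact_mut(channels) {
            dst.copy_from_slice(src);
        }
    }
}
```

With two channels and a factor of 2, the input `[1, 2, 3, 4]` becomes `[1, 2, 1, 2, 3, 4, 3, 4]`, and there is not a single explicit index in the loops.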

Inside this loop, a ringbuffer z / zi is used to store the incoming samples for each channel and to keep the last delay number of samples for further processing. We’ll keep this part as it is for now and use explicit indexing. While a VecDeque, or any similar data structure from other crates, could be used here, it would complicate the code and cause more allocations. I’ll revisit this piece of code in part 3 of this blog post.

The first inner loop now iterates over all filters, of which there are factor many and chunks of size channels from the output, for which the same translation as before is used. The second inner loop iterates over all coefficients of the filter and the z ringbuffer, and sums up the product of both which is then stored in the output.

So overall the body of the second outer loop above with the two inner loops would look as follows

z[self.zi] = *src;

for (filter, dst) in self
    .filter
    .iter()
    .zip(dst.chunks_exact_mut(self.channels)) {
        let mut acc = 0.0;
        for (c, index) in &filter.coeff {
            let mut i = self.zi as i32 - *index as i32;
            if i < 0 {
                i += self.delay as i32;
            }
            acc += z[i as usize] as f64 * c;
        }

        dst[channel_idx] = acc as f32;
    }
}

self.zi += 1;
if self.zi == self.delay {
    self.zi = 0;
}

The full code after porting can be found here.

My general approach for porting C code with loops is what I did above: first try to understand the data access patterns and then find ways to express these with Rust iterators, and only if there is no obvious way use explicit indexing. And if the explicit indexing turns out to be a performance problem due to bounds checks, first try to reorganize the data so that direct iteration is possible (which usually also improves performance due to cache locality) and otherwise use some well-targeted usages of unsafe code. But more about that in part 3 of this blog post in the context of optimizations.
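As an illustration of mixing both styles, the following sketch iterates over the filter taps with an iterator and only indexes into the ringbuffer. `DelayLine` is a hypothetical single-channel simplification and not the interpolator from the crate; it also wraps the index with unsigned arithmetic instead of the signed arithmetic used above.

```rust
/// Hypothetical single-channel delay line with a ringbuffer `z` and a
/// write position `zi`.
pub struct DelayLine {
    z: Vec<f32>,
    zi: usize,
}

impl DelayLine {
    pub fn new(delay: usize) -> Self {
        assert!(delay > 0);
        DelayLine {
            z: vec![0.0; delay],
            zi: 0,
        }
    }

    /// Stores `sample` and returns the sum of `coeff * z[zi - index]` over
    /// all `(index, coeff)` taps, wrapping around the ringbuffer.
    pub fn push(&mut self, sample: f32, taps: &[(usize, f64)]) -> f64 {
        let delay = self.z.len();
        self.z[self.zi] = sample;

        let acc: f64 = taps
            .iter()
            .map(|&(index, coeff)| {
                assert!(index < delay);
                // Adding `delay` before subtracting avoids negative values.
                let i = (self.zi + delay - index) % delay;
                f64::from(self.z[i]) * coeff
            })
            .sum();

        self.zi = (self.zi + 1) % delay;
        acc
    }
}
```

With the taps `[(0, 1.0), (1, 0.5)]` every output is the current sample plus half of the previous one, also across the wrap-around of the buffer.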

Exporting C Functions

To be able to call the above code from C, a C-compatible function needs to be exported for it. This involves working with unsafe code and raw pointers again as that’s the only thing C understands. The unsafe Rust code needs to assert the implicit assumptions that can’t be expressed in C and that the calling C code has to follow.

For the interpolator, functions with the same API as the previous C code are exported to keep the amount of changes minimal: create(), destroy() and process() functions.

#[no_mangle]
pub unsafe extern "C" fn interp_create(taps: u32, factor: u32, channels: u32) -> *mut Interp {
    Box::into_raw(Box::new(Interp::new(
        taps,
        factor,
        channels,
    )))
}

#[no_mangle]
pub unsafe extern "C" fn interp_destroy(interp: *mut Interp) {
    drop(Box::from_raw(interp));
}

The #[no_mangle] attribute on functions makes sure that the symbols are exported as is instead of being mangled by the compiler. extern "C" makes sure that the function has the same calling convention as a corresponding C function.

For the create() function the interpolator struct is wrapped in a Box, which is the most basic mechanism to do heap allocations in Rust. This is needed because the C code shouldn’t have to know about the layout of the Interp struct and should just handle it as an opaque pointer. Box::into_raw() converts the Box into a raw pointer that can be returned directly to C. This also passes ownership of the memory to C.

The destroy() function is doing the inverse of the above and calls Box::from_raw() to get a Box back from the raw pointer. This requires that the raw pointer passed in was actually allocated via a Box and is of the correct type, which is something that can’t really be checked. The function has to trust the C code to do this correctly. The standard approach to memory safety in C: trust that everybody is writing correct code.

After getting back the Box, it only has to be dropped and Rust will take care of deallocating any memory as needed.
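The ownership round trip through a raw pointer can be tried out without any C code. In this sketch `Counter` and its three functions are made up for illustration, and the #[no_mangle] attribute is left out because the functions are only called from Rust here. The NULL check in the destroy function is an addition of this sketch, similar to what free() does in C.

```rust
pub struct Counter {
    count: u64,
}

/// Allocates a counter on the heap and hands ownership to the caller.
pub extern "C" fn counter_create() -> *mut Counter {
    Box::into_raw(Box::new(Counter { count: 0 }))
}

/// # Safety
/// `counter` must come from `counter_create()` and not be destroyed yet.
pub unsafe extern "C" fn counter_increment(counter: *mut Counter) -> u64 {
    let counter = unsafe { &mut *counter };
    counter.count += 1;
    counter.count
}

/// # Safety
/// `counter` must be NULL or come from `counter_create()`, and must not be
/// used again afterwards.
pub unsafe extern "C" fn counter_destroy(counter: *mut Counter) {
    if !counter.is_null() {
        // Taking the Box back returns ownership to Rust; dropping it frees
        // the memory.
        drop(unsafe { Box::from_raw(counter) });
    }
}
```

Between the create and destroy calls Rust does not track the allocation at all, so forgetting the destroy call leaks the memory and calling it twice is undefined behaviour, just like with malloc() and free().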

#[no_mangle]
pub unsafe extern "C" fn interp_process(
    interp: *mut Interp,
    frames: usize,
    src: *const f32,
    dst: *mut f32,
) -> usize {
    use std::slice;

    let interp = &mut *interp;
    let src = slice::from_raw_parts(src, interp.channels * frames);
    let dst = slice::from_raw_parts_mut(dst, interp.channels * interp.factor * frames);

    interp.process(src, dst);

    interp.factor * frames
}

The main interesting part of the processing function is the usage of slice::from_raw_parts. The function again has to trust the C side that the pointers are correct and actually point to frames number of audio frames. In Rust a slice knows about its size so some conversion between the two is needed: a slice of the correct size has to be created around the pointer. This does not involve any copying of memory, it only stores the length information together with the raw pointer. This is also the reason why it’s not required anywhere to pass the length separately to the Rust version of the processing function.
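The split between an unsafe C-compatible wrapper and a safe core working on slices can be sketched as follows. `peak` and `peak_c` are hypothetical functions and not part of the crate, and the NULL check is an extra precaution of this sketch.

```rust
use std::slice;

/// Safe core: largest absolute sample value in the slice.
pub fn peak(samples: &[f32]) -> f32 {
    samples.iter().fold(0.0f32, |acc, s| acc.max(s.abs()))
}

/// C-compatible entry point.
///
/// # Safety
/// `src` must be NULL or point to `channels * frames` valid `f32` values
/// that are not modified during the call.
pub unsafe extern "C" fn peak_c(src: *const f32, channels: usize, frames: usize) -> f32 {
    if src.is_null() {
        return 0.0;
    }

    // No copy happens here: the slice is just the pointer plus a length.
    let samples = unsafe { slice::from_raw_parts(src, channels * frames) };
    peak(samples)
}
```

Everything that can go wrong with the raw pointer is concentrated in the few lines of the wrapper, while `peak()` itself can be tested and used from safe Rust.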

With this the interpolator is fully ported and the C functions can be called directly from the C code. On the C side they are declared as follows and then called as before

typedef void * interpolator;

extern interpolator* interp_create(unsigned int taps, unsigned int factor, unsigned int channels);
extern void interp_destroy(interpolator* interp);
extern size_t interp_process(interpolator* interp, size_t frames, float* in, float* out);

The commit that did all this also adds some tests to ensure that everything still works correctly. It also contains some optimizations on top of the code above and is not 100% the same code.

Writing Tests

For testing that porting the code part by part to Rust doesn’t introduce any problems I went for the common two layer approach: 1. integration tests that test if the whole thing still works correctly, and 2. unit tests for the rewritten component alone.

The integration tests come in two variants inside the ebur128 module: one variant just testing via assertions that the results on a fixed input are the expected ones, and one variant comparing the C and Rust implementations. The unit tests only come in the second variant for now.

To test the C implementation in the integration tests an old version of the crate that had no ported code yet is pulled in. For comparing the C implementation of the individual components, I extracted the C code into a separate C file that exported the same API as the corresponding Rust code and called both from the tests.

Comparing Floating Point Numbers

The first variant is not very interesting apart from the complications involved when comparing floating point numbers. For this I used the float_eq crate, which provides different ways of comparing floating point numbers.

The first variant of tests uses it by checking if the absolute difference between the expected and actual result is very small, which is less strict than the ULP check used for the second variant of tests. Unfortunately this was required because depending on the used CPU and toolchain the results differ from the expected static results (generated with one CPU and toolchain), while for the comparison between the C and Rust implementation on the same CPU the results are basically the same.
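To make the difference between the two kinds of checks more concrete, here is a hand-written sketch of both. This is not the API of the float_eq crate; `abs_eq` and `ulps_apart` are made-up helpers, and the ULP version is simplified to finite values with the same sign.

```rust
/// True if `a` and `b` differ by at most `tol` in absolute terms.
pub fn abs_eq(a: f64, b: f64, tol: f64) -> bool {
    (a - b).abs() <= tol
}

/// Distance between `a` and `b` counted in representable `f64` values
/// (units in the last place).
///
/// Simplification: only valid for finite values with the same sign.
pub fn ulps_apart(a: f64, b: f64) -> u64 {
    assert!(a.is_finite() && b.is_finite());
    assert_eq!(a.is_sign_negative(), b.is_sign_negative());

    // For same-signed finite floats the bit patterns are ordered like the
    // magnitudes, so their integer distance is the ULP distance.
    a.to_bits().abs_diff(b.to_bits())
}
```

An absolute tolerance of, say, `1e-12` accepts many different results around `1.0`, while a ULP check with a small limit only accepts the handful of directly neighbouring floating point values.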

quickcheck

quickcheck is a crate that allows writing randomized property tests. This seemed like the perfect tool for writing tests that compare the two implementations: the property to check for is equality of results, and it should be true for any possible input.

Using quickcheck is simple in principle. You write a test function that takes the inputs as parameters, do the processing and then check that the property you want to test for holds by either using assertions or returning a bool or TestResult.

#[quickcheck]
fn compare_c_impl_i16(signal: Signal<i16>) {
    let mut ebu = EbuR128::new(signal.channels, signal.rate, ebur128::Mode::all()).unwrap();
    ebu.add_frames_i16(&signal.data).unwrap();

    let mut ebu_c =
        ebur128_c::EbuR128::new(signal.channels, signal.rate, ebur128_c::Mode::all()).unwrap();
    ebu_c.add_frames_i16(&signal.data).unwrap();

    compare_results(&ebu, &ebu_c, signal.channels);
}

quickcheck will then generate random values for the function parameters via the Arbitrary impl of the given types and call it many times. If one run fails, it tries to find a minimal testcase that fails based on “shrinking” the initial failure case and then prints that failing, shrunk testcase.

And this is the part of using quickcheck that involves some more effort: writing a reasonable Arbitrary impl for the inputs that can also be shrunk in a useful way on failures.

For the tests here I came up with a Signal type. Its Arbitrary implementation creates an audio signal with 1-16 channels and multiple sine waves of different amplitudes and frequencies. Shrinking first tries to reproduce the problem with a single channel, and then halving the signal length.

This worked well in practice so far. It doesn’t cover all possible inputs but should cover anything that can fail, and the simple shrinking approach also helped to find smallish testcases if something failed. But of course it’s not a perfect solution, only a practical one.
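The shrinking strategy can be sketched without quickcheck itself. The following is a simplified, deterministic stand-in for the Signal type from the tests: it only generates a single sine wave, works on `f32` instead of being generic, and its `shrink()` is a plain method returning candidates rather than an implementation of the Arbitrary trait.

```rust
use std::f32::consts::PI;

#[derive(Clone, Debug, PartialEq)]
pub struct Signal {
    pub channels: u32,
    pub rate: u32,
    /// Interleaved samples.
    pub data: Vec<f32>,
}

impl Signal {
    /// The same sine wave on every channel.
    pub fn sine(channels: u32, rate: u32, frames: usize, freq: f32, amplitude: f32) -> Self {
        assert!(channels > 0 && rate > 0);
        let mut data = Vec::with_capacity(frames * channels as usize);
        for n in 0..frames {
            let v = amplitude * (2.0 * PI * freq * n as f32 / rate as f32).sin();
            data.extend(std::iter::repeat(v).take(channels as usize));
        }
        Signal { channels, rate, data }
    }

    pub fn frames(&self) -> usize {
        self.data.len() / self.channels as usize
    }

    /// Smaller candidates to retry a failing test with: first only the
    /// first channel, then the first half of the signal.
    pub fn shrink(&self) -> Vec<Signal> {
        let ch = self.channels as usize;
        let mut out = Vec::new();
        if ch > 1 {
            let data = self.data.chunks_exact(ch).map(|frame| frame[0]).collect();
            out.push(Signal { channels: 1, rate: self.rate, data });
        }
        let frames = self.frames();
        if frames > 1 {
            let data = self.data[..(frames / 2) * ch].to_vec();
            out.push(Signal { channels: self.channels, rate: self.rate, data });
        }
        out
    }
}
```

A test runner would call `shrink()` repeatedly on the failing input and continue with the first candidate that still fails, until no candidate fails anymore.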

Based on these sets of tests I could be reasonably certain that the C and Rust implementation provide exactly the same results, so I could start porting the next part of the code.

True Peak: Macros vs. Traits

C doesn’t really have any mechanisms for writing code that is generic over different types (other than void *), or any more advanced means of abstraction than functions and structs. For that reason the C code uses macros via the C preprocessor in various places to write code once for the different input types (i16, i32, f32 and f64 or in C terms short, int, float and double). The C preprocessor is just a fancy mechanism for string concatenation so this is rather unpleasant to write and read, results might not even be valid C code and the resulting compiler errors are often rather confusing.

In Rust macros could also be used for this. While cleaner than C macros because of macro hygiene rules and working on a typed token tree instead of just strings, this would still end up with hard to write and read code with possibly confusing compiler errors. For abstracting over different types, Rust provides traits. These allow writing code that is generic over different types with a well-defined interface, and allow much more, but I won’t cover that here.

One example of macro usage in the C code is the input processing, of which the true peak measurement is one part. In C this basically looks as follows, with some parts left out because they’re not relevant for the true peak measurement itself

static void ebur128_check_true_peak(ebur128_state* st, size_t frames) {
  size_t c, i, frames_out;

  frames_out =
      interp_process(st->d->interp, frames, st->d->resampler_buffer_input,
                     st->d->resampler_buffer_output);

  for (i = 0; i < frames_out; ++i) {
    for (c = 0; c < st->channels; ++c) {
      double val =
          (double) st->d->resampler_buffer_output[i * st->channels + c];

      if (EBUR128_MAX(val, -val) > st->d->prev_true_peak[c]) {
        st->d->prev_true_peak[c] = EBUR128_MAX(val, -val);
      }
    }
  }
}

#define EBUR128_FILTER(type, min_scale, max_scale)                             \
  static void ebur128_filter_##type(ebur128_state* st, const type* src,        \
                                    size_t frames) {                           \
    static double scaling_factor =                                             \
        EBUR128_MAX(-((double) (min_scale)), (double) (max_scale));            \
    double* audio_data = st->d->audio_data + st->d->audio_data_index;          \
                                                                               \
    /* some other code */                                                      \
                                                                               \
    if ((st->mode & EBUR128_MODE_TRUE_PEAK) == EBUR128_MODE_TRUE_PEAK &&       \
        st->d->interp) {                                                       \
      for (i = 0; i < frames; ++i) {                                           \
        for (c = 0; c < st->channels; ++c) {                                   \
          st->d->resampler_buffer_input[i * st->channels + c] =                \
              (float) ((double) src[i * st->channels + c] / scaling_factor);   \
        }                                                                      \
      }                                                                        \
      ebur128_check_true_peak(st, frames);                                     \
    }                                                                          \
                                                                               \
    /* some other code */                                                      \
}

EBUR128_FILTER(short, SHRT_MIN, SHRT_MAX)
EBUR128_FILTER(int, INT_MIN, INT_MAX)
EBUR128_FILTER(float, -1.0f, 1.0f)
EBUR128_FILTER(double, -1.0, 1.0)

What the invocations of the macro at the bottom do is to take the whole macro body and replace the usages of type, min_scale and max_scale accordingly. That is, one ends up with a function ebur128_filter_short() that works on a const short *, uses 32768.0 as scaling_factor and the corresponding for the 3 other types.

To convert this to Rust, first a trait that provides all the required operations has to be defined and then implemented on the 4 numeric types that are supported as audio input. In this case, the only required operation is to convert the input values to an f32 between -1.0 and +1.0.

pub(crate) trait AsF32: Copy {
    fn as_f32(self) -> f32;
}

impl AsF32 for i16 {
    fn as_f32(self) -> f32 {
        self as f32 / -(i16::MIN as f32)
    }
}

// And the same for i32, f32 and f64

Once this trait is defined and implemented on the needed types the Rust function can be written generically over the trait

pub(crate) fn check_true_peak<T: AsF32>(&mut self, src: &[T], peaks: &mut [f64]) {
    assert!(src.len() <= self.buffer_input.len());
    assert!(peaks.len() == self.channels);

    for (o, i) in self.buffer_input.iter_mut().zip(src.iter()) {
        *o = i.as_f32();
    }

    self.interp.process(
        &self.buffer_input[..(src.len())],
        &mut self.buffer_output[..(src.len() * self.interp_factor)],
    );

    for (channel_idx, peak) in peaks.iter_mut().enumerate() {
        for o in self.buffer_output[..(src.len() * self.interp_factor)]
            .chunks_exact(self.channels) {
            let v = o[channel_idx].abs() as f64;
            if v > *peak {
                *peak = v;
            }
        }
    }
}

This is not a direct translation of the C code though. As part of rewriting the C code I also factored out the true peak detection from the filter function into its own function. It is called from the filter function shown in the C code a bit further above. This way it was easy to switch only this part from a C implementation to a Rust implementation while keeping the other parts of the filter, and to also test it separately from the whole filter function.

All this can be found in this commit together with tests and benchmarks. The overall code is a bit different than what is listed above, and also the latest version in the repository looks a bit different but more on that in part 3 of this blog post.

One last thing worth mentioning here is that the AsF32 trait is not public API of the crate and neither are the functions generic over the input type. Instead, the generic functions are only used internally and the public API only provides 4 functions that are specialized to the concrete input types. This keeps the API surface smaller and makes the API easier to understand for users.
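The following sketch shows this split between generic internals and a small, concrete public API. The i16 implementation is the one from above; the implementations for the other three types and the `peak_*` functions are assumptions made for this example and not the crate’s actual code.

```rust
pub(crate) trait AsF32: Copy {
    fn as_f32(self) -> f32;
}

impl AsF32 for i16 {
    fn as_f32(self) -> f32 {
        self as f32 / -(i16::MIN as f32)
    }
}

// Assumed implementations for the other input types.
impl AsF32 for i32 {
    fn as_f32(self) -> f32 {
        self as f32 / -(i32::MIN as f32)
    }
}

impl AsF32 for f32 {
    fn as_f32(self) -> f32 {
        self
    }
}

impl AsF32 for f64 {
    fn as_f32(self) -> f32 {
        self as f32
    }
}

/// Generic internal function: largest absolute sample after scaling.
fn peak<T: AsF32>(src: &[T]) -> f32 {
    src.iter().fold(0.0f32, |acc, s| acc.max(s.as_f32().abs()))
}

// Hypothetical public API: one thin, concrete function per input type.
pub fn peak_i16(src: &[i16]) -> f32 {
    peak(src)
}

pub fn peak_i32(src: &[i32]) -> f32 {
    peak(src)
}

pub fn peak_f32(src: &[f32]) -> f32 {
    peak(src)
}

pub fn peak_f64(src: &[f64]) -> f32 {
    peak(src)
}
```

Users of such an API never see the trait, and the compiler still generates one specialized copy of the generic function per input type, much like the C macro did.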

Loudness History

The next component I ported to Rust was the loudness history data structure. This is used to keep a history of previous loudness measurements for giving longer-term results and it exists in two variants: a histogram-based one and a queue-based one with a maximum length.

As part of the history data structure there are also a couple of operations to calculate values from it, but I won’t go into details of them here. They were more or less direct translations of the C code.

In C this data structure and the operations on it were distributed all over the code and part of the main state struct, so the first step to port it over to Rust was to identify all those pieces and put them behind some kind of API.

This is also something that I noticed when porting the FFmpeg loudness normalization filter: Rust making it much less effort to define new structs, functions or even modules than C seems to often lead to more modular code with clear component boundaries instead of everything put together in the same place. Requirements from the borrow checker often also make it more natural to split components into separate structs and functions.

Check the commit for the full details but in the end I ended up with the following functions that are called from the C code

typedef void * history;

extern history* history_create(int use_histogram, size_t max);
extern void history_add(history* hist, double energy);
extern void history_set_max_size(history* hist, size_t max);
extern double history_gated_loudness(const history* hist);
extern double history_relative_threshold(const history* hist);
extern double history_loudness_range(const history* hist);
extern void history_destroy(history *hist);
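On the Rust side, functions like these are typically implemented by boxing the Rust value and handing a raw pointer to C. The following is only a rough sketch of that idea for three of the functions: the `History` struct here is a simplified stand-in, and the real functions additionally have to be exported under an unmangled symbol name so that the C code can link against them.

```rust
use std::os::raw::c_int;

/// Simplified stand-in for the real history type (an assumption of this sketch).
pub struct History {
    energies: Vec<f64>,
    max: usize,
}

// To be callable from C these functions also need an unmangled symbol name,
// e.g. via the `no_mangle` attribute, which is omitted here.
pub extern "C" fn history_create(_use_histogram: c_int, max: usize) -> *mut History {
    let hist = History {
        energies: Vec::new(),
        max,
    };
    // Move the value to the heap and hand ownership to the C side.
    Box::into_raw(Box::new(hist))
}

/// Safety: `hist` must come from `history_create` and not be destroyed yet.
pub unsafe extern "C" fn history_add(hist: *mut History, energy: f64) {
    let hist = unsafe { &mut *hist };
    // Drop the oldest value once the maximum length is reached.
    if hist.max > 0 && hist.energies.len() == hist.max {
        hist.energies.remove(0);
    }
    hist.energies.push(energy);
}

/// Safety: `hist` must come from `history_create` and must not be used afterwards.
pub unsafe extern "C" fn history_destroy(hist: *mut History) {
    if !hist.is_null() {
        // Take ownership back so that the value is dropped and freed.
        drop(unsafe { Box::from_raw(hist) });
    }
}
```

From the point of view of the C code the pointer is completely opaque, which matches the typedef above.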

Enums

In the C code the two variants of the history were implemented by having both of them always present in the state struct but only one of them initialized and then at every usage site having code like the following

if (st->d->use_histogram) {
    // histogram code follows here
} else {
    // queue code follows here
}

Doing it like this is error-prone because the check is easy to forget, and having fields in the struct that are unused in certain configurations seems wasteful. In Rust this situation is naturally expressed with enums

enum History {
    Histogram(Histogram),
    Queue(Queue),
}

struct Histogram { ... }
struct Queue { ... }

This allows using the same storage for both variants and at each usage site the compiler enforces that both variants are explicitly handled

match history {
    History::Histogram(ref hist) => {
        // histogram code follows here
    }
    History::Queue(ref queue) => {
        // queue code follows here
    }
}

Splitting it up like this with an enum also leads to implementing the operations of the two variants directly on their structs and only implementing the common code on the history enum. This also improves readability because it’s immediately clear which variant a piece of code applies to.
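As a small illustration of that split, the sketch below puts variant-specific operations on the two structs and keeps only dispatch and shared logic on the enum. The fields and the `len`, `reset` and `is_empty` operations are made up for this example and are much simpler than the real ones.

```rust
use std::collections::VecDeque;

struct Histogram {
    counts: Vec<u64>,
}

struct Queue {
    energies: VecDeque<f64>,
}

impl Histogram {
    fn new() -> Self {
        Histogram { counts: vec![0; 1000] }
    }

    fn len(&self) -> u64 {
        self.counts.iter().sum()
    }

    fn reset(&mut self) {
        self.counts.iter_mut().for_each(|c| *c = 0);
    }
}

impl Queue {
    fn new() -> Self {
        Queue { energies: VecDeque::new() }
    }

    fn len(&self) -> u64 {
        self.energies.len() as u64
    }

    fn reset(&mut self) {
        self.energies.clear();
    }
}

enum History {
    Histogram(Histogram),
    Queue(Queue),
}

impl History {
    fn new(use_histogram: bool) -> Self {
        if use_histogram {
            History::Histogram(Histogram::new())
        } else {
            History::Queue(Queue::new())
        }
    }

    // Dispatch only: the variant-specific work lives on the structs.
    fn len(&self) -> u64 {
        match self {
            History::Histogram(hist) => hist.len(),
            History::Queue(queue) => queue.len(),
        }
    }

    fn reset(&mut self) {
        match self {
            History::Histogram(hist) => hist.reset(),
            History::Queue(queue) => queue.reset(),
        }
    }

    // Common code that is independent of the variant.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
```

Adding a third variant to the enum would make every `match` above fail to compile until the new case is handled.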

Logarithms

As mentioned in the overview already, there are some portability concerns in the C code. One of them showed up when porting the history and comparing the results of the ported code with the C code. This resulted in the following rather ugly code

fn energy_to_loudness(energy: f64) -> f64 {
    #[cfg(feature = "internal-tests")]
    {
        10.0 * (f64::ln(energy) / f64::ln(10.0)) - 0.691
    }
    #[cfg(not(feature = "internal-tests"))]
    {
        10.0 * f64::log10(energy) - 0.691
    }
}

In the C code, ln(x) / ln(10) is used everywhere for calculating the base 10 logarithm. Mathematically that’s the same thing but in practice this is unfortunately not true and the explicit log10() function is both faster and more accurate. Unfortunately it’s not available everywhere in C (it’s available since C99) so was not used in the C code. In Rust it is always available so I first used it unconditionally.

When running the tests later, they failed because the results of the C code were slightly different from the results of the Rust code. In the end I tracked it down to the usage of log10(), so for now when comparing the two implementations the slower and less accurate version is used.
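To get a feeling for how often the two formulas disagree, something like the following sketch can be used. It compares both variants bit by bit; the helper names are made up for this example, and whether and where the results differ depends on the platform's math library.

```rust
/// Base-10 logarithm via the natural logarithm, as done in the C code.
fn loudness_via_ln(energy: f64) -> f64 {
    10.0 * (f64::ln(energy) / f64::ln(10.0)) - 0.691
}

/// Base-10 logarithm via the dedicated function.
fn loudness_via_log10(energy: f64) -> f64 {
    10.0 * f64::log10(energy) - 0.691
}

/// Counts the inputs for which the two variants are not bit-identical.
fn count_mismatches(energies: &[f64]) -> usize {
    energies
        .iter()
        .filter(|&&e| loudness_via_ln(e).to_bits() != loudness_via_log10(e).to_bits())
        .count()
}
```

The differences are tiny, usually in the last bit or two, but that is enough to make exact comparisons between the two implementations fail.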

Lazy Initialization

Another topic I already mentioned in the overview is one-time initialization. For using the histogram efficiently it is necessary to calculate the values at the edges of each histogram bin as well as the center values. These values are always the same so they could be calculated once up-front. The C code calculated them whenever a new instance was created.

In Rust, one could build something around std::sync::Once together with static mut variables for storing the data, but that would not be very convenient and would also require some unsafe code, as static mut variables are inherently unsafe. Instead this can be simplified with the lazy_static or once_cell crates, and the API of the latter is also available now as part of the Rust standard library in the nightly versions.

Here I used lazy_static, which leads to the following code

lazy_static::lazy_static! {
    static ref HISTOGRAM_ENERGIES: [f64; 1000] = {
        let mut energies = [0.0; 1000];

        for (i, o) in energies.iter_mut().enumerate() {
            *o = f64::powf(10.0, (i as f64 / 10.0 - 69.95 + 0.691) / 10.0);
        }

        energies
    };

    static ref HISTOGRAM_ENERGY_BOUNDARIES: [f64; 1001] = {
        let mut boundaries = [0.0; 1001];

        for (i, o) in boundaries.iter_mut().enumerate() {
            *o = f64::powf(10.0, (i as f64 / 10.0 - 70.0 + 0.691) / 10.0);
        }

        boundaries
    };
}

On first access to e.g. HISTOGRAM_ENERGIES the corresponding code would be executed and from that point onwards it would be available as a read-only array with the calculated values. In practice this later turned out to cause performance problems, but more on that in part 3 of this blog post.
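For comparison, the same lazy initialization can also be sketched without any external crate on toolchains where std::sync::OnceLock is stable (Rust 1.70 and later). The accessor function names are made up for this example; the formulas are the ones from the listing above.

```rust
use std::sync::OnceLock;

/// Center energy of each histogram bin, computed on first access.
fn histogram_energies() -> &'static [f64; 1000] {
    static ENERGIES: OnceLock<[f64; 1000]> = OnceLock::new();
    ENERGIES.get_or_init(|| {
        let mut energies = [0.0; 1000];
        for (i, o) in energies.iter_mut().enumerate() {
            *o = f64::powf(10.0, (i as f64 / 10.0 - 69.95 + 0.691) / 10.0);
        }
        energies
    })
}

/// Energy at the edges of each histogram bin, computed on first access.
fn histogram_energy_boundaries() -> &'static [f64; 1001] {
    static BOUNDARIES: OnceLock<[f64; 1001]> = OnceLock::new();
    BOUNDARIES.get_or_init(|| {
        let mut boundaries = [0.0; 1001];
        for (i, o) in boundaries.iter_mut().enumerate() {
            *o = f64::powf(10.0, (i as f64 / 10.0 - 70.0 + 0.691) / 10.0);
        }
        boundaries
    })
}
```

Every later call returns a reference to the same array, and the initialization closure runs at most once even when multiple threads race for the first access.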

Another approach for calculating these constant numbers would be to calculate them at compile-time via const functions. This is almost possible with Rust now, the only part missing is a const variant of f64::powf(). It is also not available as a const function in C++ so there is probably a deeper reason behind this. Otherwise the code would look exactly like the code above except that the variables would be plain static instead of static ref and all calculations would happen at compile-time.

In the latest version of the code, and until f64::powf() is available as a const function, I’ve decided to simply include a static array with the calculated values inside the code.

Data Structures

And the last topic for the history is an implementation detail of the queue-based implementation. As I also mentioned during the overview, the C code is using a linked-list-based queue, and this is exactly where it is used.

The queue is storing one f64 value per entry, which means that in the end there is one heap memory allocation of 12 or 16 bytes per entry, depending on pointer size. That’s a lot of very small allocations, each allocation is 50-100% bigger than the actual payload, and that’s ignoring any overhead by the allocator itself. Quite some memory is wasted this way, and because each value lives in a different allocation and a pointer has to be followed to reach the next one, operations over all values are not going to be very cache efficient.

As C doesn’t have any data structures in its standard library and this linked-list-based queue is something that is readily available on the BSDs and Linux at least, it probably makes sense to use it here instead of implementing a more efficient data structure inside the code. But it still seems really suboptimal for this use-case.

In Rust the standard library provides a ring-buffer-based VecDeque, which offers exactly the API that is needed here, stores all values tightly packed and thus doesn’t have any memory wasted per value and at the same time provides better cache efficiency. And it is available everywhere where the Rust standard library is available, unlike the BSD queue used by the C implementation.

In practice, apart from the obvious savings in memory, this also caused the Rust implementation without any further optimizations to take only 50%-70% of the time that the C implementation took, depending on the operation.
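A queue with a maximum length on top of VecDeque needs very little code. The following is a minimal sketch with made-up method names, not the code from the crate.

```rust
use std::collections::VecDeque;

/// Bounded FIFO of energy values stored contiguously in a ring buffer.
pub struct Queue {
    energies: VecDeque<f64>,
    max: usize,
}

impl Queue {
    pub fn new(max: usize) -> Self {
        Queue {
            energies: VecDeque::new(),
            max,
        }
    }

    /// Appends a value, dropping the oldest one if the queue is full.
    pub fn add(&mut self, energy: f64) {
        if self.max == 0 {
            return;
        }
        if self.energies.len() == self.max {
            self.energies.pop_front();
        }
        self.energies.push_back(energy);
    }

    /// Changes the maximum length and drops the oldest values if needed.
    pub fn set_max_size(&mut self, max: usize) {
        while self.energies.len() > max {
            self.energies.pop_front();
        }
        self.max = max;
    }

    /// Example of an operation over all values; iteration walks at most
    /// two contiguous slices instead of following one pointer per value.
    pub fn sum(&self) -> f64 {
        self.energies.iter().sum()
    }
}
```

Both `pop_front` and `push_back` are amortized constant-time operations, so the eviction of old values does not require moving the remaining values around.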

Filter: Flushing Denormals to Zero

Overall porting the filter function from C was the same as everything mentioned before so I won’t go into details here. The whole commit porting it can be found here.

There is only one aspect I want to focus on here: if available on x86/x86-64 the MXCSR register temporarily gets the _MM_FLUSH_ZERO_ON bit set to flush denormal floating point numbers to zero. That is, denormals (i.e. very small numbers close to zero) that result from any floating point operation are considered to be zero. If hardware support is not available, at the end of each filter call the values kept for the next call are manually set to zero if they contain denormals.

This is done in the C code for performance reasons. Operations on denormals are generally much slower than on normalized floating point numbers and it has a measurable impact on the performance in this case.

In Rust this had to be replicated. Not only for the performance reasons but also because otherwise the results of both implementations would be slightly different and comparing them in the tests would be harder.

On the C side, this requires some build system integration and usage of the C preprocessor to decide whether the hardware support for this can be used or not, and then some conditional code that is used in the EBUR128_FILTER macro that was shown a few sections above already. Specifically this is the code

#if defined(__SSE2_MATH__) || defined(_M_X64) || _M_IX86_FP >= 2
#include <xmmintrin.h>
#define TURN_ON_FTZ                                                            \
  unsigned int mxcsr = _mm_getcsr();                                           \
  _mm_setcsr(mxcsr | _MM_FLUSH_ZERO_ON);
#define TURN_OFF_FTZ _mm_setcsr(mxcsr);
#define FLUSH_MANUALLY
#else
#warning "manual FTZ is being used, please enable SSE2 (-msse2 -mfpmath=sse)"
#define TURN_ON_FTZ
#define TURN_OFF_FTZ
#define FLUSH_MANUALLY                                                         \
  st->d->v[c][4] = fabs(st->d->v[c][4]) < DBL_MIN ? 0.0 : st->d->v[c][4];      \
  st->d->v[c][3] = fabs(st->d->v[c][3]) < DBL_MIN ? 0.0 : st->d->v[c][3];      \
  st->d->v[c][2] = fabs(st->d->v[c][2]) < DBL_MIN ? 0.0 : st->d->v[c][2];      \
  st->d->v[c][1] = fabs(st->d->v[c][1]) < DBL_MIN ? 0.0 : st->d->v[c][1];
#endif

This is not really that bad and my only concern here would be that it’s relatively easy to forget calling TURN_OFF_FTZ once the filter is done. This would then affect all future floating point operations outside the filter and potentially cause a lot of problems. This blog post gives a nice example of an interesting bug caused by this and shows how hard it was to debug it.

When porting this to more idiomatic Rust, this problem does not exist anymore.

This is the Rust implementation I ended up with

#[cfg(all(
    any(target_arch = "x86", target_arch = "x86_64"),
    target_feature = "sse2"
))]
mod ftz {
    #[cfg(target_arch = "x86")]
    use std::arch::x86::{_mm_getcsr, _mm_setcsr, _MM_FLUSH_ZERO_ON};
    #[cfg(target_arch = "x86_64")]
    use std::arch::x86_64::{_mm_getcsr, _mm_setcsr, _MM_FLUSH_ZERO_ON};

    pub struct Ftz(u32);

    impl Ftz {
        pub fn new() -> Option<Self> {
            unsafe {
                let csr = _mm_getcsr();
                _mm_setcsr(csr | _MM_FLUSH_ZERO_ON);
                Some(Ftz(csr))
            }
        }
    }

    impl Drop for Ftz {
        fn drop(&mut self) {
            unsafe {
                _mm_setcsr(self.0);
            }
        }
    }
}

#[cfg(not(any(all(
    any(target_arch = "x86", target_arch = "x86_64"),
    target_feature = "sse2"
))))]
mod ftz {
    pub struct Ftz;

    impl Ftz {
        pub fn new() -> Option<Self> {
            None
        }
    }
}

While a bit longer, it is also mostly whitespace. The important part to notice here is that when using the hardware support, a struct with a Drop impl is returned and once this struct is leaving the scope it would reset the MXCSR register again to its previous value. This way it can’t be forgotten and would also be reset as part of stack unwinding in case of panics.

On the usage side this looks as follows

let ftz = ftz::Ftz::new();

// all the calculations

if ftz.is_none() {
    // manual flushing of denormals to zero
}

No macros are required in Rust for this and all the platform-specific code is nicely abstracted away in a separate module. In the future support for this on e.g. ARM could be added and it would require no changes anywhere else, just the addition of another implementation of the Ftz struct.
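The manual flushing itself is hidden behind a comment in the snippet above. A minimal sketch of what it could look like, mirroring the FLUSH_MANUALLY macro from the C code, follows. The function names are made up for this example, and f64::MIN_POSITIVE is the Rust equivalent of DBL_MIN, the smallest positive normal value.

```rust
/// Replaces a denormal value by zero and leaves everything else untouched.
fn flush_denormal(v: f64) -> f64 {
    if v.abs() < f64::MIN_POSITIVE {
        0.0
    } else {
        v
    }
}

/// Flushes all filter state values that are carried over to the next call.
fn flush_state(state: &mut [f64]) {
    for v in state.iter_mut() {
        *v = flush_denormal(*v);
    }
}
```

With a state array `v` per channel like in the C code, the call would be `flush_state(&mut v[1..5])`.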

Making it bullet-proof

As Anthony Ramine quickly noticed and told me on Twitter, the above is not actually sufficient. For non-malicious code using the Ftz type everything is alright: on every return path, including panics, the register would be reset again.

However malicious (or simply very confused?) code could make use of e.g. mem::forget(), Box::leak() or some other function to “leak” the Ftz value and cause the Drop implementation to never actually run and reset the register’s value. It’s perfectly valid to leak memory in safe Rust, so it’s not a good idea to rely on Drop implementations too much.

The solution for this can be found in this commit but the basic idea is to never actually give out a value of the Ftz type but only pass an immutable reference to safe Rust code. This then basically looks as follows

mod ftz {
    pub fn with_ftz<F: FnOnce(Option<&Ftz>) -> T, T>(func: F) -> T {
        unsafe {
            let ftz = Ftz::new();
            func(Some(&ftz))
        }
    }
}

ftz::with_ftz(|ftz| {
    // do things or check if `ftz` is None
});

This way it is impossible for any code outside the ftz module to leak the value and prevent resetting of the register.
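To show the closure-based pattern end to end on any platform, the following sketch replaces the MXCSR register with a thread-local variable. `FAKE_CSR`, `FLUSH_ZERO_ON` and `flush_zero_enabled` are stand-ins invented for this example; the real code uses the `_mm_getcsr` and `_mm_setcsr` intrinsics as shown further above.

```rust
mod ftz {
    use std::cell::Cell;

    thread_local! {
        // Stand-in for the MXCSR register so that the sketch runs everywhere.
        static FAKE_CSR: Cell<u32> = Cell::new(0);
    }

    // Stand-in for the flush-to-zero bit of the register.
    const FLUSH_ZERO_ON: u32 = 0x8000;

    /// Guard that remembers the previous register value. It has a private
    /// field and a private constructor, so only this module can create it.
    pub struct Ftz(u32);

    impl Ftz {
        fn new() -> Self {
            let csr = FAKE_CSR.with(|r| r.get());
            FAKE_CSR.with(|r| r.set(csr | FLUSH_ZERO_ON));
            Ftz(csr)
        }
    }

    impl Drop for Ftz {
        fn drop(&mut self) {
            // Restore the previous value, also during unwinding.
            FAKE_CSR.with(|r| r.set(self.0));
        }
    }

    /// Runs `func` with flushing enabled. The caller only ever sees a
    /// reference, so it has no way to leak the guard itself.
    pub fn with_ftz<F: FnOnce(Option<&Ftz>) -> T, T>(func: F) -> T {
        let ftz = Ftz::new();
        func(Some(&ftz))
    }

    /// Helper for observing the fake register from outside the module.
    pub fn flush_zero_enabled() -> bool {
        FAKE_CSR.with(|r| (r.get() & FLUSH_ZERO_ON) != 0)
    }
}
```

The guard lives on the stack frame of `with_ftz`, so it is dropped when the closure returns or panics, no matter what the closure does with the reference.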

Input Processing: Order of Operations Matters

The other parts of the processing code were relatively straightforward to port and not really different from anything I already mentioned above. However as part of porting that code I ran into a problem that took quite a while to debug: Once ported, the results of the C and Rust implementation were slightly different again.

I went through the affected code in detail and didn’t notice anything obvious. Both the C code and the Rust code were doing the same, so why are the results different?

This is the relevant part of the C code

size_t i;
double channel_sum;

channel_sum = 0.0;
if (st->d->audio_data_index < frames_per_block * st->channels) {
  for (i = 0; i < st->d->audio_data_index / st->channels; ++i) {
    channel_sum += st->d->audio_data[i * st->channels + c] *
                   st->d->audio_data[i * st->channels + c];
  }
  for (i = st->d->audio_data_frames -
           (frames_per_block - st->d->audio_data_index / st->channels);
       i < st->d->audio_data_frames; ++i) {
    channel_sum += st->d->audio_data[i * st->channels + c] *
                   st->d->audio_data[i * st->channels + c];
  }
} else {
  for (i = st->d->audio_data_index / st->channels - frames_per_block;
       i < st->d->audio_data_index / st->channels; ++i) {
    channel_sum += st->d->audio_data[i * st->channels + c] *
                   st->d->audio_data[i * st->channels + c];
  }
}

and the first version of the Rust code

let mut channel_sum = 0.0;

if audio_data_index < frames_per_block * channels {
    channel_sum += audio_data[..audio_data_index]
        .chunks_exact(channels)
        .map(|f| f[c] * f[c])
        .sum();

    channel_sum += audio_data
        [(audio_data.len() - frames_per_block * channels + audio_data_index)..]
        .chunks_exact(channels)
        .map(|f| f[c] * f[c])
        .sum();
} else {
    channel_sum += audio_data
        [(audio_data_index - frames_per_block * channels)..audio_data_index]
        .chunks_exact(channels)
        .map(|f| f[c] * f[c])
        .sum();
}

The difference between the two variants is the order of floating point operations in the if branch. The C code sums up all values into the same accumulator, while the Rust code first sums both parts into a separate accumulator and then adds them together. I changed it to do exactly the same and that caused the tests to pass again.

The order in which floating point operations are done matters, unfortunately, and in the example above the difference was big enough to cause the tests to fail. And the above is a nice practical example that shows that addition on floating point numbers is actually not associative.
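The effect is easy to reproduce in isolation. In the following sketch the two slices stand for the two parts of the buffer; for simplicity they are summed directly instead of summing squared samples of one channel, and the function names are made up for this example.

```rust
/// Sums both parts into one accumulator, like the C code does.
fn sum_single_accumulator(first: &[f64], second: &[f64]) -> f64 {
    let mut sum = 0.0;
    for v in first.iter().chain(second.iter()) {
        sum += v;
    }
    sum
}

/// Sums each part separately and then adds the partial sums,
/// like the first version of the Rust code did.
fn sum_two_accumulators(first: &[f64], second: &[f64]) -> f64 {
    first.iter().sum::<f64>() + second.iter().sum::<f64>()
}
```

For `first = [0.1]` and `second = [0.2, 0.3]` the first function calculates (0.1 + 0.2) + 0.3 = 0.6000000000000001 while the second one calculates 0.1 + (0.2 + 0.3) = 0.6.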

Last C Code: Replacing API Layer

The last step was the most satisfying one: getting rid of all the C code. This can be seen in this commit. Note that the performance numbers in the commit message are wrong. At that point both versions were much closer already performance-wise and the Rust implementation was even faster in some configurations.

Up to that point all the internal code was already ported to Rust and the only remaining C code was the public C API, which did some minor tasks and then called into the Rust code. Technically this was not very interesting so I won’t get into any details here. It doesn’t add any new insights and this blog post is already long enough!

If you check the git history, all commits that followed after this one were cleanups, addition of some new features, adding more tests, performance optimizations (see the next part of this blog post) and adding a C-compatible API around the Rust implementation (see the last part of this blog post). The main part of the work was done.

Difficulty and Code Size

With the initial porting out of the way, I can now also answer the first two questions I wanted to get answered as part of this project.

Porting the C code to Rust was not causing any particular difficulties. The main challenges were

  • Understanding what the C code actually does and mapping the data access patterns to Rust iterators for more idiomatic and faster code. This also has the advantage of making the code clearer than with the C-style for loops. Porting was generally a pleasant experience and the language did not get in the way when implementing such code.
    Also, by being able to port it incrementally I could always do a little bit during breaks and didn’t have to invest longer, contiguous blocks of time into the porting.
  • Refactoring the C code to be able to replace parts of it step by step with Rust implementations. This was complicated a bit by the fact that the C code did not factor out the different logical components nicely but instead kept everything entangled.
    From having worked a lot with C for more than 15 years, I wouldn’t say that this is because the code is bad (it is not!) but simply because C encourages writing code like this. Defining new structs or new functions seems like effort, and even worse if you try to move code into separate files because then you also have to worry about a header file and keeping the code and the header file in sync. Rust simplifies this noticeably and the way the language behaves encourages splitting the code more into separate components.

Now for the size of the code. This is a slightly more complicated question. Rust and the default code style of rustfmt cause code to be spread out over more lines and to have more whitespace than the structurally same code in C with the common code styles used in C. In my experience, visually Rust code looks much less dense than C code for this reason.

Intuitively I would say that I have written much less Rust code for the actual implementation than there was C code, but lacking any other metrics let’s take a look at the lines of code while ignoring tests and comments. I used tokei for this.

  • 1211 lines for the Rust code
  • 1741 lines for the C code. Of this, 494 lines are headers and 367 lines of the headers are the queue implementation. That is, there are 1247 lines of non-header C code.

This makes the Rust implementation only slightly smaller if we ignore the C headers. Rust allows writing more concise code so I would have expected the difference to be bigger. At least partially this can probably be attributed to the different code formatting that causes Rust code to be less visually dense and as a result have code spread out over more lines than otherwise.

In any case, overall I’m happy with the results so far.

I will look at another metric of code size in the last part of this blog post for some further comparison: the size of the compiled code.

Next Part

In the next part of this blog post I will describe the performance optimizations I did to make the Rust implementation at least as fast as the C one and the problems I ran into while doing so. The previous two parts of the blog post had nothing negative to say about Rust but this will change in the third part. The Rust implementation without any optimizations was already almost as fast as the C implementation thanks to how well idiomatic Rust code can be optimized by the compiler, but the last few percent were not as painless as one would hope. In practice the performance difference probably wouldn’t have mattered.

From looking at the git history and comparing the code, you will also notice that some of the performance optimizations already happened as part of the porting. The final code is not exactly what I presented above.

Porting EBU R128 audio loudness analysis from C to Rust

Over the last few weeks I ported the libebur128 C library to Rust, both with a proper Rust API as well as a 100% compatible C API.

This blog post will be split into 4 parts that will be published over the next weeks

  1. Overview and motivation
  2. Porting approach with various details, examples and problems I ran into along the way
  3. Performance optimizations
  4. Building Rust code into a C library as drop-in replacement

If you’re only interested in the code, that can be found on GitHub and in the ebur128 crate on crates.io.

The initial versions of the ebur128 crate were built around the libebur128 C library (and included its code for ease of building), version 0.1.2 and newer is the pure Rust implementation.

EBU R128

libebur128 implements the EBU R128 loudness standard. The Wikipedia page gives a good summary of the standard, but in short it describes how to measure loudness of an audio signal and how to use this for loudness normalization.

While this intuitively doesn’t sound very complicated, there are lots of little details (like how human ears are actually working) that make this not as easy as one might expect. This results in there being many different ways for measuring loudness and is one of the reasons why this standard was introduced. Of course it is also not the only standard for this.

libebur128 is also the library that I used in the GStreamer loudness normalization plugin, about which I wrote a few weeks ago already. By porting the underlying loudness measurement code to Rust, the only remaining C dependency of that plugin is GStreamer itself.

Apart from that it is used by FFmpeg, but they include their own modified copy, as well as many other projects that need some kind of loudness measurement and don’t use ReplayGain, another older but widely used standard for the same problem.

Why?

Before going over the details of what I did, let me first explain why I did this work at all. libebur128 is a perfectly well working library, in wide use for a long time and probably rather bug-free at this point and it was already possible to use the C implementation from Rust just fine. That’s what the initial versions of the ebur128 crate were doing.

My main reason for doing this was simply because it seemed like a fun little project. It isn’t a lot of code and it doesn’t change often, so once ported it should be more or less finished and it shouldn’t be much work to stay in sync with the C version. I started thinking about doing this already after the initial release of the C-based ebur128 crate, but reading Joe Neeman’s blog post about porting another C audio library (RNNoise) to Rust gave me the final push to actually start porting the code and to follow through until it was done.

However, don’t go around and ask other people to rewrite their projects in Rust (don’t be rude) or think that your own rewrite is magically going to be much faster and less buggy than the existing implementation. While Rust saves you from a big class of possible bugs, it doesn’t save you from yourself and usually rewrites contain bugs that didn’t exist in the original implementation. Also getting good performance in Rust requires, like in every other language, some effort. Before rewriting any software, think about the goals of this rewrite realistically as well as the effort required to actually get it finished.

Apart from fun there were also a few technical and non-technical reasons for me to look into this. I’m going to just list two here (curiosity and portability). I will skip the usual Rust memory-safety argument as that seems less important with this code: the C code has been widely used for a long time, is not changing a lot and has easy to follow memory access patterns. While it definitely had a memory safety bug (see above), it was rather difficult to trigger and it was fixed in the meantime.

Curiosity

Personally and at my company Centricular we try to do any new projects where it makes sense in Rust. While this worked very well in the past and we got great results, there were some questions for future projects that I wanted to get some answers, hard data and personal experience for

  • How difficult is it to port a C codebase function by function to Rust while keeping everything working along the way?
  • How difficult is it to get the same or better performance with idiomatic Rust code for low-level media processing code?
  • How much bigger or smaller is the resulting code and do Rust’s higher-level concepts like iterators help to keep code concise?
  • How difficult is it to create a C-compatible library in Rust with the same API and ABI?

I have some answers to all these questions already but previous work on this was not well structured and the results were also not documented, which I’m trying to change here now. Both to have a reference for myself in the future as well as for convincing other people that Rust is a reasonable technology choice for such projects.

As you can see, the general pattern of these questions is introducing Rust into an existing codebase, replacing existing components with Rust and writing new components in Rust, which also relates to my work on the Rust GStreamer bindings.

Portability

C is a very old language and while there is a standard, each compiler has its own quirks and each platform different APIs on top of the bare minimum that the C standard defines. C itself is very portable, but it is not easy to write portable C code, especially when not using a library like GLib that hides these differences and provides basic data structures and algorithms.

This seems to be something that is often forgotten when the portability of C is given as an argument against Rust, and that’s the reason why I wanted to mention this here specifically. While you can get a C compiler basically everywhere, writing C code that also runs well everywhere is another story and C doesn’t make this easy by design. Rust on the other hand makes writing portable code quite easy in my experience.

In practice there were three specific issues I had for this codebase. Most of the advantages of Rust here are because it is a new language and doesn’t have to carry a lot of historical baggage.

Mathematical Constants and Functions

Mathematical constants are not actually part of any C standard, although most compilers define M_PI (for π), M_E (for e) and others in math.h nonetheless as they’re defined by POSIX and UNIX98.

Microsoft’s MSVC doesn’t, but instead you have to #define _USE_MATH_DEFINES before including math.h.

While not a big problem per-se, it is annoying and indeed caused the initial version of the ebur128 Rust crate to not compile with MSVC because I forgot about it.
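In Rust these constants are part of the standard library on every target, in the std::f64::consts module (and std::f32::consts for f32), so no feature macro or platform check is needed. The helper below is a made-up example of a typical use in filter coefficient calculations and is not taken from the crate.

```rust
use std::f64::consts::PI;

/// Hypothetical helper: frequency pre-warping as it appears in many
/// filter coefficient calculations based on the bilinear transform.
fn prewarp(center_freq: f64, sample_rate: f64) -> f64 {
    f64::tan(PI * center_freq / sample_rate)
}
```

The same module also provides `E` and a number of other commonly needed constants.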

Similarly, which mathematical functions are available depends a lot on the target platform and which version of the C standard is supported. An example of this is the log10 function to calculate the base-10 logarithm. For portability reasons, libebur128 didn’t use it but instead calculated it via the natural logarithm (ln(x) / ln(10) = log10(x)) because it’s only available in POSIX and since C99. While C99 is from 1999, there are still many compilers out there that don’t fully support it, again most prominently MSVC until very recently.

Using log10 instead of going via the natural logarithm is faster and more precise due to floating point number reasons, which is why the Rust implementation uses it but in C it would be required to check at build-time if the function is available or not, which complicates the build process and can easily be forgotten. libebur128 decided to not bother with these complications and simply not use it. Because of that, some conditional code in the Rust implementation is necessary for ensuring that both implementations return the same results in the tests.

Data Structures

libebur128 uses a linked-list-based queue data structure. As the C standard library is very minimal, no collection data structures are included. However on the BSDs and also on Linux with the GNU C library there is one available in sys/queue.h.

Of course MSVC does not have this and other compilers/platforms probably won’t have it either, so libebur128 included a local copy of that queue implementation. Now when building, one has to decide whether there is a system implementation available or otherwise use the internal version. Or simply always use the internal version.

Copying implementations of basic data structures and algorithms into every single project is ugly and error-prone, so let’s maybe not do that. C not having a standardized mechanism for dependency handling doesn’t help with this, which is unfortunately why this is very common in C projects.

One-time Initialization

Thread-safe one-time initialization is another thing that is not defined by the C standard, and depending on your platform there are different APIs available for it or none at all. POSIX again defines one that is widely available, but you can’t really depend on it unconditionally.

This complicates the code and build procedure, so libebur128 simply did not do that and did its one-time initializations of some global arrays every time a new instance was created. Which is probably fine, but a bit wasteful and probably strictly-speaking according to the C standard not actually thread-safe.

The initial version of the ebur128 Rust crate side-stepped this problem by simply doing this initialization once with the API provided by the Rust standard library. See part 2 and part 3 of this blog post for some more details about this.

Easier to Compile and Integrate

A Rust port only requires a Rust compiler, while a mixed C/Rust codebase additionally requires at least a C compiler and some kind of build system for the C code.

libebur128 uses CMake, which would be an additional dependency so in the initial version of the ebur128 crate I went via cargo’s build.rs build scripts and the cc crate as building libebur128 is easy enough. This works but build scripts are problematic for integration of the Rust code into other build systems than cargo.

The Rust port also makes use of conditional compilation in various places. Unlike in C with the preprocessor, non-standardized and inconsistent platform #defines and it being necessary to integrate everything in a custom way into the build system, Rust has a principled and well-designed approach to this problem. This makes it easier to keep the code clean, easier to maintain and more portable.

In addition to build system related simplifications, by not having any C code it is also much easier to compile the code to other targets like WebAssembly, which is natively supported by Rust. It is also possible to compile C to WebAssembly but getting both toolchains to agree with each other and produce compatible code seems not very easy.

Overview

As mentioned above, the code can be found on GitHub and in the ebur128 crate on crates.io.

The current version of the code produces the exact same results as the C version. This is enforced by the quickcheck tests that are running randomized inputs through both versions and check that the results are the same. The code also passes all the tests in the EBU loudness test set, so should hopefully be standards compliant as long as the test implementation is not wrong.

Performance-wise the Rust implementation is at least as fast as the C implementation. In some configurations it’s a few percent faster but probably not enough that it actually matters in practice. There are various benchmarks for both versions in different configurations available. The benchmarks are based on the criterion crate, which uses statistical methods to give as accurate as possible results. criterion also generates nice results with graphs for making analysis of the results more pleasant. See part 3 of this blog post for more details.

Writing tests and benchmarks for Rust is so much easier and feels more natural than doing it in C, so the Rust implementation has quite good coverage of the different code paths now. In particular, no struggling with build systems was necessary like it would have been in C, thanks to cargo and Rust having built-in support. This alone seems to have the potential to cause Rust code to have, on average, better quality than similar code written in C.

It is also possible to compile the Rust implementation into a C library with the great cargo-c tool. This easily builds the code as a static/dynamic C library and installs the library, a C header file and also a pkg-config file. With this the Rust implementation is a 100% drop-in replacement of the C libebur128. It is not even necessary to recompile existing code. See part 4 of this blog post for more details.

Dependencies

Apart from the Rust standard library the Rust implementation depends on two other, small and widely used crates. Unlike with C, depending on external dependencies is rather simple with Rust and cargo. The two crates in question are

  • smallvec for dynamically sized vectors/arrays that can be stored on the stack up to a certain size and only then fall back to heap allocations. This allows avoiding a couple of heap allocations under normal usage.
  • bitflags, which provides a macro for implementing properly typed bitflags. This is used in the constructor of the main type for selecting the features and modes that should be enabled, which directly maps to how the C API works (just with less type-safety).
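
To give an idea of what “properly typed bitflags” means, here is a hand-written, std-only sketch of roughly the kind of type such a macro provides. It does not use the bitflags crate itself, and the type and flag names are hypothetical rather than the crate’s actual API.

```rust
use std::ops::BitOr;

/// Hypothetical set of analysis modes, stored as bits in a `u8`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Mode(u8);

impl Mode {
    pub const MOMENTARY: Mode = Mode(0b001);
    pub const SHORT_TERM: Mode = Mode(0b010);
    pub const TRUE_PEAK: Mode = Mode(0b100);

    /// Returns `true` if all bits of `other` are set in `self`.
    pub fn contains(self, other: Mode) -> bool {
        self.0 & other.0 == other.0
    }
}

impl BitOr for Mode {
    type Output = Mode;

    /// Combines two sets of flags into one.
    fn bitor(self, rhs: Mode) -> Mode {
        Mode(self.0 | rhs.0)
    }
}
```

Unlike a plain integer in C, a value like Mode::MOMENTARY | Mode::TRUE_PEAK has its own type and cannot be mixed up with an unrelated integer argument by accident.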

Unsafe Code

A common question when announcing a Rust port of some C library is how much unsafe code was necessary to reach the same performance as the C code. In this case there are two uses of unsafe code outside the FFI code to call the C implementation in the tests/benchmarks and the C API.

Resampler

The True Peak measurement is using a resampler to upsample the audio signal to a higher sample rate. As part of the innermost loop of the resampler a statically sized ringbuffer is used.

As part of that ringbuffer, explicit indexing of a slice is needed. While the indexes are already manually checked to wrap around when needed, the Rust compiler and LLVM can’t figure that out, so additional bounds checks plus panic handling are present in the compiled code. Apart from slowing down the loop with the additional condition, the panic code also causes the whole loop to be optimized less well.

So to get around that, unsafe indexing into the slice is used for performance reasons. While it requires a human now to check the memory safety of the code instead of relying on the compiler, the code in question is simple and small enough that it shouldn’t be a problem in practice.
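
The pattern can be sketched as follows. This is a minimal, hypothetical ringbuffer and not the resampler code from the crate: the index is wrapped manually, and get_unchecked / get_unchecked_mut skip the bounds check that the compiler would otherwise insert.

```rust
/// Hypothetical fixed-size ringbuffer holding the last `N` samples.
pub struct Ring<const N: usize> {
    buf: [f32; N],
    pos: usize, // invariant: pos < N
}

impl<const N: usize> Ring<N> {
    pub fn new() -> Self {
        assert!(N > 0);
        Ring { buf: [0.0; N], pos: 0 }
    }

    /// Stores a new sample, overwriting the oldest one.
    pub fn push(&mut self, sample: f32) {
        // SAFETY: `pos < N` always holds: it starts at 0 and is wrapped below.
        unsafe { *self.buf.get_unchecked_mut(self.pos) = sample };
        self.pos += 1;
        if self.pos == N {
            self.pos = 0;
        }
    }

    /// Sum of all stored samples weighted by `coeffs`, newest sample first.
    pub fn dot(&self, coeffs: &[f32; N]) -> f32 {
        let mut acc = 0.0;
        let mut idx = self.pos;
        for c in coeffs {
            // Step backwards with manual wrap-around, so `idx < N` afterwards.
            idx = if idx == 0 { N - 1 } else { idx - 1 };
            // SAFETY: `idx < N` as established on the line above.
            acc += c * unsafe { *self.buf.get_unchecked(idx) };
        }
        acc
    }
}
```

The safety comments carry the argument that a human reviewer has to check, which is manageable as long as the invariant is as local as it is here.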

More on this in part 2 and part 3 of this blog post.

Flushing Denormals to Zero

The other use of unsafe code is in the filter that is applied to the incoming audio signal. On x86/x86-64 the MXCSR register temporarily gets the _MM_FLUSH_ZERO_ON bit set to flush denormal floating point numbers to zero. That is, denormals (i.e. very small numbers close to zero) that are the result of any floating point operation are treated as zero.

This happens both for performance reasons as well as correctness reasons. Operations on denormals are generally much slower than on normalized floating point numbers. This has a measurable impact on the performance in this case.

Also, the C library does the same, and not flushing denormals to zero would lead to slightly different results. While this difference doesn’t matter in practice as it’s very, very small, it would make it harder to compare the results of both implementations as they wouldn’t be as close to each other anymore.

Doing this affects every floating point operation that happens while that bit is set, but because these are only the floating point operations performed by this crate and it’s guaranteed that the bit is unset again (even in case of panics) before leaving the filter, this shouldn’t cause any problems for other code.
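
One way to get the “unset again even in case of panics” guarantee is a guard type that restores the register in its Drop implementation. The following is only a sketch under that assumption, restricted to x86-64 and with hypothetical names (ftz, FlushToZero, is_enabled); it is not the crate’s actual code. Note that newer Rust releases mark the MXCSR intrinsics as deprecated, hence the allow attribute.

```rust
#[cfg(target_arch = "x86_64")]
#[allow(deprecated)] // the MXCSR intrinsics are deprecated in newer Rust versions
mod ftz {
    use std::arch::x86_64::{_mm_getcsr, _mm_setcsr, _MM_FLUSH_ZERO_ON};

    /// Hypothetical guard: flush-to-zero is enabled while it is alive.
    pub struct FlushToZero {
        saved: u32,
    }

    impl FlushToZero {
        pub fn new() -> Self {
            // SAFETY: SSE, and with it MXCSR, is always available on x86-64.
            let saved = unsafe { _mm_getcsr() };
            unsafe { _mm_setcsr(saved | _MM_FLUSH_ZERO_ON) };
            FlushToZero { saved }
        }
    }

    impl Drop for FlushToZero {
        fn drop(&mut self) {
            // Runs on normal scope exit and while unwinding from a panic.
            unsafe { _mm_setcsr(self.saved) };
        }
    }

    /// Whether the flush-to-zero bit is currently set for this thread.
    pub fn is_enabled() -> bool {
        unsafe { _mm_getcsr() & _MM_FLUSH_ZERO_ON != 0 }
    }
}

#[cfg(not(target_arch = "x86_64"))]
mod ftz {
    /// No-op stand-in for other architectures.
    pub struct FlushToZero;

    impl FlushToZero {
        pub fn new() -> Self {
            FlushToZero
        }
    }

    pub fn is_enabled() -> bool {
        false
    }
}
```

A filter function would create the guard at its start, for example with let _guard = ftz::FlushToZero::new();, and the previous register value is restored when the guard goes out of scope.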

Additional Features

Once the C library was ported and performance was comparable to the C implementation, I briefly checked the issues reported on the C library to see if there were any useful feature requests or bug reports that I could implement / fix in the Rust implementation. There were three, one of which I also wanted for a future project.

None of the new features are available via the C API at this point for compatibility reasons.

Resetting the State

For this one there was a PR already for the C library. Previously the only way to reset all measurements was to create a new instance, which involves new memory allocations, filter initialization, etc.

It’s easy enough to provide a reset method to do only the minimal work required to reset all measurements and restart with a fresh state so I’ve added that to the Rust implementation.

Fix set_max_window() to actually work

This was a bug introduced in the C implementation a while ago in an attempt to prevent integer overflows when calculating sizes of memory allocations, which then would cause memory safety bugs because less memory was allocated than expected. Accidentally this fix restricted the allowed values for the maximum window size too much. There is a PR for fixing this in the C implementation.

On the Rust side this bug also existed because I simply ported over the checks. If I hadn’t ported over the checks, or had ported an earlier version without the checks, there fortunately wouldn’t have been any memory safety bug on the Rust side. Instead, one of two situations would have happened:

  1. In debug builds integer overflows cause a panic, so instead of allocating less memory than expected during the setting of the parameters there would’ve been a panic immediately instead of invalid memory accesses later.
  2. In release builds integer overflows simply wrap around for performance reasons. This would’ve caused less memory than expected to be allocated, but later when trying to access the memory there would’ve been a panic when trying to access memory outside the allocated area.

While a panic is also not nice, it at least leads to no undefined behaviour and prevents worse things from happening.

The proper fix in this case was to not restrict the maximum window size statically but to instead check for overflows during the calculations. This is the same thing the PR for the C implementation does, but on the Rust side this is much easier because of built-in operations like checked_mul for doing an overflow-checking multiplication. In C this requires some rather convoluted code (check the PR for details).
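
As a small illustration of the overflow-checking approach, the following sketch computes a buffer size with checked_mul and reports an overflow as None. The function name and its parameters are hypothetical and only loosely modelled on the window size calculation.

```rust
/// Number of samples needed for a window of `window_ms` milliseconds,
/// or `None` if the computation would overflow `usize`.
/// The function name and parameters are hypothetical.
pub fn window_samples(rate: usize, channels: usize, window_ms: usize) -> Option<usize> {
    // `?` returns `None` early if the multiplication overflows.
    let frames = rate.checked_mul(window_ms)? / 1000;
    frames.checked_mul(channels)
}
```

The caller can then turn None into a proper error instead of allocating too little memory or panicking.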

Support for Planar Audio Input

The last additional feature that I implemented was support for planar audio input, for which also a PR to the C implementation exists already.

Most of the time audio signals have the samples of each channel interleaved with each other, so for example for stereo you have an array of samples with the first sample for the left channel, the first sample for the right channel, the second sample for the left channel, etc. While this representation has some advantages, in other situations it is easier or faster to work with planar audio: the samples of each channel are contiguous one after another, so you have e.g. first all the samples of the left channel one after another and only then all samples of the right channel.

The PR for the C implementation does this with some code duplication of existing macro code (which can be prevented by making the macros more complicated). On the Rust side I implemented this without any code duplication by adding an internal abstraction for interleaved/planar audio and iterating over the samples, and then working with that in normal, generic Rust code. This required some minor refactoring and code reorganization but in the end was rather painless. Note that most of the change is addition of new tests and moving some code around.

When looking at the Samples trait, the main part of this refactoring, one might wonder why I used closures instead of Rust iterators for iterating over the samples. The reason is unfortunately performance. More on this in part 3 of this blog post.
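
To make the idea more concrete, here is a simplified sketch of such a closure-based abstraction. The trait below is a hypothetical stand-in with made-up method names, not the actual Samples trait from the crate.

```rust
/// Hypothetical abstraction over sample layouts; not the crate's actual trait.
pub trait Samples {
    fn channels(&self) -> usize;
    /// Calls `f` for every sample of `channel`, in time order.
    fn foreach_sample(&self, channel: usize, f: impl FnMut(f32));
}

/// Samples of all channels interleaved: L R L R ...
pub struct Interleaved<'a> {
    pub data: &'a [f32],
    pub channels: usize, // must be non-zero
}

/// One contiguous slice per channel.
pub struct Planar<'a> {
    pub planes: &'a [&'a [f32]],
}

impl Samples for Interleaved<'_> {
    fn channels(&self) -> usize {
        self.channels
    }

    fn foreach_sample(&self, channel: usize, mut f: impl FnMut(f32)) {
        for frame in self.data.chunks_exact(self.channels) {
            f(frame[channel]);
        }
    }
}

impl Samples for Planar<'_> {
    fn channels(&self) -> usize {
        self.planes.len()
    }

    fn foreach_sample(&self, channel: usize, mut f: impl FnMut(f32)) {
        for &sample in self.planes[channel] {
            f(sample);
        }
    }
}

/// Generic code that works with either layout: absolute peak per channel.
pub fn peaks(samples: &impl Samples) -> Vec<f32> {
    (0..samples.channels())
        .map(|c| {
            let mut peak = 0.0f32;
            samples.foreach_sample(c, |s| peak = peak.max(s.abs()));
            peak
        })
        .collect()
}
```

The processing code (peaks here) is written once against the trait and gets monomorphized separately for each layout.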

Next Part

In the next part of this blog post I will describe the porting approach in detail and also give various examples for how to port C code to idiomatic Rust, and some examples of problems I was running into.

Automatic retry on error and fallback stream handling for GStreamer sources

A very common problem in GStreamer, especially when working with live network streams, is that the source might just fail at some point. Your own network might have problems, the source of the stream might have problems, …

Without any special handling of such situations, the default behaviour in GStreamer is to simply report an error and let the application worry about handling it. The application might for example want to restart the stream, or it might simply want to show an error to the user, or it might want to show a fallback stream instead, telling the user that the stream is currently not available and then seamlessly switch back to the stream once it comes back.

Implementing all of the aforementioned is quite some effort, especially to do it in a robust way. To make it easier for applications I implemented a new plugin called fallbackswitch that contains two elements to automate this.

It is part of the GStreamer Rust plugins and also included in the recent 0.6.0 release, which can also be found on the Rust package (“crate”) repository crates.io.

Installation

For using the plugin you most likely first need to compile it yourself, unless you’re lucky enough that e.g. your Linux distribution includes it already.

Compiling it requires a Rust toolchain and GStreamer 1.14 or newer. The former you can get via rustup, for example, if you don’t have it yet; the latter either from your Linux distribution or by using the macOS, Windows, etc. binaries that are provided by the GStreamer project. Once that is done, compiling is mostly a matter of running cargo build in the utils/fallbackswitch directory and copying the resulting libgstfallbackswitch.so (or .dll or .dylib) into one of the GStreamer plugin directories, for example ~/.local/share/gstreamer-1.0/plugins.

fallbackswitch

The first of the two elements is fallbackswitch. It acts as a filter that can be placed into any kind of live stream. It consumes one main stream (which must be live) and outputs this stream as-is if everything works well. Based on the timeout property it detects if this main stream didn’t have any activity for the configured amount of time, or everything arrived too late for that long, and then seamlessly switches to a fallback stream. The fallback stream is the second input of the element and does not have to be live (but it can be).

Switching between main stream and fallback stream doesn’t only work for raw audio and video streams but also works for compressed formats. The element will take constraints like keyframes into account when switching, and if necessary/possible also request new keyframes from the sources.

For example, to play the Sintel trailer over the network and display a test pattern if it doesn’t produce any data, the following pipeline can be constructed:

gst-launch-1.0 souphttpsrc location=https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm ! \
    decodebin ! identity sync=true ! fallbackswitch name=s ! videoconvert ! autovideosink \
    videotestsrc ! s.fallback_sink

Note the identity sync=true in the main stream here as we have to convert it to an actual live stream.

Now when running the above command and disconnecting from the network, the video should freeze at some point and after 5 seconds a test pattern should be displayed.

However, when using fallbackswitch the application will still have to take care of handling actual errors from the main source and possibly restarting it. Waiting a bit longer after disconnecting the network with the above command will report an error, which then stops the pipeline.

To make that part easier there is the second element.

fallbacksrc

The second element is fallbacksrc and as the name suggests it is an actual source element. When using it, the main source can be configured via a URI or by providing a custom source element. Internally it then takes care of buffering the source, converting non-live streams into live streams and restarting the source transparently on errors. The various timeouts for this can be configured via properties.

Different to fallbackswitch it also handles audio and video at the same time and demuxes/decodes the streams.

Currently the only fallback streams that can be configured are still images for video. For audio the element will always output silence for now, and if no fallback image is configured for video it outputs black instead. In the future I would like to add support for arbitrary fallback streams, which hopefully shouldn’t be too hard. The basic infrastructure for it is already there.

To use it in our previous example and have a JPEG image displayed whenever the source does not produce any new data, the following can be done:

gst-launch-1.0 fallbacksrc uri=https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm \
    fallback-uri=file:///path/to/some/jpg ! videoconvert ! autovideosink

Now when disconnecting the network, after a while (longer than before because fallbacksrc does additional buffering for non-live network streams) the fallback image should be shown. Different to before, waiting longer will not lead to an error and reconnecting the network causes the video to reappear. However as this is not an actual live-stream, right now playback would again start from the beginning. Seeking back to the previous position would be another potential feature that could be added in the future.

Overall these two elements should make it easier for applications to handle errors in live network sources. While the two elements are still relatively minimal feature-wise, they should already be usable in various real scenarios and are already used in production.

As usual, if you run into any problems or are missing some features, please create an issue in the GStreamer bug tracker.

GStreamer Rust Bindings & Plugins New Releases

It has been quite a while since the last status update for the GStreamer Rust bindings and the GStreamer Rust plugins, so the new releases last week make for a good opportunity to do so now.

Bindings

I won’t write too much about the bindings this time. The latest version as of now is 0.16.1, which means that since I started working on the bindings there were 8 major releases. In that same time there were 45 contributors working on the bindings, which seems quite a lot and really makes me happy.

Just as before, I don’t think any major APIs are missing from the bindings anymore, even for implementing subclasses of the various GStreamer types. The wide usage of the bindings in Free Software projects and commercial products also shows both the interest in writing GStreamer applications and plugins in Rust as well as that the bindings are complete enough and production-ready.

Most of the changes since the last status update involve API cleanups, usability improvements, various bugfixes and addition of minor API that was not included before. The details of all changes can be read in the changelog.

The bindings work with any GStreamer version since 1.8 (released more than 4 years ago), support APIs up to GStreamer 1.18 (to be released soon) and work with Rust 1.40 or newer.

Plugins

The biggest progress probably happened with the GStreamer Rust plugins.

There also was a new release last week, 0.6.0, which was the first release where selected plugins were also uploaded to the Rust package (“crate”) database crates.io. This makes it easy for Rust applications to embed any of these plugins statically instead of depending on them to be available on the system.

Overall there are now 40 GStreamer elements in 18 plugins by 28 contributors available as part of the gst-plugins-rs repository, one tutorial plugin with 4 elements and various plugins in external locations.

These 40 GStreamer elements are the following:

Audio
  • rsaudioecho: Port of the audioecho element from gst-plugins-good
  • rsaudioloudnorm: Live audio loudness normalization element based on the FFmpeg af_loudnorm filter
  • claxondec: FLAC lossless audio codec decoder element based on the pure-Rust claxon implementation
  • csoundfilter: Audio filter that can use any filter defined via the Csound audio programming language
  • lewtondec: Vorbis audio decoder element based on the pure-Rust lewton implementation
Video
  • cdgdec/cdgparse: Decoder and parser for the CD+G video codec based on a pure-Rust CD+G implementation, used for example by karaoke CDs
  • cea608overlay: CEA-608 Closed Captions overlay element
  • cea608tott: CEA-608 Closed Captions to timed-text (e.g. VTT or SRT subtitles) converter
  • tttocea608: CEA-608 Closed Captions from timed-text converter
  • mccenc/mccparse: MacCaption Closed Caption format encoder and parser
  • sccenc/sccparse: Scenarist Closed Caption format encoder and parser
  • dav1dec: AV1 video decoder based on the dav1d decoder implementation by the VLC project
  • rav1enc: AV1 video encoder based on the fast and pure-Rust rav1e encoder implementation
  • rsflvdemux: Alternative to the flvdemux FLV demuxer element from gst-plugins-good, not feature-equivalent yet
  • rsgifenc/rspngenc: GIF/PNG encoder elements based on the pure-Rust implementations by the image-rs project
Text
  • textwrap: Element for line-wrapping timed text (e.g. subtitles) for better screen-fitting, including hyphenation support for some languages
Network
  • reqwesthttpsrc: HTTP(S) source element based on the Rust reqwest/hyper HTTP implementations and almost feature-equivalent with the main GStreamer HTTP source souphttpsrc
  • s3src/s3sink: Source/sink element for the Amazon S3 cloud storage
  • awstranscriber: Live audio to timed text transcription element using the Amazon AWS Transcribe API
Generic
  • sodiumencrypter/sodiumdecrypter: Encryption/decryption element based on libsodium/NaCl
  • togglerecord: Recording element that allows to pause/resume recordings easily and considers keyframe boundaries
  • fallbackswitch/fallbacksrc: Elements for handling potentially failing (network) sources, restarting them on errors/timeout and showing a fallback stream instead
  • threadshare: Set of elements that provide alternatives for various existing GStreamer elements but allow to share the streaming threads between each other to reduce the number of threads
  • rsfilesrc/rsfilesink: File source/sink elements as replacements for the existing filesrc/filesink elements

Live loudness normalization in GStreamer & experiences with porting a C audio filter to Rust

A few months ago I wrote a new GStreamer plugin: an audio filter for live loudness normalization and automatic gain control.

The plugin can be found as part of the GStreamer Rust plugins in the audiofx plugin. It’s also included in the recent 0.6.0 release of the GStreamer Rust plugins and available from crates.io.

Its code is based on Kyle Swanson’s great FFmpeg filter af_loudnorm, about which he wrote some more technical details on his blog a few years back. I’m not going to repeat all that here, if you’re interested in those details and further links please read Kyle’s blog post.

From a very high level, the filter works by measuring the loudness of the input following the EBU R128 standard with a 3s lookahead, adjusts the gain to reach the target loudness and then applies a true peak limiter with 10ms to prevent any too high peaks from getting passed through. Both the target loudness and the maximum peak can be configured via the loudness-target and max-true-peak properties, same as in the FFmpeg filter. Different to the FFmpeg filter I only implemented the “live” mode and not the two-pass mode that is implemented in FFmpeg, which first measures the loudness of the whole stream and then in a second pass adjusts it.
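
The gain adjustment step boils down to ordinary decibel arithmetic. The sketch below shows only that static part, with a hypothetical function name; the lookahead, the gain smoothing and the limiter of the actual filter are not covered.

```rust
/// Linear gain factor that moves a signal measured at `measured_lufs`
/// to `target_lufs`. Hypothetical helper, not part of the filter's API.
pub fn gain_for_target(measured_lufs: f64, target_lufs: f64) -> f64 {
    // Loudness differences are expressed in dB.
    let gain_db = target_lufs - measured_lufs;
    // Convert from dB to a linear amplitude factor.
    10f64.powf(gain_db / 20.0)
}
```

For example, input measured at -30 LUFS with a target of -24 LUFS needs 6 dB of gain, which is a factor of roughly 2 on the sample values.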

Below I’ll describe the usage of the filter in GStreamer a bit and also some information about the development process, and the porting of the C code to Rust.

Usage

For using the filter you most likely first need to compile it yourself, unless you’re lucky enough that e.g. your Linux distribution includes it already.

Compiling it requires a Rust toolchain and GStreamer 1.8 or newer. The former you can get via rustup, for example, if you don’t have it yet; the latter either from your Linux distribution or by using the macOS, Windows, etc. binaries that are provided by the GStreamer project. Once that is done, compiling is mostly a matter of running cargo build in the audio/audiofx directory and copying the resulting libgstrsaudiofx.so (or .dll or .dylib) into one of the GStreamer plugin directories, for example ~/.local/share/gstreamer-1.0/plugins.

After that boring part is done, you can use it for example as follows to run loudness normalization on the Sintel trailer:

gst-launch-1.0 playbin \
    uri=https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm \
    audio-filter="audioresample ! rsaudioloudnorm ! audioresample ! capsfilter caps=audio/x-raw,rate=48000"

As can be seen above, it is necessary to put audioresample elements around the filter. The reason for that is that the filter currently only works on 192kHz input. This is a simplification for now to make it easier inside the filter to detect true peaks. You would first upsample your audio to 192kHz and then, if needed, later downsample it again to your target sample rate (48kHz in the example above). See the link mentioned before for details about true peaks and why this is generally a good idea to do. In the future the resampling could be implemented internally and maybe optionally the filter could also work with “normal” peak detection on the non-upsampled input.

Apart from that caveat the filter element works like any other GStreamer audio filter and can be placed accordingly in any GStreamer pipeline.

If you run into any problems using the code or it doesn’t work well for your use-case, please create an issue in the GStreamer bugtracker.

The process

As I wrote above, the GStreamer plugin is part of the GStreamer Rust plugins so the first step was to port the FFmpeg C code to Rust. I expected that to be the biggest part of the work, but as writing Rust is simply so much more enjoyable than writing C and I would have to adjust big parts of the code to fit the GStreamer infrastructure anyway, I took this approach nonetheless. The alternative of working based on the C code and writing the plugin in C didn’t seem very appealing to me. In the end, as usual when developing in Rust, this also allowed me to be more confident about the robustness of the result and probably reduced the amount of time spent debugging. Surprisingly, the translation was actually not the biggest part of the work, but instead I had to debug a couple of issues that were already present in the original FFmpeg code and find solutions for them. But more on that later.

The first step for porting the code was to get an implementation of the EBU R128 loudness analysis. In FFmpeg they’re using a fork of the libebur128 C library. I checked if there was anything similar for Rust already, maybe even a pure-Rust implementation of it, but couldn’t find anything. As I didn’t want to write one myself or port the code of the libebur128 C library to Rust, I wrote safe Rust bindings for that library instead. The end result of that can be found on crates.io as an independent crate, in case someone else also needs it for other purposes at some point. The crate also includes the code of the C library, making it as easy as possible to build and include into other projects.

The next step was to actually port the FFmpeg C code to Rust. In the end that was a rather straightforward translation fortunately. The latest version of that code can be found here.

The biggest difference to the C code is the usage of Rust iterators and iterator combinators like zip and chunks_exact. In my opinion this makes the code quite a bit easier to read compared to the manual iteration in the C code together with array indexing, and as a side effect it should also make the code run faster in Rust as it allows getting rid of a lot of array bounds checks.
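
A small sketch of that style, not taken from the filter itself: the hypothetical function below applies one gain value per frame of interleaved audio using chunks_exact and zip, without a single explicit index.

```rust
/// Applies one gain value per frame to interleaved samples.
/// `src` and `dst` hold `channels` samples per frame (`channels` must be non-zero).
/// Hypothetical helper for illustration only.
pub fn apply_gains(src: &[f32], dst: &mut [f32], gains: &[f32], channels: usize) {
    // Pair up input frames with output frames ...
    let frames = src
        .chunks_exact(channels)
        .zip(dst.chunks_exact_mut(channels));

    // ... and each frame pair with its gain value.
    for ((input, output), gain) in frames.zip(gains) {
        for (i, o) in input.iter().zip(output.iter_mut()) {
            *o = *i * *gain;
        }
    }
}
```

Because zip stops at the shortest input, there is no way to read or write out of bounds here, and there are no per-sample bounds checks for the compiler to remove.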

Apart from that, one part that was a bit inconvenient during that translation and still required manual array indexing is the usage of ringbuffers everywhere in the code. For now I wrote those like I would in C and used a few unsafe operations like get_unchecked to avoid redundant bounds checks, but at a later time I might refactor this into a proper ringbuffer abstraction for such audio processing use-cases. It’s not going to be the last time I need such a data structure. A short search on crates.io gave various results for ringbuffers but none of them seem to provide an API that fits the use-case here. Once that’s abstracted away into a nice data structure, I believe the Rust code of this filter is really nice to read and follow.

Now to the less pleasant parts, and also a small warning to all the people asking for Rust rewrites of everything: of course I introduced a couple of new bugs while translating the code although this was a rather straightforward translation and I tried to be very careful. I’m sure there is also still a bug or two left that I didn’t find while debugging. So always keep in mind that rewriting a project will also involve adding new bugs that didn’t exist in the original code. Or maybe you’re just a better programmer than me and don’t make such mistakes.

Debugging these issues that showed up while testing the code was a good opportunity to also add extensive code comments everywhere so I don’t have to remind myself every time again what this block of code is doing exactly, and it’s something I was missing a bit from the FFmpeg code (it doesn’t have a single comment currently). While writing those comments and explaining the code to myself, I found the majority of these bugs that I introduced and as a side-effect I now have documentation for my future self or other readers of the code.

Fixing these issues I introduced myself wasn’t that time-consuming either in the end fortunately, but while writing those code comments and also while doing more testing on various audio streams, I found a couple of bugs that already existed in the original FFmpeg C code. Further testing also showed that they caused quite audible distortions on various test streams. These are the bugs that unfortunately took most of the time in the whole process, but at least to my knowledge there are no known bugs left in the code now.

For one of these bugs in the FFmpeg code I also provided a fix that is merged already, and reported the other two in their bug tracker.

The first one I’d be happy to provide a fix for if my approach is considered correct, but the second one I’ll leave for someone else. Porting over my Rust solution for that one will take some time and getting all the array indexing involved correct in C would require some serious focusing, for which I currently don’t have the time.

Or maybe my solutions to these problems are actually wrong, or my understanding of the original code was wrong and I actually introduced them in my translation, which also would be useful to know.

Overall, while porting the C code to Rust introduced a few new problems that had to be fixed, I would definitely do this again for similar projects in the future. It’s more fun to write and in my opinion the resulting code is easier to read, maintain and extend.

The GTK Rust bindings are not ready yet? Yes they are!

When talking to various people at conferences in the last year, a recurring topic was that they believed that the GTK Rust bindings are not ready for use yet.

I don’t know where that perception comes from but if it was true, there wouldn’t have been applications like Fractal, Podcasts or Shortwave using GTK from Rust, or I wouldn’t be able to do a workshop about desktop application development in Rust with GTK and GStreamer at the Linux Application Summit in Barcelona this Friday (code can be found here already) or earlier this year at GUADEC.

One reason I sometimes hear is that there is no support for creating subclasses of GTK types in Rust yet. While that was true, it is not true anymore nowadays. But even more important: unless you want to create your own special widgets, you don’t need that. Many examples and tutorials in other languages make use of inheritance/subclassing for the applications’ architecture, but that’s because it is the idiomatic pattern in those languages. However, in Rust other patterns are more idiomatic and even for those examples and tutorials in other languages it wouldn’t be the one and only option to design applications.

Almost everything is included in the bindings at this point, so seriously consider writing your next GTK UI application in Rust. While some minor features are still missing from the bindings, none of those should prevent you from successfully writing your application.

And if something is actually missing for your use-case or something is not working as expected, please let us know. We’d be happy to make your life easier!

P.S.

Some people are already experimenting with new UI development patterns on top of the GTK Rust bindings. So if you want to try developing a UI application but want to try something different than the usual signal/callback spaghetti code, also take a look at those.

MPSC Channel API for painless usage of threads with GTK in Rust

A very common question that comes up on IRC or elsewhere by people trying to use the gtk-rs GTK bindings in Rust is how to modify UI state, or more specifically GTK widgets, from another thread.

Due to GTK only allowing access to its UI state from the main thread and Rust actually enforcing this, unlike other languages, this is less trivial than one might expect. To make this as painless as possible, while also encouraging a more robust threading architecture based on message-passing instead of shared state, I’ve added some new API to the glib-rs bindings: An MPSC (multi-producer/single-consumer) channel very similar to (and based on) the one in the standard library but integrated with the GLib/GTK main loop.

While I’ll mostly write about this in the context of GTK here, this can also be useful in other cases when working with a GLib main loop/context from Rust to have a more structured means of communication between different threads than shared mutable state.
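
The message-passing idea itself can be shown with nothing but the standard library channel that the new API is modelled on. The sketch below is not the glib-rs API: Label is a stand-in type, and instead of being dispatched from a main loop the messages are received in a blocking loop on the thread that owns the label.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Stand-in for a widget that must only be touched by the thread owning it.
pub struct Label {
    pub text: String,
}

/// Runs a worker thread and applies its result to `label` on the calling thread.
pub fn run_worker(label: &mut Label) {
    let (sender, receiver) = mpsc::channel::<String>();

    // Only the sender moves into the worker thread, the label stays here.
    let worker = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        sender
            .send(String::from("finished"))
            .expect("receiver dropped");
    });

    // A main-loop-integrated channel would deliver these messages from the
    // main loop instead of blocking the owning thread like this loop does.
    for text in receiver {
        label.text = text;
    }

    worker.join().expect("worker panicked");
}
```

The worker never sees the label at all, so there is nothing the compiler would have to reject.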

The new channel API will be part of the next release and you can find some example code making use of it at the very end. But first I’ll take this opportunity to explain why this is not so trivial in Rust, and also to explain another solution.

Table of Contents

  1. The Problem
  2. One Solution: Safely working around the type system
  3. A better solution: Message passing via channels

The Problem

Let’s consider the example of an application that has to perform a complicated operation and would like to do this from another thread (as it should to not block the UI!) and in the end report back the result to the user. For demonstration purposes let’s take a thread that simply sleeps for a while and then wants to update a label in the UI with a new value.

Naively we might start with code like the following

let label = gtk::Label::new("not finished");
[...]
// Clone the label so we can also have it available in our thread.
// Note that this behaves like an Rc and only increases the
// reference count.
let label_clone = label.clone();
thread::spawn(move || {
    // Let's sleep for 10s
    thread::sleep(time::Duration::from_secs(10));

    label_clone.set_text("finished");
});

This does not compile and the compiler tells us (between a wall of text containing all the details) that the label simply can’t be sent safely between threads. Which is absolutely correct.

error[E0277]: {{EJS155}} cannot be sent between threads safely
  --> src/main.rs:28:5
   |
28 |     thread::spawn(move || {
   |     ^^^^^^^^^^^^^ {{EJS156}} cannot be sent between threads safely
   |
   = help: within {{EJS157}}, the trait {{EJS158}} is not implemented for {{EJS159}}
   = note: required because it appears within the type {{EJS160}}
   = note: required because it appears within the type {{EJS161}}
   = note: required because it appears within the type {{EJS162}}
   = note: required because it appears within the type {{EJS163}}
   = note: required by {{EJS164}}

In C, for example, this would not be a problem at all: the compiler does not know that GTK widgets, and GTK API in general, are only safe to use from the main thread, and would happily compile the above. It would then be our (the programmer’s) job to ensure that nothing is ever done with the widget from the other thread, other than passing it around. Among other things, it must also not be destroyed from that other thread (i.e. that thread must never hold the last reference to it and then drop it).

One Solution: Safely working around the type system

So why don’t we do the same as we would do in C and simply pass around raw pointers to the label and do all the memory management ourselves? Well, that would defeat one of the purposes of using Rust and would require quite some unsafe code.

We can do better than that: we can work around Rust’s type system with regards to thread-safety and let the relevant checks (are we only ever using the label from the main thread?) be done at runtime instead. This allows for completely safe code, but it might panic at any time if we accidentally try to do something with the label from the wrong thread (like calling a function on it, or dropping it) instead of just passing it around.

The fragile crate provides a type called Fragile for exactly this purpose. It’s a wrapper type like Box, RefCell, Rc, etc. but it allows for any contained type to be safely sent between threads, and on every access it checks at runtime that the access happens from the correct thread. In our example this would look like this

let label = gtk::Label::new("not finished");
[...]
// We wrap the label clone in the Fragile type here
// and move that into the new thread instead.
let label_clone = fragile::Fragile::new(label.clone());
thread::spawn(move || {
    // Let's sleep for 10s
    thread::sleep(time::Duration::from_secs(10));

    // To access the contained value, get() has
    // to be called and this is where the runtime
    // checks are happening
    label_clone.get().set_text("finished");
});

Not many changes to the code and it compiles… but at runtime we of course get a panic because we’re accessing the label from the wrong thread

thread '<unnamed>' panicked at 'trying to access wrapped value in fragile container from incorrect thread.', ~/.cargo/registry/src/github.com-1ecc6299db9ec823/fragile-0.3.0/src/fragile.rs:57:13
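To make that runtime check a bit less of a black box, here is a minimal sketch of how such a wrapper could work, using only the standard library. This is not the code of the fragile crate: ThreadBound, try_get() and accessible_from_other_thread() are names made up for this illustration, and unlike the behaviour described above the sketch simply leaks the value when it is dropped from the wrong thread instead of panicking.

```rust
use std::mem::ManuallyDrop;
use std::thread::{self, ThreadId};

/// Hypothetical, simplified stand-in for a Fragile-like wrapper.
pub struct ThreadBound<T> {
    value: ManuallyDrop<T>,
    owner: ThreadId,
}

// SAFETY: the wrapped value is only ever handed out or dropped on the
// thread that created the wrapper. Both conditions are checked at runtime.
unsafe impl<T> Send for ThreadBound<T> {}

impl<T> ThreadBound<T> {
    /// Wraps the value and remembers the thread that created it.
    pub fn new(value: T) -> Self {
        ThreadBound {
            value: ManuallyDrop::new(value),
            owner: thread::current().id(),
        }
    }

    /// Returns the value only when called from the owning thread.
    pub fn try_get(&self) -> Option<&T> {
        if thread::current().id() == self.owner {
            Some(&*self.value)
        } else {
            None
        }
    }

    /// Like try_get(), but panics when called from the wrong thread.
    pub fn get(&self) -> &T {
        self.try_get()
            .expect("trying to access wrapped value from incorrect thread")
    }
}

impl<T> Drop for ThreadBound<T> {
    fn drop(&mut self) {
        if thread::current().id() == self.owner {
            // SAFETY: the value is never used again after this point.
            unsafe { ManuallyDrop::drop(&mut self.value) };
        }
        // On any other thread the value is leaked (a simplification of
        // this sketch) so that it is never touched from the wrong thread.
    }
}

/// Moves the wrapper into a new thread and reports whether the value
/// could be accessed from there.
pub fn accessible_from_other_thread<T: 'static>(wrapped: ThreadBound<T>) -> bool {
    thread::spawn(move || wrapped.try_get().is_some())
        .join()
        .expect("thread panicked")
}
```

Wrapping an Rc in ThreadBound and handing it to accessible_from_other_thread() compiles, although Rc itself cannot be sent between threads, and returns false because the access is refused at runtime.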

What we instead need to do here is to somehow defer the change of the label to the main thread, and GLib provides various API for doing exactly that, for example glib::MainContext::invoke() and glib::idle_add(). We’ll make use of the first one here but it’s mostly a matter of taste (and trait bounds: the former takes a FnOnce closure while the latter can be called multiple times and only takes FnMut because of that).

let label = gtk::Label::new("not finished");
[...]
// We wrap the label clone in the Fragile type here
// and move that into the new thread instead.
let label_clone = fragile::Fragile::new(label.clone());
thread::spawn(move || {
    // Let's sleep for 10s
    thread::sleep(time::Duration::from_secs(10));

    // Defer the label update to the main thread.
    // For this we get the default main context,
    // the one used by GTK on the main thread,
    // and use invoke() on it. The closure also
    // takes ownership of the label_clone and drops
    // it at the end. From the correct thread!
    glib::MainContext::default().invoke(move || {
        label_clone.get().set_text("finished");
    });
});

So far so good, this compiles and actually works too. But it feels kind of fragile, and that’s not only because of the name of the crate we use here. The label passed around in different threads is like a landmine only waiting to explode when we use it in the wrong way.

It’s also not very nice because now we conceptually share mutable state between different threads, which is the underlying cause for many thread-safety issues and generally increases the complexity of the software considerably.

Let’s try to do better, Rust is all about fearless concurrency after all.

A better solution: Message passing via channels

As the title of this post probably made clear, the better solution is to use channels to do message passing. That’s also a pattern that is generally preferred in many other languages that focus a lot on concurrency, ranging from Erlang to Go, and is also the recommended way of doing this according to the Rust Book.

So what would this look like? First of all we have to create a channel for communicating with our main thread.

As the main thread is running a GLib main loop with its corresponding main context (the loop is the thing that actually is… a loop, and the context is what keeps track of all potential event sources the loop has to handle), we can’t make use of the standard library’s MPSC channel. Its Receiver either blocks, or we would have to poll it in intervals, which is rather inefficient.

The futures MPSC channel doesn’t have this problem but requires a futures executor to run on the thread where we want to handle the messages. While the GLib main context also implements a futures executor and we could actually use it, this would pull in the futures crate and all its dependencies and might seem like too much if we only ever use it for message passing anyway. Otherwise, if you use futures also for other parts of your code, go ahead and use the futures MPSC channel instead. It basically works the same as what follows.

For creating a GLib main context channel, there are two functions available: glib::MainContext::channel() and glib::MainContext::sync_channel(). The latter takes a bound for the channel, after which sending to the Sender part will block until there is space in the channel again. Both return a tuple containing the Sender and Receiver for this channel, and the Sender in particular works exactly like the one from the standard library. It can be cloned, sent to different threads (as long as the message type of the channel can be) and provides basically the same API.
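As the GLib channels are modelled after the standard library ones, the effect of the bound can be tried out with std::sync::mpsc::sync_channel(). The following is just a small sketch of that: fill_bounded_channel() is a helper made up for this illustration, and it uses try_send() so that hitting the bound is reported as an error instead of blocking the calling thread.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to put `bound + 1` messages into a channel with the given bound
/// without ever receiving. Returns how many messages were accepted and
/// whether the bound was hit.
fn fill_bounded_channel(bound: usize) -> (usize, bool) {
    let (sender, receiver) = sync_channel::<usize>(bound);

    let mut accepted = 0;
    let mut hit_bound = false;
    for i in 0..=bound {
        match sender.try_send(i) {
            Ok(()) => accepted += 1,
            // A blocking send() would wait here until there is space again
            Err(TrySendError::Full(_)) => {
                hit_bound = true;
                break;
            }
            // Not reachable here: the receiver is still alive
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    // Keep the receiver alive until all sends were attempted
    drop(receiver);

    (accepted, hit_bound)
}
```

With a bound of 2, the first two messages are accepted and the third one is refused because the channel is full.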

The Receiver works a bit differently, and closer to the for_each() combinator on the futures Receiver. It provides an attach() function that attaches it to a specific main context, and takes a closure that is called from that main context whenever an item is available.

The other part that we need to define on our side is what the messages that we send through the channel should look like. Usually some kind of enum with all the different kinds of messages you want to handle is a good choice. In our case it could also simply be () as we only have a single kind of message and without payload. But to make it more interesting, let’s add the new string of the label as payload to our messages.

This is what it could look like, for example

enum Message {
    UpdateLabel(String),
}
[...]
let label = gtk::Label::new("not finished");
[...]
// Create a new sender/receiver pair with default priority
let (sender, receiver) = glib::MainContext::channel(glib::PRIORITY_DEFAULT);

// Spawn the thread and move the sender in there
thread::spawn(move || {
    thread::sleep(time::Duration::from_secs(10));

    // Sending fails if the receiver is closed
    let _ = sender.send(Message::UpdateLabel(String::from("finished")));
});

// Attach the receiver to the default main context (None)
// and on every message update the label accordingly.
let label_clone = label.clone();
receiver.attach(None, move |msg| {
    match msg {
        Message::UpdateLabel(text) => label_clone.set_text(text.as_str()),
    }

    // Returning false here would close the receiver
    // and have senders fail
    glib::Continue(true)
});

While this is a bit more code than the previous solution, it will also be easier to maintain and generally allows for clearer code.

We keep all our GTK widgets inside the main thread now, threads only get access to a sender over which they can send messages to the main thread and the main thread handles these messages in whatever way it wants. There is no shared mutable state between the different threads here anymore, apart from the channel itself.
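The same structure can be tried out without GTK by using the standard library channel. The following is only a sketch under that assumption: Label is a made-up stand-in for the widget, run_worker_and_update() is a hypothetical helper, and the receiving side blocks in a plain loop, which is exactly what the GLib main context channel avoids.

```rust
use std::sync::mpsc;
use std::thread;

enum Message {
    UpdateLabel(String),
}

/// Made-up stand-in for a widget that must stay on the "main" thread.
struct Label {
    text: String,
}

/// Spawns a worker that only gets the sender, and applies all messages
/// it sends to the label on the calling thread.
fn run_worker_and_update(label: &mut Label) {
    let (sender, receiver) = mpsc::channel();

    // The worker never sees the label, only the sender
    let worker = thread::spawn(move || {
        // A long-running operation would happen here

        // Sending fails if the receiver was dropped already
        let _ = sender.send(Message::UpdateLabel(String::from("finished")));
    });

    // Blocks until a message arrives, and ends once all senders are gone
    for msg in receiver {
        match msg {
            Message::UpdateLabel(text) => label.text = text,
        }
    }

    worker.join().expect("worker thread panicked");
}
```

All state changes happen in the loop on the calling thread, and adding a new kind of update only means adding a new variant to Message and a new match arm.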

GStreamer Rust bindings 0.12 and GStreamer Plugin 0.3 release

After almost 6 months, a new release of the GStreamer Rust bindings and the GStreamer plugin writing infrastructure for Rust is out. As usual this coincided with the release of all the gtk-rs crates to make use of all the new features they contain.

Thanks to all the contributors of both gtk-rs and the GStreamer bindings for all the nice changes that happened over the last 6 months!

And as usual, if you find any bugs please report them and if you have any questions let me know.

GStreamer Bindings

For the full changelog check here.

Most changes this time were internal, especially because many user-facing changes (like Debug impls for various types) were already backported to the minor releases in the 0.11 release series.

WebRTC

The biggest change this time is probably the inclusion of bindings for the GStreamer WebRTC library.

This allows building all kinds of WebRTC applications outside the browser (or providing a WebRTC implementation for a browser), and while not as full-featured as Google’s own implementation, this interoperates well with the various browsers and generally works much better on embedded devices.

A small example application in Rust is available here.

Serde

Optionally, serde trait implementations for the Serialize and Deserialize trait can be enabled for various fundamental GStreamer types, including caps, buffers, events, messages and tag lists. This allows serializing them into any format that can be handled by serde (which are many!), and deserializing them back to normal Rust structs.

Generic Tag API

Previously only a strongly-typed tag API was exposed that made it impossible to use the wrong data type for a specific tag, e.g. code that tries to store a string for the track number or an integer for the title would simply not compile:

let mut tags = gst::TagList::new();
{
    let tags = tags.get_mut().unwrap();
    tags.add::<Title>(&"some title", gst::TagMergeMode::Append);
    tags.add::<TrackNumber>(&12, gst::TagMergeMode::Append);
}

While this is convenient, it made it rather complicated to work with tag lists if you only wanted to handle them in a generic way. For example by iterating over the tag list and simply checking what kind of tags are available. To solve that, a new generic API was added in addition. This works on glib::Values, which can store any kind of type, and using the wrong type for a specific tag would simply cause an error at runtime instead of compile-time.

let mut tags = gst::TagList::new();
{
    let tags = tags.get_mut().unwrap();
    tags.add_generic(&gst::tags::TAG_TITLE, &"some title", gst::TagMergeMode::Append)
        .expect("wrong type for title tag");
    tags.add_generic(&gst::tags::TAG_TRACK_NUMBER, &12, gst::TagMergeMode::Append)
        .expect("wrong type for track number tag");
}

This also greatly simplified the serde serialization/deserialization for tag lists.
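To illustrate the trade-off between the two APIs, here is a small sketch of what a generic, runtime-checked tag list boils down to. None of this is the GStreamer API: Value, ValueType, expected_type() and this TagList are made up for the illustration, with Value playing the role of glib::Value and only two known tags.

```rust
use std::collections::HashMap;

/// Made-up dynamically typed value, in the role of glib::Value.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Str(String),
    UInt(u32),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum ValueType {
    Str,
    UInt,
}

impl Value {
    fn value_type(&self) -> ValueType {
        match self {
            Value::Str(_) => ValueType::Str,
            Value::UInt(_) => ValueType::UInt,
        }
    }
}

/// The type each known tag expects. The tag names are assumptions.
fn expected_type(tag: &str) -> Option<ValueType> {
    match tag {
        "title" => Some(ValueType::Str),
        "track-number" => Some(ValueType::UInt),
        _ => None,
    }
}

#[derive(Default)]
struct TagList {
    tags: HashMap<String, Vec<Value>>,
}

impl TagList {
    /// Type errors are only detected here, at runtime.
    fn add_generic(&mut self, tag: &str, value: Value) -> Result<(), String> {
        let expected = expected_type(tag).ok_or_else(|| format!("unknown tag {}", tag))?;
        if value.value_type() != expected {
            return Err(format!(
                "wrong type for tag {}: expected {:?}, got {:?}",
                tag,
                expected,
                value.value_type()
            ));
        }
        self.tags.entry(tag.to_string()).or_default().push(value);
        Ok(())
    }

    /// Generic iteration without knowing any tag types in advance.
    fn iter(&self) -> impl Iterator<Item = (&str, &[Value])> + '_ {
        self.tags
            .iter()
            .map(|(name, values)| (name.as_str(), values.as_slice()))
    }
}
```

Storing a string for the track number compiles fine here and only fails when add_generic() is called, while iterating over all tags needs no knowledge about the individual tag types.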

GStreamer Plugins

For the full changelog check here.

gobject-subclass

The main change this time is that all the generic GObject subclassing infrastructure was moved out of the gst-plugin crate into its own gobject-subclass crate as part of the gtk-rs organization.

As part of this, some major refactoring has happened that allows subclassing a wider range of types but also makes it simpler to add new types. There are also experimental crates for adding some subclassing support to gio and gtk, and a PR for autogenerating part of the code via the gir code generator.

More classes!

The other big addition this time is that it’s now possible to subclass GStreamer Pads and GhostPads, to implement the ChildProxy interface and to subclass the Aggregator and AggregatorPad class.

This now allows writing custom mixer/muxer-style elements (or generally elements that have multiple sink pads) in Rust via the Aggregator base class, and having custom pad types for elements to allow for setting custom properties on the pads (e.g. to control the opacity of a single video mixer input).

There is currently no example for such an element, but I’ll add a very simple video mixer to the repository some time in the next weeks and will also write a blog post about it for explaining all the steps.

GLib/GIO async operations and Rust futures + async/await

Unfortunately I was not able to attend the Rust+GNOME hackfest in Madrid last week, but I could at least spend some of my work time at Centricular on implementing one of the things I wanted to work on during the hackfest. The other one, more closely related to the gnome-class work, will be the topic of a future blog post once I actually have something to show.

So back to the topic. With the latest GIT version of the Rust bindings for GLib, GTK, etc it is now possible to make use of the Rust futures infrastructure for GIO async operations and various other functions. This should make writing of GNOME, and in general GLib-using, applications in Rust quite a bit more convenient.

For the impatient, the summary is that you can use Rust futures with GLib and GIO now, that it works both on the stable and nightly version of the compiler, and with the nightly version of the compiler it is also possible to use async/await. An example with the latter can be found here, and an example just using futures without async/await here.

Table of Contents

  1. Futures
    1. Futures in Rust
    2. Async/Await
    3. Tokio
  2. Futures & GLib/GIO
    1. Callbacks
    2. GLib Futures
    3. GIO Asynchronous Operations
    4. Async/Await
  3. The Future

Futures

First of all, what are futures and how do they work in Rust? In a few words, a future (also called promise elsewhere) is a value that represents the result of an asynchronous operation, e.g. establishing a TCP connection. The operation itself (usually) runs in the background, and only once the operation is finished (or fails), the future resolves to the result of that operation. There are all kinds of ways to combine futures, e.g. to execute some other (potentially async) code with the result once the first operation has finished.

It’s a concept that is also widely used in various other programming languages (e.g. C#, JavaScript, Python, …) for asynchronous programming and can probably be considered a proven concept at this point.

Futures in Rust

In Rust, a future is basically an implementation of a relatively simple trait called Future. The following is the definition as of now, but there are discussions to change/simplify/generalize it currently and to also move it to the Rust standard library:

pub trait Future {
    type Item;
    type Error;

    fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error>;
}

Anything that implements this trait can be considered an asynchronous operation that resolves to either an Item or an Error. Consumers of the future would call the poll method to check if the future has resolved already (to a result or error), or if the future is not ready yet. In case of the latter, the future itself would at a later point, once it is ready to proceed, notify the consumer about that. It gets the means for sending this notification from the Context that is passed to poll. Being ready to proceed does not necessarily mean that the future will resolve on the next poll; it could also just advance its internal state closer to the final resolution.
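The polling protocol can be tried out with a hand-written future and a trivial consumer. Note that the sketch below uses the Future trait that is part of the standard library today, which differs from the definition shown above (a single Output type instead of Item and Error, and a pinned self). Countdown, NoopWake and poll_until_ready() are names made up for this illustration, and the busy-polling loop is only a toy, not how a real executor works.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Made-up future that needs `remaining` polls before it resolves.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            // Advance the internal state without resolving yet
            self.remaining -= 1;
            // Notify the consumer that polling again makes sense
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that ignores notifications; enough for a loop that polls anyway.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy consumer: polls until the future resolves and returns the result
/// together with the number of polls that were needed.
fn poll_until_ready<F: Future>(future: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);

    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return (output, polls);
        }
    }
}
```

A Countdown starting at 3 returns Pending three times, each time moving a bit closer to its resolution, and resolves on the fourth poll.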

Calling poll manually is kind of inconvenient, so generally this is handled by an Executor on which the futures are scheduled and which is running them until their resolution. Equally, it’s inconvenient to have to implement that trait directly so for most common operations there are combinators that can be used on futures to build new futures, usually via closures in one way or another. For example the following would run the passed closure with the successful result of the future, and then have it return another future (Ok(()) is converted via IntoFuture to the future that always resolves successfully with ()), and also maps any errors to ()

fn our_future() -> impl Future<Item = (), Error = ()> {
    some_future
        .and_then(|res| {
            do_something(res);
            Ok(())
        })
        .map_err(|_| ())
}

A future represents only a single value, but there is also a trait for something producing multiple values: a Stream. For more details, best to check the documentation.

Async/Await

The above way of combining futures via combinators and closures is still not too great, and is still close to callback hell. In other languages (e.g. C#, JavaScript, Python, …) this was solved by introducing new features to the language: async for declaring futures with normal code flow, and await for suspending execution transparently and resuming at that point in the code with the result of a future.

Of course this was also implemented in Rust. Currently based on procedural macros, but there are discussions to actually move this also directly into the language and standard library.

The above example would look something like the following with the current version of the macros

#[async]
fn our_future() -> Result<(), ()> {
    let res = await!(some_future)
        .map_err(|_| ())?;

    do_something(res);
    Ok(())
}

This looks almost like normal, synchronous code but is internally converted into a future and completely asynchronous.

Unfortunately this is currently only available on the nightly version of Rust until various bits and pieces get stabilized.

Tokio

Most of the time when people talk about futures in Rust, they implicitly also mean Tokio. Tokio is a pure Rust, cross-platform asynchronous IO library and based on the futures abstraction above. It provides a futures executor and various types for asynchronous IO, e.g. sockets and socket streams.

But while Tokio is a great library, we’re not going to use it here and instead implement a futures executor around GLib. And on top of that implement various futures, also around GLib’s sister library GIO, which is providing lots of API for synchronous and asynchronous IO.

Just like all IO operations in Tokio, all GLib/GIO asynchronous operations are dependent on running with their respective event loop (i.e. the futures executor) and while it’s possible to use both in the same process, each operation has to be scheduled on the correct one.

Futures & GLib/GIO

In GLib, asynchronous operations and generally everything event related (timeouts, …) are based on callbacks that you have to register, and are running via a GMainLoop that is executing events from a GMainContext. The latter is just something that stores everything that is scheduled and provides API for polling if something is ready to be executed now, while the former does exactly that: executing.
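This split of responsibilities can be sketched in a few lines of plain Rust. The following is only a toy model and not how GLib is implemented: MiniContext and run_until_idle() are made-up names, every callback counts as ready immediately, and the loop stops once nothing is scheduled anymore instead of running until someone tells it to quit.

```rust
use std::collections::VecDeque;

/// Made-up stand-in for a main context: it only stores what is scheduled.
#[derive(Default)]
struct MiniContext {
    pending: VecDeque<Box<dyn FnMut() -> bool>>,
}

impl MiniContext {
    /// Schedules a callback. Returning true from it means "call me again".
    fn add<F: FnMut() -> bool + 'static>(&mut self, callback: F) {
        self.pending.push_back(Box::new(callback));
    }

    /// Runs one scheduled callback, if there is any, and reports whether
    /// something was executed.
    fn iteration(&mut self) -> bool {
        match self.pending.pop_front() {
            Some(mut callback) => {
                if callback() {
                    // The callback wants to be called again later
                    self.pending.push_back(callback);
                }
                true
            }
            None => false,
        }
    }
}

/// Made-up stand-in for a main loop: it does the executing, and returns
/// the number of callback invocations.
fn run_until_idle(ctx: &mut MiniContext) -> usize {
    let mut executed = 0;
    while ctx.iteration() {
        executed += 1;
    }
    executed
}
```

The boolean return value of the callbacks plays the same role as glib::Continue in the callback-based API.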

Callbacks

The callback based API is also available via the Rust bindings, and would for example look as follows

glib::timeout_add(20, || {
    do_something_after_20ms();
    glib::Continue(false) // don't call again
});

glib::idle_add(|| {
    do_something_from_the_main_loop();
    glib::Continue(false) // don't call again
});

some_async_operation(|res| {
    match res {
        Err(err) => report_error_somehow(),
        Ok(res) => {
            do_something_with_result(res);
            some_other_async_operation(|res| {
                do_something_with_other_result(res);
            });
        }
    }
});

As can be seen here already, the callback-based approach leads to quite non-linear code and deep indentation due to all the closures. Also error handling becomes quite tricky due to somehow having to handle errors from a completely different call stack.

Compared to C this is still far more convenient due to actually having closures that can capture their environment, but we can definitely do better in Rust.

The above code also assumes that somewhere a main loop is running on the default main context, which could be achieved with the following e.g. inside main()

let ctx = glib::MainContext::default();
let l = glib::MainLoop::new(Some(&ctx), false);
ctx.push_thread_default();

// All operations here would be scheduled on this main context
do_things(&l);

// Run everything until someone calls l.quit()
l.run();
ctx.pop_thread_default();

It is also possible to explicitly select for various operations on which main context they should run, but that’s just a minor detail.

GLib Futures

To make this situation a bit nicer, I’ve implemented support for futures in the Rust bindings. This means, that the GLib MainContext is now a futures executor (and arbitrary futures can be scheduled on it), all the GSource related operations in GLib (timeouts, UNIX signals, …) have futures- or stream-based variants and all the GIO asynchronous operations also come with futures variants now. The latter are autogenerated with the gir bindings code generator.

For enabling usage of this, the futures feature of the glib and gio crates has to be enabled, but that’s about it. It is currently still hidden behind a feature gate because the futures infrastructure is still going to go through some API incompatible changes in the near future.

So let’s take a look at how to use it. First of all, setting up the main context and executing a trivial future on it

let c = glib::MainContext::default();
let l = glib::MainLoop::new(Some(&c), false);

c.push_thread_default();

// Spawn a future that is called from the main context
// and after printing something just quits the main loop
let l_clone = l.clone();
c.spawn(futures::lazy(move |_| {
    println!("we're called from the main context");
    l_clone.quit();
    Ok(())
}));

l.run();

c.pop_thread_default();

Apart from spawn(), there is also a spawn_local(). The former can be called from any thread but requires the future to implement the Send trait (that is, it must be safe to send it to other threads) while the latter can only be called from the thread that owns the main context but it allows any kind of future to be spawned. In addition there is also a block_on() function on the main context, which allows running non-static futures up to their completion and returns their result. The spawn functions only work with static futures (i.e. they have no references to any stack frame) and require the futures to be infallible and resolve to ().

The above code already showed one of the advantages of using futures: it is possible to use all generic futures (that don’t require a specific executor), like futures::lazy or the mpsc/oneshot channels with GLib now, as well as any of the combinators that are available on futures:

let c = MainContext::new();

let res = c.block_on(timeout_future(20)
    .and_then(move |_| {
        // Called after 20ms
        Ok(1)
    })
);

assert_eq!(res, Ok(1));

This example also shows the block_on functionality to return an actual value from the future (1 in this case).

GIO Asynchronous Operations

Similarly, all asynchronous GIO operations are now available as futures. For example, to open a file asynchronously and get a gio::InputStream to read from, the following could be done

let file = gio::File::new_for_path("Cargo.toml");

let l_clone = l.clone();
c.spawn_local(
    // Try to open the file
    file.read_async_future(glib::PRIORITY_DEFAULT)
        .map_err(|(_file, err)| {
            format!("Failed to open file: {}", err)
        })
        .and_then(move |(_file, strm)| {
            // Here we could now read from the stream, but
            // instead we just quit the main loop
            l_clone.quit();

            Ok(())
        })
);

A bigger example can be found in the gtk-rs examples repository here. This example is basically reading a file asynchronously in 64 byte chunks and printing it to stdout, then closing the file.

In the same way, network operations or any other asynchronous operation can be handled via futures now.

Async/Await

Compared to a callback-based approach, that bigger example is already a lot nicer but still quite heavy to read. With the async/await extension that I mentioned above already, the code looks much nicer in comparison and really almost like synchronous code. Except that it is not synchronous.

#[async]
fn read_file(file: gio::File) -> Result<(), String> {
    // Try to open the file
    let (_file, strm) = await!(file.read_async_future(glib::PRIORITY_DEFAULT))
        .map_err(|(_file, err)| format!("Failed to open file: {}", err))?;

    Ok(())
}

fn main() {
    [...]
    let future = async_block! {
        match await!(read_file(file)) {
            Ok(()) => (),
            Err(err) => eprintln!("Got error: {}", err),
        }
        l_clone.quit();
        Ok(())
    };

    c.spawn_local(future);
    [...]
}

For compiling this code, the futures-nightly feature has to be enabled for the glib crate, and a nightly compiler must be used.

The bigger example from before with async/await can be found here.

With this we’re already very close in Rust to having the same convenience as in other languages with asynchronous programming. And also it is very similar to what is possible in Vala with GIO asynchronous operations.

The Future

For now this is all finished and available from GIT of the glib and gio crates. This will have to be updated in the future whenever the futures API is changing, but it is planned to stabilize all this in Rust by the end of this year.

In the future it might also make sense to add futures variants for all the GObject signal handlers, so that e.g. handling a click on a GTK+ button could be done similarly from a future (or rather from a Stream as a signal can be emitted multiple times). Whether this is in the end more convenient than the callback-based approach that is currently used remains to be seen. Some experimentation would be necessary here. Also how to handle return values of signal handlers would have to be figured out.

Improving GStreamer performance on a high number of network streams by sharing threads between elements with Rust’s tokio crate

For one of our customers at Centricular we were working on a quite interesting project. Their use-case was basically to receive an as-high-as-possible number of audio RTP streams over UDP, transcode them, and then send them out via UDP again. Due to how GStreamer usually works, they were running into some performance issues.

This blog post will describe the first set of improvements that were implemented for this use-case, together with a minimal benchmark and the results. My colleague Mathieu will follow up with one or two other blog posts with the other improvements and a more full-featured benchmark.

The short version is that CPU usage decreased by about 65-75%, i.e. allowing 3-4x more streams with the same CPU usage. Also parallelization works better and usage of different CPU cores is more controllable, allowing for better scalability. And a fixed, but configurable number of threads is used, which is independent of the number of streams.

The code for this blog post can be found here.

Table of Contents

  1. GStreamer & Threads
  2. Thread-Sharing GStreamer Elements
  3. Available Elements
  4. Little Benchmark
  5. Conclusion

GStreamer & Threads

In GStreamer, by default each source is running from its own OS thread. Additionally, for receiving/sending RTP, there will be another thread in the RTP jitterbuffer, yet another thread for receiving RTCP (another source) and a last thread for sending RTCP at the right times. And RTCP has to be received and sent for the receiver and sender side part of the pipeline, so the number of threads doubles. In sum this gives at least 1 + 1 + (1 + 1) * 2 = 6 threads per RTP stream in this scenario. In a normal audio scenario, there will be one packet received/sent e.g. every 20ms on each stream, and every now and then an RTCP packet. So most of the time all these threads are only waiting.

Apart from the obvious waste of OS resources (1000 streams would be 6000 threads), this also brings down performance as threads are being woken up all the time. This means that context switches have to happen basically all the time.
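The arithmetic behind these numbers, written out as a small sketch. The constants just name the individual terms of the sum above, the function names are made up for this illustration, and the 20ms packet interval is the example value from the text, not a measurement.

```rust
// One thread each, as listed above
const SOURCE_THREADS: usize = 1;
const JITTERBUFFER_THREADS: usize = 1;
const RTCP_RECEIVE_THREADS: usize = 1;
const RTCP_SEND_THREADS: usize = 1;
// RTCP is handled on the receiver and on the sender side of the pipeline
const RTCP_SIDES: usize = 2;

/// Minimum number of threads for a single RTP stream.
fn threads_per_stream() -> usize {
    SOURCE_THREADS
        + JITTERBUFFER_THREADS
        + (RTCP_RECEIVE_THREADS + RTCP_SEND_THREADS) * RTCP_SIDES
}

/// Minimum number of threads for the given number of streams.
fn threads_for(streams: usize) -> usize {
    streams * threads_per_stream()
}

/// Packets arriving per second over all streams, with one packet per
/// stream every `interval_ms` milliseconds.
fn packets_per_second(streams: usize, interval_ms: usize) -> usize {
    streams * (1000 / interval_ms)
}
```

For 1000 streams with one packet every 20ms this gives 6000 threads that together have to handle 50000 packets per second, each of them potentially waking up a sleeping thread.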

To solve this we implemented a mechanism to share threads, and as a result we have a fixed, but configurable number of threads that is independent of the number of streams. We can now run e.g. 500 streams just fine on a single thread with a single core, which was completely impossible before. In addition we also did some work to reduce the number of allocations for each packet, so that after startup no additional allocations happen per packet anymore for buffers. See Mathieu’s upcoming blog post for details.

In this blog post, I’m going to write about a generic mechanism for sources, queues and similar elements to share their threads between each other. For the RTP related bits (RTP jitterbuffer and RTCP timer) this was not used due to reuse of existing C codebases.

Thread-Sharing GStreamer Elements

The code in question can be found here, a small benchmark is in the examples directory and it is going to be used for the results later. A full-featured benchmark will come in Mathieu’s blog post.

This is a new GStreamer plugin, written in Rust and around the Tokio crate for asynchronous IO and generally a “task scheduler”.

While this could certainly also have been written in C around something like libuv, doing this kind of work in Rust is simply more productive and fun due to its safety guarantees and the strong type system, which definitely reduced the amount of debugging a lot. In addition there are “modern” language features like closures, which make working with futures much more ergonomic.

When using these elements it is important to have full control over the pipeline and its elements, and the dataflow inside the pipeline has to be carefully considered to properly configure how to share threads. For example the following two restrictions should be kept in mind all the time:

  1. Downstream of such an element, the streaming thread must never ever block for considerable amounts of time. Otherwise all other elements inside the same thread-group would be blocked too, even if they could do work right now
  2. This generally all works better in live pipelines, where media is produced in real-time and not as fast as possible

Available Elements

So this repository currently contains the generic infrastructure (see the src/iocontext.rs source file) and a couple of elements:

  • a UDP source: ts-udpsrc, a replacement for udpsrc
  • an app source: ts-appsrc, a replacement for appsrc to inject packets into the pipeline from the application
  • a queue: ts-queue, a replacement for queue that is useful for adding buffering to a pipeline part. The upstream side of the queue will block if not called from another thread-sharing element, but if called from another thread-sharing element it will pause the current task asynchronously. That is, stop the upstream task from producing more data.
  • a proxysink/src element: ts-proxysrc, ts-proxysink, replacements for proxysink/proxysrc for connecting two pipelines with each other. This basically works like the queue, but split into two elements.
  • a tone generator source around spandsp: ts-tonesrc, a replacement for tonegeneratesrc. This also contains some minimal FFI bindings for that part of the spandsp C library.

All these elements have more or less the same API as their non-thread-sharing counterparts.

API-wise, each of these elements has a set of properties for controlling how it is sharing threads with other elements, and with which elements:

  • context: A string that defines in which group this element is. All elements with the same context are running on the same thread or group of threads.
  • context-threads: Number of threads to use in this context. -1 means exactly one thread, 1 and above use N+1 threads (1 thread for polling fds, N worker threads) and 0 sets N to the number of available CPU cores. As long as no considerable work is done in these threads, -1 has been shown to be the most efficient. See also this tokio GitHub issue
  • context-wait: Number of milliseconds that the threads will wait on each iteration. This reduces CPU usage even further: all events/packets that arrived during that timespan are handled at once instead of waking up the thread every time a little event happens, thus reducing context switches again
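
The context-threads rule can be written down in a few lines of Rust. This is only a sketch of the rule as described in the list above, not the plugin’s actual code: the function name total_threads and the explicit cpu_cores parameter are made up for illustration, and treating values below -1 as invalid is an assumption.

```rust
/// Total number of threads one context would use for a given
/// `context-threads` value. `cpu_cores` is passed in explicitly so that the
/// result does not depend on the machine the example runs on.
fn total_threads(context_threads: i32, cpu_cores: usize) -> Option<usize> {
    match context_threads {
        // -1: exactly one thread does both the polling and the work.
        -1 => Some(1),
        // 0: N is the number of CPU cores, plus one thread for polling fds.
        0 => Some(cpu_cores + 1),
        // N >= 1: N worker threads plus one thread for polling fds.
        n if n >= 1 => Some(n as usize + 1),
        // Assumption: anything below -1 is not a valid setting.
        _ => None,
    }
}

fn main() {
    // Fall back to 1 if the number of cores cannot be determined.
    let cores = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    for setting in [-1, 0, 1, 2] {
        println!(
            "context-threads={setting}: {:?} thread(s)",
            total_threads(setting, cores)
        );
    }
}
```

Under this reading, context-threads=1 already means two threads, which is worth keeping in mind when comparing it against -1.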

The elements are all pushing data downstream from a tokio thread whenever data is available, assuming that downstream does not block. If downstream is another thread-sharing element and it would have to block (e.g. a full queue), it instead returns a new future to upstream so that upstream can asynchronously wait on that future before producing more output. This way, back-pressure is implemented between different GStreamer elements without ever blocking any of the tokio threads. All this is implemented around the normal GStreamer data-flow mechanisms; there is no “tokio fast-path” between elements.
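
To make the back-pressure idea a bit more concrete, here is a toy, fully synchronous model of it. It is not how the elements are implemented: there is no tokio and no real future involved, and all names (BoundedQueue, Upstream, PushResult) are hypothetical. The pending item stored in Upstream merely stands in for the future that the real upstream task would wait on.

```rust
use std::collections::VecDeque;

/// Answer of the (hypothetical) downstream queue to a push.
#[derive(Debug, PartialEq)]
enum PushResult {
    /// The item was queued, upstream may continue.
    Accepted,
    /// The queue is full. The item is handed back and upstream has to wait;
    /// in the real elements this is where a future would be returned.
    Pending(u32),
}

struct BoundedQueue {
    items: VecDeque<u32>,
    capacity: usize,
}

impl BoundedQueue {
    fn new(capacity: usize) -> Self {
        BoundedQueue { items: VecDeque::new(), capacity }
    }

    /// Never blocks: either queues the item or reports that it is full.
    fn push(&mut self, item: u32) -> PushResult {
        if self.items.len() < self.capacity {
            self.items.push_back(item);
            PushResult::Accepted
        } else {
            PushResult::Pending(item)
        }
    }

    fn pop(&mut self) -> Option<u32> {
        self.items.pop_front()
    }
}

struct Upstream {
    next: u32,
    /// Item that could not be pushed yet; stands in for the pending future.
    pending: Option<u32>,
}

impl Upstream {
    fn new() -> Self {
        Upstream { next: 0, pending: None }
    }

    /// Pushes up to `max` items and stops as soon as downstream is full,
    /// instead of blocking the thread. Returns the number of pushed items.
    fn produce(&mut self, queue: &mut BoundedQueue, max: usize) -> usize {
        let mut pushed = 0;
        while pushed < max {
            // Retry the item that was rejected earlier before making new ones.
            let item = match self.pending.take() {
                Some(item) => item,
                None => {
                    let n = self.next;
                    self.next += 1;
                    n
                }
            };
            match queue.push(item) {
                PushResult::Accepted => pushed += 1,
                PushResult::Pending(item) => {
                    self.pending = Some(item);
                    break;
                }
            }
        }
        pushed
    }
}

fn main() {
    let mut queue = BoundedQueue::new(2);
    let mut upstream = Upstream::new();

    let pushed = upstream.produce(&mut queue, 5);
    println!("pushed {pushed}, waiting on {:?}", upstream.pending);

    let consumed = queue.pop();
    println!("downstream consumed {consumed:?}");

    let pushed = upstream.produce(&mut queue, 5);
    println!("pushed {pushed}, waiting on {:?}", upstream.pending);
}
```

The important property is that produce returns instead of blocking, so the thread is free to run other tasks until downstream has made room again.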

Little Benchmark

As mentioned above, there’s a small benchmark application in the examples directory. This basically sets up a configurable number of streams and directly connects them to a fakesink, throwing away all packets. Additionally there is another thread that is sending all these packets. As such, this is really the most basic benchmark and not very realistic but nonetheless it shows the same performance improvement as the real application. Again, see Mathieu’s upcoming blog post for a more realistic and complete benchmark.

When running it, make sure that your user can create enough fds. The benchmark will just abort if not enough fds can be allocated. You can control this with ulimit -n SOME_NUMBER, and allowing a couple of thousand is generally a good idea. The benchmarks below were run with 10000.

After running cargo build --release to build the plugin itself, you can run the benchmark with:

cargo run --release --example udpsrc-benchmark -- 1000 ts-udpsrc -1 1 20

and in another shell the UDP sender with

cargo run --release --example udpsrc-benchmark-sender -- 1000

This runs 1000 streams, uses ts-udpsrc (the alternative would be udpsrc), and configures exactly one thread (-1), 1 context, and a wait time of 20ms. See above for what these settings mean. You can check CPU usage with e.g. top. Testing was done on an Intel i7-4790K, with Rust 1.25 and GStreamer 1.14. One packet is sent every 20ms for each stream.
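
As a reading aid for the command line above, the five positional arguments can be mapped onto a small struct. This is a hypothetical sketch and not the benchmark’s actual argument handling: the struct, its field names and the parse_args function are assumptions, only the order and meaning of the arguments are taken from the description above.

```rust
/// The five positional arguments of the benchmark, in the order used above.
#[derive(Debug, PartialEq)]
struct BenchmarkArgs {
    streams: u32,         // number of streams, e.g. 1000
    source: String,       // "ts-udpsrc" or "udpsrc"
    context_threads: i32, // value for context-threads, e.g. -1
    contexts: u32,        // number of different contexts, e.g. 1
    wait_ms: u32,         // value for context-wait in milliseconds, e.g. 20
}

fn parse_args(args: &[&str]) -> Result<BenchmarkArgs, String> {
    if args.len() != 5 {
        return Err(format!("expected 5 arguments, got {}", args.len()));
    }
    Ok(BenchmarkArgs {
        streams: args[0]
            .parse::<u32>()
            .map_err(|e| format!("streams: {e}"))?,
        source: args[1].to_string(),
        context_threads: args[2]
            .parse::<i32>()
            .map_err(|e| format!("threads: {e}"))?,
        contexts: args[3]
            .parse::<u32>()
            .map_err(|e| format!("contexts: {e}"))?,
        wait_ms: args[4]
            .parse::<u32>()
            .map_err(|e| format!("wait: {e}"))?,
    })
}

fn main() {
    // The arguments from the command line shown above.
    match parse_args(&["1000", "ts-udpsrc", "-1", "1", "20"]) {
        Ok(args) => println!("{args:?}"),
        Err(err) => eprintln!("invalid arguments: {err}"),
    }
}
```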

Source     Streams  Threads  Contexts  Wait (ms)  CPU
udpsrc     1000     1000     x         x          44%
ts-udpsrc  1000     -1       1         0          18%
ts-udpsrc  1000     -1       1         20         13%
ts-udpsrc  1000     -1       2         20         15%
ts-udpsrc  1000     2        1         20         16%
ts-udpsrc  1000     2        2         20         27%

Source     Streams  Threads  Contexts  Wait (ms)  CPU
udpsrc     2000     2000     x         x          95%
ts-udpsrc  2000     -1       1         20         29%
ts-udpsrc  2000     -1       2         20         31%

Source     Streams  Threads  Contexts  Wait (ms)  CPU
ts-udpsrc  3000     -1       1         20         36%
ts-udpsrc  3000     -1       2         20         47%

Results for 3000 streams for the old udpsrc are not included as starting up that many threads takes too long.

The best configuration is apparently a single thread per context (see this tokio GitHub issue) and waiting 20ms on every iteration. Compared to the old udpsrc, CPU usage is about one third in that setting, and generally it seems to parallelize well. It’s not clear to me why the last test uses 11 percentage points more CPU with two contexts, while in every other test the number of contexts does not really make a difference, and it also makes no difference for that many streams in the real test-case.

The waiting does not reduce CPU usage a lot in this benchmark, but on the real test-case it does. The reason is most likely that this benchmark basically sends all packets at once, then waits for the remaining time, then sends the next packets.
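
This effect can be illustrated with a small simulation. It is a deliberately simplified model and not a measurement: it assumes that without waiting the thread wakes up once per distinct arrival time (at millisecond resolution), that with a wait time it wakes up once per wait interval in which at least one packet arrived, and the function names are made up for this example.

```rust
use std::collections::BTreeSet;

/// Arrival times in milliseconds for `streams` streams that each send one
/// packet every `period_ms`, over `rounds` periods. With `spread == false`
/// all streams send at the start of each period (like the benchmark sender),
/// otherwise the streams are spread evenly over the period.
fn arrivals(streams: u64, rounds: u64, period_ms: u64, spread: bool) -> Vec<u64> {
    let mut out = Vec::new();
    for round in 0..rounds {
        for stream in 0..streams {
            let offset = if spread { stream % period_ms } else { 0 };
            out.push(round * period_ms + offset);
        }
    }
    out
}

/// Number of wakeups in which at least one packet is handled.
/// `wait_ms == 0` models waking up for every distinct arrival time,
/// `wait_ms > 0` models handling everything that arrived in one interval.
fn wakeups(arrivals_ms: &[u64], wait_ms: u64) -> usize {
    let slots: BTreeSet<u64> = arrivals_ms
        .iter()
        .map(|&t| if wait_ms == 0 { t } else { t / wait_ms })
        .collect();
    slots.len()
}

fn main() {
    // 1000 streams, one packet per stream every 20ms, for 100ms.
    let bursty = arrivals(1000, 5, 20, false);
    let spread = arrivals(1000, 5, 20, true);

    println!("bursty, wait 0:  {} wakeups", wakeups(&bursty, 0));
    println!("bursty, wait 20: {} wakeups", wakeups(&bursty, 20));
    println!("spread, wait 0:  {} wakeups", wakeups(&spread, 0));
    println!("spread, wait 20: {} wakeups", wakeups(&spread, 20));
}
```

In this model the bursty sender already causes only one wakeup per period without any waiting, so a 20ms wait cannot improve much, while evenly spread packets go from 100 wakeups down to 5.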

Take these numbers with caution. The real test-case in Mathieu’s blog post will show the improvements in the bigger picture, where it was generally a quarter of the CPU usage and almost perfect parallelization when increasing the number of contexts.

Conclusion

Generally this was a fun exercise and we’re quite happy with the results, especially the real results. It took me some time to understand how tokio works internally so that I could implement all kinds of customizations on top of it, but for normal usage of tokio that should not be required. The overall design makes a lot of sense to me, as does the way futures are implemented in Rust. It requires some learning and understanding of how exactly the API can be used and how it behaves, but once that point is reached it seems like a very productive and performant solution for asynchronous IO. And modelling asynchronous IO problems based on Rust-style futures seems a nice and intuitive fit.

The performance measurements also showed that GStreamer’s default usage of threads is not always optimal, and a model like in upipe or pipewire (or rather SPA) can provide better performance. But as this also shows, it is possible to implement something like this on top of GStreamer and for the common case, using threads like in GStreamer reduces the cognitive load on the developer a lot.

For a future version of GStreamer, I don’t think we should make the threading “manual” like in these two other projects, but instead provide some API additions that make it nicer to implement thread-sharing elements and to add ways in the GStreamer core to make streaming threads non-blocking. All this can be implemented already, but it could be nicer.

All this “only” reduced the number of threads, and thus the threading and context switching overhead. Many other optimizations in other areas are still possible on top of this, for example optimizing receive performance and reducing the number of memory copies inside the pipeline even further. If that’s something you would be interested in, feel free to get in touch.

And with that: Read Mathieu’s upcoming blog posts about the other parts, RTP jitterbuffer / RTCP timer thread sharing, and no allocations, and the full benchmark.