Building a GStreamer plugin in Rust with meson instead of cargo

Over the Easter holidays I spent some time on a little experiment: How hard is it to build a GStreamer plugin in Rust with meson instead of using the default Rust build system cargo?

meson is a community-developed, open-source, cross-platform (including Windows), multi-language build system that is supposed to be fast and easy to use. It’s nowadays used as a build system by a lot of components of the GNOME desktop, by GStreamer, systemd, Mesa, X.org, QEMU and many other pieces of Free Software. Mostly for building C or C++ code, but also for configuring and installing Python or JavaScript code, Vala, and other languages.

Wouldn’t it be nice if we could also build software in Rust with it, build it together with existing code in other languages and have a common build system for it all? What would be the advantages and disadvantages of that, and what’s the current status of Rust support in meson? How much effort is it to make use of all the existing 100k software packages (“crates”) that are available for Rust?

Especially as most the projects mentioned before are looking into adopting Rust more or less seriously as a safer and more modern successor of C, these seem like useful questions to investigate. Anecdotally, I also heard that a maintainer of one of these projects said that being able to use the same build system as the rest of the codebase would be a requirement to even consider the language. Another project is starting to write some components in Rust and building them with meson, but without depending on any external Rust dependencies for now.

Another reason for looking into this was that there seems to be the opinion that you can’t really use any build system apart from cargo for building Rust code, and using meson would be very hard to impossible and involve a lot of disadvantages. This has lead to currently all GNOME applications written in Rust having a chimera of a build system using both meson and cargo because neither of the two does everything these applications need. Such a setup is hard to maintain, debug and probably almost nobody really understands it. cargo’s design does not make embedding into another build system easy, and both cargo and meson have very specific, and to some degree incompatible, opinions about how things have to be done. Let’s see if that’s actually necessary and what’s missing to move away from that. As Facebook is apparently using buck to build part of their Rust code, and Google bazel, this shouldn’t really be impossible.

As I’m a GStreamer developer, the maintainer of its Rust bindings and the one who started writing plugins for it in Rust, trying to do build a GStreamer plugin in Rust with meson instead of cargo seemed like the obvious choice for this experiment.

However, everything here applies in the same way to building GTK applications with its Rust bindings or similarly to any of the software mentioned before for writing components of them in Rust.

EDIT: After publishing the first version I was told that meson actually supports a couple of things that I missed before. For example, running Rust tests is already supported just fine (but more verbose in the build definition than cargo), and meson can already install executables setuid root by itself. Also there is an equivalent to cargo init, meson init, which makes starting new projects a bit more convenient. Also I wrote that cargo clippy is not really supported yet. While true, you can get around that by telling meson (or any other build system) to use clippy-driver as compiler instead of using rustc. I’ve updated the text below accordingly.

Summary

The code for my experiment can be found here, but at the point of writing it needs some changes to meson that are not merged yet. A list of those changes with some explanation can be found further below. The git repository also includes a cargo build system that gives the same build results for comparison purposes.

I should also make clear that this is only an experiment at this point and while it works fine, it is more manual work than necessary and if you depend on a lot of existing crates from crates.io then you probably want to wait a bit more before considering meson. Also more on that later. However, if you won’t have to depend on a lot of crates, your codebase is relatively self-contained and maybe even has to be built together with C code, then meson is already a viable alternative and has some advantages to offer compared to cargo. But also some disadvantages.

Almost all of the manual work I did as part of this experiment can be automated, and a big part of that is not going to be a lot of work either. I didn’t do that here to get an idea of the problems that would actually be encountered in practice when implementing such an automated system. I’ll get to that in more detail at the very end.

In summary I would say that

  • meson is less convenient and less declarative than cargo, but in exchange more powerful and flexible
  • meson’s Rust support is not very mature yet and there’s very little tooling integration
  • cargo is great and easy to use if your project falls into the exact pattern it handles but easily becomes annoying for you and your users otherwise
  • the developer experience is much better with cargo currently but the build and deployment experience is better with meson

More on each of these items below.

Procedure

As a goal I wanted to build one of the parts of the gst-plugins-rs tutorial, specifically an identity-kind of element that simply passes through its input, and build that into a GStreamer plugin that can be loaded into any GStreamer process. For that it has to be built into cdylib in Rust terms: a shared library that offers a C ABI. For this purpose, meson already has a big advantage over cargo in that it can actually install the build result in its correct place while cargo can only install executables right now. But more on that part later too.

The main task for this was translating all the Rust crate dependencies from cargo to meson, manually one by one. Overall 44 dependencies were needed, and I translated them into so-called meson wrap dependencies. The wrap dependency system of meson, similar to cargo, allows to download source code of dependencies from another location (in this case crates.io) and to extend it with a patch, in this case to add the meson-based build system for each.

In practice this meant creating a lot of meson.build files based on the Cargo.toml of each crate. The following following is the meson.build for the toml_edit crate.

project('toml_edit-rs', 'rust',
  version : '0.19.8',
  meson_version : '>= 1.0.0',
  default_options : ['buildtype=debugoptimized',
                     'rust_std=2021'
                    ]
)

rustc = meson.get_compiler('rust')

toml_datetime_dep = dependency('toml_datetime-rs', version : ['>= 0.6', '< 0.7'])
winnow_dep = dependency('winnow-rs', version : ['>= 0.4', '< 0.5'])
indexmap_dep = dependency('indexmap-rs', version : ['>= 1.9.1', '< 2.0'])

features = []
features += ['--cfg', 'feature="default"']

toml_edit = static_library('toml_edit', 'src/lib.rs',
  rust_args : features,
  rust_crate_type : 'rlib',
  dependencies : [toml_datetime_dep, winnow_dep, indexmap_dep],
  pic : true,
)

toml_edit_dep = declare_dependency(link_with : toml_edit)

and the corresponding wrap file

[wrap-file]
directory = toml_edit-0.19.8
source_url = https://crates.io/api/v1/crates/toml_edit/0.19.8/download
source_filename = toml_edit-0.19.8.tar.gz
source_hash = 239410c8609e8125456927e6707163a3b1fdb40561e4b803bc041f466ccfdc13
diff_files = toml_edit-rs.meson.patch

[provide]
toml_edit-rs = toml_edit_dep

As can be seen from the above, this could all be autogenerated from the corresponding Cargo.toml and that’s the case for a lot of crates. There are also some plans and ideas since quite a while in the meson community to actually develop such a tool, but so far this hasn’t materialized yet. Maybe my experiment can provide some motivation to actually start with that work.

For simplicity, when translating these by hand I didn’t consider including

  • any optional dependencies unless needed for my tasks
  • any tests, examples, executables as part of the crate
  • any cargo feature configuration instead of exactly what I needed

All of this can be easily done with meson though, and an automated tool for translating Cargo.toml into meson wraps should easily be able to handle that. For cargo features there are multiple ways of mapping them to meson, so some conventions have to be defined first of all. Similarly for naming Rust crates as meson dependencies it will be necessary to define some conventions to allow sharing between projects.

The hard part for translating some of the crates were the cargo build.rs build scripts. These build scripts allow running arbitrary Rust code as part of the build, which will also make automatic translation challenging. More on that later.

Once I had translated all the 44 dependencies, including the GStreamer Rust bindings, various procedural Rust macros and a lot of basic crates of the Rust ecosystem, I copied the plugin code into the new repository, changed some minor things and could then write a meson.build file for it.

project('gst-plugin-meson-test', 'rust',
  version : '0.0.1',
  meson_version : '>= 1.0.0',
  default_options : ['buildtype=debugoptimized',
                     'rust_std=2021'
                    ]
)

plugins_install_dir = '@0@/gstreamer-1.0'.format(get_option('libdir'))

rustc = meson.get_compiler('rust')

add_global_arguments(
  '-C', 'embed-bitcode=no',
  language: 'rust'
)

gst_dep = dependency('gstreamer-rs', version : ['>= 0.20', '< 0.21'])
once_cell_dep = dependency('once_cell-rs', version : ['>= 1.0', '< 2.0'])

gst_plugin_meson_test = dynamic_library('gstmesontest', 'src/lib.rs',
  rust_crate_type : 'cdylib',
  rust_dependency_map : {
    'gstreamer' : 'gst',
  },
  dependencies : [gst_dep, once_cell_dep],
  install : true,
  install_dir : plugins_install_dir,
)

To get everything building like this with the latest meson version (1.1.0), at this point some additional changes are needed: 1, 2, 3 and 4. Also currently all of this only supports native compilation. Cross-compilation of proc-macro in meson is currently broken but should be not too hard to fix in the future.

A careful reader will also notice that currently all crates with a dash (-) in their name are named like that for the dependency name, but the library they’re building has the dash replaced by an underscore in its name. This is due to a rustc bug (or undocumented behaviour?), and meson will likely require this translation of forbidden characters in crate names for the foreseeable future.

With all that in place, building the project is a matter of running

$ meson builddir
$ ninja -C builddir

compared to a single command with cargo

$ cargo build

Note that the meson build will be slower because by default meson already builds with optimizations (-C opt-level=2) while cargo does not.

Comparison between cargo and meson

In the following section I’m going to compare some aspects of cargo and meson, and give my general impression of both. To take the conclusion ahead, my perfect build system would include aspects of both cargo and meson and currently both are lacking in their own way. Like with any other tool (or programming language, for that matter): if you don’t have anything to complain about it you don’t know it well enough yet or don’t know any of the alternatives.

To avoid that people focus on the negative aspects, I’m also going to try to describe all the shortcomings of one build system as a good feature of the other. This is no competition but both meson and cargo can learn a lot from each other.

Build Times

Above I shortly mentioned build times. And because that’s something everybody is interested in and a common complaint about Rust, let’s start with that topic even if it’s in my opinion the most boring one.

Overall you would expect both build systems to behave the same if they do the same work as they are both basically just sophisticated Rust compiler process spawners. If you want to improve build times then your time is probably spent better on the Rust compiler itself and LLVM.

All times below are measured on my system very unscientifically with time. This is all only to give an idea of the general behaviour and to check if there are conceptual inefficiencies or problems. Also, when reproducing these results make sure to pass -Dbuildtype=debug to meson for comparable results between meson and cargo.

  • Running meson (build configuration): 1.4s (1.0s user, 0.4s system)
  • Running ninja (actual build): 10.4s (23.5s user, 3.2s system)
  • Running cargo: 11.0s (34.1s user, 6.6s system)

One thing that shows here immediately is that both need approximately the same time. The build alone takes slightly less than cargo, the configure and build steps together slightly more. So far what would be expected. However, cargo uses almost 45% more CPU time. I didn’t investigate this in great detail but the main two reasons for that are likely

  • cargo is building and running a build.rs build script in 23 of the 44 dependencies, which takes a while and also pulls in some more dependencies that are not needed for the meson build
  • meson is currently parallelizing less well compared to cargo when building Rust code, or otherwise the build step would likely be 10% or more faster

cargo build scripts

Interestingly, the main pain point when translating the crate build systems from cargo to meson also seems to be the main factor for cargo being more inefficient than it could be. This also seems to be a known fact in the wider Rust community by now.

But in addition to being inefficient and painful to translate (especially automatically), it is in my opinion also a maintenance nightmare and literally the worst part of cargo. It’s not declarative in what the build script is actually trying to do, it’s very easy to write build scripts that don’t work correctly in other environments and two crates doing the same things in a build script are generally going to behave differently in non-obvious ways.

For all the crates that I translated, the reasons why the build scripts existed in 23 of 44 crates should really be features that cargo should provide directly and that meson can do directly:

  • Checking which version of the Rust compiler is used and based on that enabling/disabling various features
  • Checking features or versions of the underlying platform or operating system
  • Checking for existence of native, external libraries or even building them
  • Not in these cases: code generation

Especially the last two points are painful for build and deployment of Rust code, and that every crate has its own special way of solving it in a build script doesn’t make it better. And in the end both tasks are exactly what you have a build system for: building things, tracking dependencies between them and generating an ideal build plan or schedule.

The system-deps crate is providing a way for expressing external dependencies in a declarative way as part of Cargo.toml, and a similar system built into cargo and integrated with the platform’s mechanism for finding libraries would be a huge improvement. Similar approaches for the other aspects would also be helpful. And not just for making a different build system for crates, but also for developers using cargo.

I’m sure that once cargo gained features for handling the four items above there will still be a need for custom build scripts in some situations, but these four items should cover more than 95% of the currently existing build scripts. It’s a mystery to me why there are no apparent efforts being made to improve cargo in this regard.

Good Parts of meson

Now let’s focus on some of the good parts of meson in comparison to cargo.

More flexible and expressive

Generally, meson has a much more flexible and expressive build definition language. It looks more like a scripting language than the toml files used by cargo, and as such appears more complex.

However thanks to this approach, almost anything for which cargo requires custom build scripts, with the disadvantages listed above, can be written directly as part of the meson build definitions. Expressing many of these things in toml for cargo would likely become convoluted and not very straightforward, as can already be seen nowadays with e.g. platform-specific dependencies.

meson provides built-in modules with support for e.g.

  • finding and checking versions of compilers of different languages and other build tools, e.g. for code generation, including the Rust bindgen tool
  • testing compilation of code snippets
  • finding external library dependencies and checking their versions
  • defining shared/static library, excutable and code generation build targets together with how (and if) they should be installed later
  • installing (and generating) data files, including specific support for e.g. library metadata (pkg-config), documentation, gobject-introspection, …

As an escape hatch, whenever something can’t be expressed by just meson, it is also possible to run external configuration/build/install scripts that could be written as a Python/shell script, C or Rust executable, or anything else really. This is rarely needed though and the meson project seems to add support for anything that people actually need in practice.

Build configuration system

meson provides an extensible configuration mechanism for the build, which allow the user building the software to customize the build process and its results.

  1. The built-in options allow for configuring things like switching between debug and release builds, defining toolchains and defining where to install build results.

  2. The build options allow each project to extend the above with custom configuration, e.g. for enabling/disabling optional features of the project or generally selecting between different configurations. Apart from simple boolean flags this also allows for other types of configuration, including integers and strings.

cargo only provides one of the two in the form of fixed, non-extensible compiler profiles and configuration.

The second is not to be confused with cargo’s feature flags, which are more a mechanism for selecting configuration of dependencies by the developer of the software. The meson build configuration however is for the builder of the software to select different configurations. While for executables cargo feature flags are used in a similar way for boolean configurations, cargo does not provide anything for this in general.

As a workaround for this, a couple of Rust crates are using environment variables together with the env! macro for build configuration but this is fragile and not discoverable, and more of a hack than a real solution.

Support for Rust and non-Rust code in the same project

meson supports mixing Rust and non-Rust code in the same project, and allows tracking dependencies between targets using different languages in the same way.

While it is not supported to mix Rust and e.g. C code in the same build target due to Rust’s compilation model, it is possible to e.g. build a static Rust library, a static C library and link both together into e.g. a D application or Python module. An example for this would be this Python module that combines C, C++, Rust and Fortran code.

Code generation can be handled in a similar way as in the end code generation is just another transformation from one format into another.

cargo doesn’t directly support anything but Rust code. As usual, build scripts provide a mechanism to get around this limitation. The cc crate for example is widely used to build C code and there are also crates for building meson or cmake based software as part of the build process.

All of this is completely opaque to cargo though and can’t be taken into account for defining an optimal build schedule, can’t be configured from the outside and regularly fails in non-standard build situations (e.g. cross-compilation).

Installation of other files than executables

meson allows every build result to be installed in a configurable location. This is especially useful for more complex applications that might have to provide various data files or come as an executable plus multiple libraries or plugins, or simply for projects that only provide a shared library. If any of the built-in installation mechanisms are not sufficient (e.g. the executable should be get specific process capabilities set via setcap), meson also allows to customize the install process via scripts.

cargo only allows to install executables right now. There are cargo extensions that also allow for more complex tasks, e.g. cargo xtask but there is no standard mechanism. There once was an RFC to make cargo’s installation process extensible, but the way how this was proposed would also suffer from the same problems as the cargo build scripts.

External dependency and library support

In addition to mixing build targets with multiple languages in the same project, meson also has a mechanism to find external dependencies in different ways. If an external dependency is not found, it can be provided and built as part of the project via the wrap mechanism mentioned before. The latter is similar to how cargo handles dependencies, but the former is missing completely and currently implemented via build scripts instead, e.g. by using the pkg-config crate.

As Rust does not currently provide a stable ABI and provides no standard mechanism to locate library crates on the system, this mostly applies to library dependencies written in other languages. meson does support building Rust shared/static libraries and installing them too, but because of the lack of a stable ABI this has to be made use of very carefully.

On the other hand, Rust allows building shared/static libraries that provide the stable C ABI of the platform (cdylib, staticlib crate types). meson allows building these correctly too, and also offers mechanisms for installing them together with their (potentially autogenerated header files) and locating them again later in other projects via e.g. pkg-config.

For cargo this job can be taken care of by cargo-c, including actually building shared libraries correctly by setting e.g. the soname correctly and setting other kinds of versioning information.

Good Parts of cargo

After writing so much about meson and how great it is, let’s now look at some aspects of cargo that are better than what meson provides. Like I said before, both have their good sides.

Simpler and more declarative build definitions

The cargo manifest format is clearly a lot simpler and more declarative than the meson build definition format.

For a simple project it looks more like a project description than something written in a scripting language.

[package]
name = "hello_world"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0"

As long as a project stays in the boundaries of what cargo makes easy to express, which should be the case for the majority of existing Rust projects, it is going to be simpler than meson. The lack of various features in cargo that require the use of a build script prevent this currently for many crates, but that seems like something that could be easily improved.

meson on the other hand feels more like writing actual build scripts in some kind of scripting language, and information like “what dependencies does this have” are not as easily visible as from something like a Cargo.toml.

Tooling integration

cargo provides a lot development tools that make development with it a very convenient and smooth experience. There are also dozens of cargo extension commands that provide additional features on top of cargo.

cargo init creates a new project, cargo add adds new dependencies, cargo check type-checks code without full compilation, cargo clippy runs a powerful linter, cargo doc builds the documentation (incl. dependencies), cargo bench and cargo test allow running tests and benchmarks, cargo show-asm shows the generated, annotated assembly for the code, cargo udeps finds unused dependencies, …

All of this makes development of Rust project a smooth and well-integrated experience.

In addition rust-analyzer provides a lot of IDE features via the LSP protocol for various editors and IDEs.

Right now, IDEs and editors are assuming Rust projects to make use of cargo and offer integration with its features.

On the other hand, when using meson almost none of this is currently provided and development feels less well integrated. Right now the only features provided by meson for making Rust development easier is generation of a rust-project.json to make use of rust-analyzer and being able to run tests in a similar way to cargo, and of course actually building the code. Building of documentation could be easily added to meson and is supported for other languages, something like cargo add for wrap dependencies exists already and adding crates.io support to it would be possible, but it’s going to take a while and a bit of effort to handle crates with cargo build scripts. Making use of all the cargo extension commands without actually using cargo seems unrealistic.

In the end, cargo is the default build system for Rust and everything currently assumes usage of cargo so using cargo offers the best developer experience.

Rust dependencies

As shortly mentioned above, cargo add makes it extremely easy to add new Rust dependencies to a project and build them as part of the project. This helps a lot with code reuse and modularity. That an average Rust project has dozens of direct dependencies and maybe a hundred or more indirect dependencies shows this quite clearly, as does the encapsulation of very small tasks in separate dependencies compared to huge multi-purpose libraries that are common with other languages.

cargo also directly handles updating of dependencies via cargo update, including making sure that only semver compatible versions are automatically updated and allows including multiple incompatible versions of dependencies in the same build if necessary.

In addition to just adding dependencies, cargo features allow for conditional compilation and for defining which parts and features of a dependency should be enabled or not.

meson has some explicit handling of dependencies and the wrap system also allows building external dependencies as part of the project, but adding or updating dependencies is more manual process unless they are in the meson wrapdb. For now it is completely manual with regards to Rust crates. It is no different from adding dependencies in non-Rust languages though.

There is also no direct equivalent of the cargo feature flags, which potentially seems like a useful addition to meson.

Next steps

Considering all of the above, there is not really a simple answer to which of the two choices is the best for your project. As of now I would personally always use cargo for Rust projects if I can, especially if they will have other Rust dependencies. It’s a lot more convenient to develop with cargo.

However various features of meson might make it a better choice for some projects, maybe already now or at least in the future. For example the ability to handle multiple languages, to handle dependencies and shared libraries correctly or the ability to install data files together with the application. Or simply because the remainder of the project already uses meson for compiling e.g. C code, meson might be the more natural choice for adding Rust code to the mix.

As outlined above there are many areas of meson that could be improved and where Rust support is not very mature yet, and to make meson a more viable alternative to cargo these will have to happen sooner or later. Similarly, there are various areas where cargo also could be improved and learn from meson, and where improvements to cargo, in addition to making cargo more flexible and easier to use, would also make it easier to handle other build systems.

From a meson point of view, I would consider the following the next steps.

Various bugs and missing features

rustc and cargo

On the Rust compiler side, there are currently two minor issues that would need looking into.

  • rustc#80792: Adding support for passing environment variables via the command-line instead of using actual environment variables. Environment variables are currently used for build configuration by many crates. Allowing to provide them via the command-line would allow for more clean and reliable build rules without accidentally leaking actual environment variables into the build.
  • rustc#110460: Undocumented and maybe unintentional library filename requirement based on the crate name. This currently requires meson to disallow dashes in crate names and to have them explicitly replaced by underscore, which is something cargo does implicitly. Implicit conversion inside meson would also be feasible but probably not desireable because there would then be a mismatch between the name of the build target in the build definition and the actual name of the build target when building it. Similar to the mild confusion that some people ran into when noticing that a crate with a dash in the name can only be referenced with underscores from Rust code.

In addition it would be useful to look into moving various features from build scripts into proper cargo, as outlined above. This will probably need a lot of design and discussion effort, and will likely also take years after implementation until the important crates are moved to it due to very conservative Rust toolchain version requirement policies in various of these crates.

So on the cargo side that’s more something for the long run but also something that would greatly benefit users of cargo.

meson

On the meson side there are a couple of bugs of different severity that will have to be solved, and probably more that will show up once more people are starting to use meson to build Rust projects.

In addition to the ones I mentioned above and that are merged already into the meson git repository, at the time of this writing there were for example the following outstanding issues

  • meson#11681: Add a feature to meson to allow renaming crates when using as a dependency. This is used throughout the Rust ecosystem for handling multiple versions of the same crate at once, or also simply for having a more convenient local name of a dependency.
  • meson#11695: Parallelize the Rust build better by already starting to build the next Rust targets when the metadata of the dependencies is available. This should bring build time improvements of 10% or more on machines with enough cores, and the lack of it is the main reason why the meson build in my experiment was “only” as fast as the cargo build and not faster.
  • meson#11702: Cross-compilation of proc-macro crates is currently using the wrong toolchain and simply doesn’t work.
  • meson#11694: Indirect dependencies of all dependencies are passed onwards on the later compiler invocations, which brings the risk of unnecessary name conflicts and simply causes more work for the compiler than necessary.
  • meson#10030: Add support for passing environment variables to the Rust compiler. As mentioned above, many crates are currently using this for build configuration so this would have to be supported by meson in one way or another.

Apart from the second one in this list these should all be doable relatively fast and generally getting fixes and improvements required for Rust merged into meson was a fast and pleasant experience so far. I didn’t encounter any unnecessary bikeshedding or stop energy.

Tooling for managing cargo dependencies

During my experiment I wrote all the meson wrap files manually. This does not really scale, is inconvenient, and also makes it harder to update dependencies later.

The goal here would be to provide a tool in the shape of cargo add and cargo update that allows to automatically add cargo-based Rust dependencies to a meson project. This is something that was discussed a lot in the past and various people in the meson community have ideas and plans around this. meson already has something similar for the wrapdb, meson wrap add and meson wrap update, but the idea would be to have something similar (or integration into that) to directly support crates from crates.io so Rust dependencies can be added with as little effort to a meson project as they can currently to a cargo project.

Apart from the cargo build scripts this shouldn’t be a lot of effort and a project for an afternoon at most, so maybe I’ll give that a try one of these days if nobody beats me to it.

As part of such a tool, it will also be necessary to define conventions about naming, mapping of cargo features, versioning, etc. of Rust crates inside meson and this should ideally be done from early on to avoid unnecessary churn. The way how I did it as part of my experiment has various drawbacks with regards to versioning and needs improvements.

Handling cargo build scripts is a bigger issue though. As my experiment showed, about half of the crates had build scripts. While all of them were more or less trivial, automatically translating this into meson build definitions seems unrealistic to me.

It might be possible to have meson use the cargo build scripts directly in one way or another, or they would have to be translated manually to meson build definitions. The latter would considerably improve build times so seems like a better approach for common crates at least. And for those the meson build definitions could be stored in a central place, like the meson wrapdb or maybe even be included in the crates on crates.io if their maintainers feel like dealing with two build systems.

Together with all this, some thought will also have to be put into how to locate such Rust dependencies, similar to how pkg-config allows to locate shared libraries. For example, Linux distributions will want to package such dependencies and make sure that a project built for such a distribution is making use of the packaged dependencies instead of using any other version, or worse downloading some version from the Internet at build time. The way how this is currently handled by cargo is also not optimal for Linux distributions and a couple of other build and deployment scenarios.

Because of the lack of a stable Rust ABI this would mean locating Rust source code.

Tooling integration

And last, as mentioned above there is basically no tooling integration right now apart from being able to build Rust code and using rust-analyzer. meson should at least support the most basic tasks that cargo supports, and that meson already supports for other languages: running tests and benchmarks, running linters and building documentation.

Once those basic tasks are done, it might be worth investigating other tooling integration like the various cargo extension commands offer or extending those commands to handle other build systems, e.g. via the rust-project.json that rust-analyzer uses for that purpose.

Instantaneous RTP synchronization & retrieval of absolute sender clock times with GStreamer

Over the last few weeks, GStreamer’s RTP stack got a couple of new and quite useful features. As it is difficult to configure, mostly because there being so many different possible configurations, I decided to write about this a bit with some example code.

The features are RFC 6051-style rapid synchronization of RTP streams, which can be used for inter-stream (e.g. audio/video) synchronization as well as inter-device (i.e. network) synchronization, and the ability to easily retrieve absolute sender clock times per packet on the receiver side.

Note that each of this was already possible before with GStreamer via different mechanisms with different trade-offs. Obviously, not being able to have working audio/video synchronization would be simply not acceptable and I previously talked about how to do inter-device synchronization with GStreamer before, for example at the GStreamer Conference 2015 in Düsseldorf.

The example code below will make use of the GStreamer RTSP Server library but can be applied to any kind of RTP workflow, including WebRTC, and are written in Rust but the same can also be achieved in any other language. The full code can be found in this repository.

And for reference, the merge requests to enable all this are [1], [2] and [3]. You probably don’t want to backport those to an older version of GStreamer though as there are dependencies on various other changes elsewhere. All of the following needs at least GStreamer from the git main branch as of today, or the upcoming 1.22 release.

Baseline Sender / Receiver Code

The starting point of the example code can be found here in the baseline branch. All the important steps are commented so it should be relatively self-explanatory.

Sender

The sender is starting an RTSP server on the local machine on port 8554 and provides a media with H264 video and Opus audio on the mount point /test. It can be started with

$ cargo run -p rtp-rapid-sync-example-send

After starting the server it can be accessed via GStreamer with e.g. gst-play-1.0 rtsp://127.0.0.1:8554/test or similarly via VLC or any other software that supports RTSP.

This does not do anything special yet but lays the foundation for the following steps. It creates an RTSP server instance with a custom RTSP media factory, which in turn creates custom RTSP media instances. All this is not needed at this point yet but will allow for the necessary customization later.

One important aspect here is that the base time of the media’s pipeline is set to zero

pipeline.set_base_time(gst::ClockTime::ZERO);
pipeline.set_start_time(gst::ClockTime::NONE);

This allows the timeoverlay element that is placed in the video part of the pipeline to render the clock time over the video frames. We’re going to use this later to confirm on the receiver that the clock time on the sender and the one retrieved on the receiver are the same.

let video_overlay = gst::ElementFactory::make("timeoverlay", None)
    .context("Creating timeoverlay")?;
[...]
video_overlay.set_property_from_str("time-mode", "running-time");

It actually only supports rendering the running time of each buffer, but in a live pipeline with the base time set to zero the running time and pipeline clock time are the same. See the documentation for some more details about the time concepts in GStreamer.

Overall this creates the following RTSP stream producer bin, which will be used also in all the following steps:

Receiver

The receiver is a simple playbin pipeline that plays an RTSP URI given via command-line parameters and runs until the stream is finished or an error has happened.

It can be run with the following once the sender is started

$ cargo run -p rtp-rapid-sync-example-recv -- "rtsp://192.168.1.101:8554/test"

Please don’t forget to replace the IP with the IP of the machine that is actually running the server.

All the code should be familiar to anyone who ever wrote a GStreamer application in Rust, except for one part that might need a bit more explanation

pipeline.connect_closure(
    "source-setup",
    false,
    glib::closure!(|_playbin: &gst::Pipeline, source: &gst::Element| {
        source.set_property("latency", 40u32);
    }),
);

playbin is going to create an rtspsrc, and at that point it will emit the source-setup signal so that the application can do any additional configuration of the source element. Here we’re connecting a signal handler to that signal to do exactly that.

By default rtspsrc introduces a latency of 2 seconds of latency, which is a lot more than what is usually needed. For live, non-VOD RTSP streams this value should be around the network jitter and here we’re configuring that to 40 milliseconds.

Retrieval of absolute sender clock times

Now as the first step we’re going to retrieve the absolute sender clock times for each video frame on the receiver. They will be rendered by the receiver at the bottom of each video frame and will also be printed to stdout. The changes between the previous version of the code and this version can be seen here and the final code here in the sender-clock-time-retrieval branch.

When running the sender and receiver as before, the video from the receiver should look similar to the following

The upper time that is rendered on the video frames is rendered by the sender, the bottom time is rendered by the receiver and both should always be the same unless something is broken here. Both times are the pipeline clock time when the sender created/captured the video frame.

In this configuration the absolute clock times of the sender are provided to the receiver via the NTP / RTP timestamp mapping provided by the RTCP Sender Reports. That’s also the reason why it takes about 5s for the receiver to know the sender’s clock time as RTCP packets are not scheduled very often and only after about 5s by default. The RTCP interval can be configured on rtpbin together with many other things.

Sender

On the sender-side the configuration changes are rather small and not even absolutely necessary.

rtpbin.set_property_from_str("ntp-time-source", "clock-time");

By default the RTP NTP time used in the RTCP packets is based on the local machine’s walltime clock converted to the NTP epoch. While this works fine, this is not the clock that is used for synchronizing the media and as such there will be drift between the RTP timestamps of the media and the NTP time from the RTCP packets, which will be reset every time the receiver receives a new RTCP Sender Report from the sender.

Instead, we configure rtpbin here to use the pipeline clock as the source for the NTP timestamps used in the RTCP Sender Reports. This doesn’t give us (by default at least, see later) an actual NTP timestamp but it doesn’t have the drift problem mentioned before. Without further configuration, in this pipeline the used clock is the monotonic system clock.

rtpbin.set_property("rtcp-sync-send-time", false);

rtpbin normally uses the time when a packet is sent out for the NTP / RTP timestamp mapping in the RTCP Sender Reports. This is changed with this property to instead use the time when the video frame / audio sample was captured, i.e. it does not include all the latency introduced by encoding and other processing in the sender pipeline.

This doesn’t make any big difference in this scenario but usually one would be interested in the capture clock times and not the send clock times.

Receiver

On the receiver-side there are a few more changes. First of all we have to opt-in to rtpjitterbuffer putting a reference timestamp metadata on every received packet with the sender’s absolute clock time.

pipeline.connect_closure(
    "source-setup",
    false,
    glib::closure!(|_playbin: &gst::Pipeline, source: &gst::Element| {
        source.set_property("latency", 40u32);
        source.set_property("add-reference-timestamp-meta", true);
    }),
);

rtpjitterbuffer will start putting the metadata on packets once it knows the NTP / RTP timestamp mapping, i.e. after the first RTCP Sender Report is received in this case. Between the Sender Reports it is going to interpolate the clock times. The normal timestamps (PTS) on each packet are not affected by this and are still based on whatever clock is used locally by the receiver for synchronization.

To actually make use of the reference timestamp metadata we add a timeoverlay element as video-filter on the receiver:

let timeoverlay =
    gst::ElementFactory::make("timeoverlay", None).context("Creating timeoverlay")?;

timeoverlay.set_property_from_str("time-mode", "reference-timestamp");
timeoverlay.set_property_from_str("valignment", "bottom");

pipeline.set_property("video-filter", &timeoverlay);

This will then render the sender’s absolute clock times at the bottom of each video frame, as seen in the screenshot above.

And last we also add a pad probe on the sink pad of the timeoverlay element to retrieve the reference timestamp metadata of each video frame and then printing the sender’s clock time to stdout:

let sinkpad = timeoverlay
    .static_pad("video_sink")
    .expect("Failed to get timeoverlay sinkpad");
sinkpad
    .add_probe(gst::PadProbeType::BUFFER, |_pad, info| {
        if let Some(gst::PadProbeData::Buffer(ref buffer)) = info.data {
            if let Some(meta) = buffer.meta::<gst::ReferenceTimestampMeta>() {
                println!("Have sender clock time {}", meta.timestamp());
            } else {
                println!("Have no sender clock time");
            }
        }

        gst::PadProbeReturn::Ok
    })
    .expect("Failed to add pad probe");

Rapid synchronization via RTP header extensions

The main problem with the previous code is that the sender’s clock times are only known once the first RTCP Sender Report is received by the receiver. There are many ways to configure rtpbin to make this happen faster (e.g. by reducing the RTCP interval or by switching to the AVPF RTP profile) but in any case the information would be transmitted outside the actual media data flow and it can’t be guaranteed that it is actually known on the receiver from the very first received packet onwards. This is of course not a problem in every use-case, but for the cases where it is there is a solution for this problem.

RFC 6051 defines an RTP header extension that allows to transmit the NTP timestamp that corresponds an RTP packet directly together with this very packet. And that’s what the next changes to the code are making use of.

The changes between the previous version of the code and this version can be seen here and the final code here in the rapid-synchronization branch.

Sender

To add the header extension on the sender-side it is only necessary to add an instance of the corresponding header extension implementation to the payloaders.

let hdr_ext = gst_rtp::RTPHeaderExtension::create_from_uri(
    "urn:ietf:params:rtp-hdrext:ntp-64",
    )
    .context("Creating NTP 64-bit RTP header extension")?;
hdr_ext.set_id(1);
video_pay.emit_by_name::<()>("add-extension", &[&hdr_ext]);

This first instantiates the header extension based on the uniquely defined URI for it, then sets its ID to 1 (see RFC 5285) and then adds it to the video payloader. The same is then done for the audio payloader.

By default this will add the header extension to every RTP packet that has a different RTP timestamp than the previous one. In other words: on the first packet that corresponds to an audio or video frame. Via properties on the header extension this can be configured but generally the default should be sufficient.

Receiver

On the receiver-side no changes would actually be necessary. The use of the header extension is signaled via the SDP (see RFC 5285) and it will be automatically made use of inside rtpbin as another source of NTP / RTP timestamp mappings in addition to the RTCP Sender Reports.

However, we configure one additional property on rtpbin

source.connect_closure(
    "new-manager",
    false,
    glib::closure!(|_rtspsrc: &gst::Element, rtpbin: &gst::Element| {
        rtpbin.set_property("min-ts-offset", gst::ClockTime::from_mseconds(1));
    }),
);

Inter-stream audio/video synchronization

The reason for configuring the min-ts-offset property on the rtpbin is that the NTP / RTP timestamp mapping is not only used for providing the reference timestamp metadata but it is also used for inter-stream synchronization by default. That is, for getting correct audio / video synchronization.

With RTP alone there is no mechanism to synchronize multiple streams against each other as the packet’s RTP timestamps of different streams have no correlation to each other. This is not too much of a problem as usually the packets for audio and video are received approximately at the same time but there’s still some inaccuracy in there.

One approach to fix this is to use the NTP / RTP timestamp mapping for each stream, either from the RTCP Sender Reports or from the RTP header extension, and that’s what is made use of here. And because the mapping is provided very often via the RTP header extension but the RTP timestamps are only accurate up to clock rate (1/90000s for video and 1/48000s) for audio in this case, we configure a threshold of 1ms for adjusting the inter-stream synchronization. Without this it would be adjusted almost continuously by a very small amount back and forth.

Other approaches for inter-stream synchronization are provided by RTSP itself before streaming starts (via the RTP-Info header), but due to a bug this is currently not made use of by GStreamer.

Yet another approach would be via the clock information provided by RFC 7273, about which I already wrote previously and which is also supported by GStreamer. This also allows inter-device, network synchronization and used for that purpose as part of e.g. AES67, Ravenna, SMPTE 2022 / 2110 and many other protocols.

Inter-device network synchronization

Now for the last part, we’re going to add actual inter-device synchronization to this example. The changes between the previous version of the code and this version can be seen here and the final code here in the network-sync branch. This does not use the clock information provided via RFC 7273 (which would be another option) but uses the same NTP / RTP timestamp mapping that was discussed above.

When starting the receiver multiple times on different (or the same) machines, each of them should play back the media synchronized to each other and exactly 2 seconds after the corresponding audio / video frames are produced on the sender.

For this, both sender and all receivers are using an NTP clock (pool.ntp.org in this case) instead of the local monotonic system clock for media synchronization (i.e. as the pipeline clock). Instead of an NTP clock it would also be possible to any other mechanism for network clock synchronization, e.g. PTP or the GStreamer netclock.

println!("Syncing to NTP clock");
clock
    .wait_for_sync(gst::ClockTime::from_seconds(5))
    .context("Syncing NTP clock")?;
println!("Synced to NTP clock");

This code instantiates a GStreamer NTP clock and then synchronously waits up to 5 seconds for it to synchronize. If that fails then the application simply exits with an error.

Sender

On the sender side all that is needed is to configure the RTSP media factory, and as such the pipeline used inside it, to use the NTP clock

factory.set_clock(Some(&clock));

This causes all media inside the sender’s pipeline to be synchronized according to this NTP clock and to also use it for the NTP timestamps in the RTCP Sender Reports and the RTP header extension.

Receiver

On the receiver side the same has to happen

pipeline.use_clock(Some(&clock));

In addition a couple more settings have to be configured on the receiver though. First of all we configure a static latency of 2 seconds on the receiver’s pipeline.

pipeline.set_latency(gst::ClockTime::from_seconds(2));

This is necessary as GStreamer can’t know the latency of every receiver (e.g. different decoders might be used), and also because the sender latency can’t be automatically known. Each audio / video frame will be timestamped on the receiver with the NTP timestamp when it was captured / created, but since then all the latency of the sender, the network and the receiver pipeline has passed and for this some compensation must happen.

Which value to use here depends a lot on the overall setup, but 2 seconds is a (very) safe guess in this case. The value only has to be larger than the sum of sender, network and receiver latency and in the end has the effect that the receiver is showing the media exactly that much later than the sender has produced it.

And last we also have to tell rtpbin that

  1. sender and receiver clock are synchronized to each other, i.e. in this case both are using exactly the same NTP clock, and that no translation to the pipeline’s clock is necessary, and
  2. that the outgoing timestamps on the receiver should be exactly the sender timestamps and that this conversion should happen based on the NTP / RTP timestamp mapping
source.set_property_from_str("buffer-mode", "synced");
source.set_property("ntp-sync", true);

And that’s it.

A careful reader will also have noticed that all of the above would also work without the RTP header extension, but then the receivers would only be synchronized once the first RTCP Sender Report is received. That’s what the test-netclock.c / test-netclock-client.c example from the GStreamer RTSP server is doing.

As usual with RTP, the above is by far not the only way of doing this and GStreamer also supports various other synchronization mechanisms. Which one is the correct one for a specific use-case depends on a lot of factors.

GStreamer Rust Bindings & Plugins New Releases

It has been quite a while since the last status update for the GStreamer Rust bindings and the GStreamer Rust plugins, so the new releases last week make for a good opportunity to do so now.

Bindings

I won’t write too much about the bindings this time. The latest version as of now is 0.16.1, which means that since I started working on the bindings there were 8 major releases. In that same time there were 45 contributors working on the bindings, which seems quite a lot and really makes me happy.

Just as before, I don’t think any major APIs are missing from the bindings anymore, even for implementing subclasses of the various GStreamer types. The wide usage of the bindings in Free Software projects and commercial products also shows both the interest in writing GStreamer applications and plugins in Rust as well as that the bindings are complete enough and production-ready.

Most of the changes since the last status update involve API cleanups, usability improvements, various bugfixes and addition of minor API that was not included before. The details of all changes can be read in the changelog.

The bindings work with any GStreamer version since 1.8 (released more than 4 years ago), support APIs up to GStreamer 1.18 (to be released soon) and work with Rust 1.40 or newer.

Plugins

The biggest progress probably happened with the GStreamer Rust plugins.

There also was a new release last week, 0.6.0, which was the first release where selected plugins were also uploaded to the Rust package (“crate”) database crates.io. This makes it easy for Rust applications to embed any of these plugins statically instead of depending on them to be available on the system.

Overall there are now 40 GStreamer elements in 18 plugins by 28 contributors available as part of the gst-plugins-rs repository, one tutorial plugin with 4 elements and various plugins in external locations.

These 40 GStreamer elements are the following:

Audio
  • rsaudioecho: Port of the audioecho element from gst-plugins-good
  • rsaudioloudnorm: Live audio loudness normalization element based on the FFmpeg af_loudnorm filter
  • claxondec: FLAC lossless audio codec decoder element based on the pure-Rust claxon implementation
  • csoundfilter: Audio filter that can use any filter defined via the Csound audio programming language
  • lewtondec: Vorbis audio decoder element based on the pure-Rust lewton implementation
Video
  • cdgdec/cdgparse: Decoder and parser for the CD+G video codec based on a pure-Rust CD+G implementation, used for example by karaoke CDs
  • cea608overlay: CEA-608 Closed Captions overlay element
  • cea608tott: CEA-608 Closed Captions to timed-text (e.g. VTT or SRT subtitles) converter
  • tttocea608: CEA-608 Closed Captions from timed-text converter
  • mccenc/mccparse: MacCaption Closed Caption format encoder and parser
  • sccenc/sccparse: Scenarist Closed Caption format encoder and parser
  • dav1dec: AV1 video decoder based on the dav1d decoder implementation by the VLC project
  • rav1enc: AV1 video encoder based on the fast and pure-Rust rav1e encoder implementation
  • rsflvdemux: Alternative to the flvdemux FLV demuxer element from gst-plugins-good, not feature-equivalent yet
  • rsgifenc/rspngenc: GIF/PNG encoder elements based on the pure-Rust implementations by the image-rs project
Text
  • textwrap: Element for line-wrapping timed text (e.g. subtitles) for better screen-fitting, including hyphenation support for some languages
Network
  • reqwesthttpsrc: HTTP(S) source element based on the Rust reqwest/hyper HTTP implementations and almost feature-equivalent with the main GStreamer HTTP source souphttpsrc
  • s3src/s3sink: Source/sink element for the Amazon S3 cloud storage
  • awstranscriber: Live audio to timed text transcription element using the Amazon AWS Transcribe API
Generic
  • sodiumencrypter/sodiumdecrypter: Encryption/decryption element based on libsodium/NaCl
  • togglerecord: Recording element that allows to pause/resume recordings easily and considers keyframe boundaries
  • fallbackswitch/fallbacksrc: Elements for handling potentially failing (network) sources, restarting them on errors/timeout and showing a fallback stream instead
  • threadshare: Set of elements that provide alternatives for various existing GStreamer elements but allow to share the streaming threads between each other to reduce the number of threads
  • rsfilesrc/rsfilesink: File source/sink elements as replacements for the existing filesrc/filesink elements

The GTK Rust bindings are not ready yet? Yes they are!

When talking to various people at conferences in the last year or at conferences, a recurring topic was that they believed that the GTK Rust bindings are not ready for use yet.

I don’t know where that perception comes from but if it was true, there wouldn’t have been applications like Fractal, Podcasts or Shortwave using GTK from Rust, or I wouldn’t be able to do a workshop about desktop application development in Rust with GTK and GStreamer at the Linux Application Summit in Barcelona this Friday (code can be found here already) or earlier this year at GUADEC.

One reason I sometimes hear is that there is not support for creating subclasses of GTK types in Rust yet. While that was true, it is not true anymore nowadays. But even more important: unless you want to create your own special widgets, you don’t need that. Many examples and tutorials in other languages make use of inheritance/subclassing for the applications’ architecture, but that’s because it is the idiomatic pattern in those languages. However, in Rust other patterns are more idiomatic and even for those examples and tutorials in other languages it wouldn’t be the one and only option to design applications.

Almost everything is included in the bindings at this point, so seriously consider writing your next GTK UI application in Rust. While some minor features are still missing from the bindings, none of those should prevent you from successfully writing your application.

And if something is actually missing for your use-case or something is not working as expected, please let us know. We’d be happy to make your life easier!

P.S.

Some people are already experimenting with new UI development patterns on top of the GTK Rust bindings. So if you want to try developing an UI application but want to try something different than the usual signal/callback spaghetti code, also take a look at those.

MPSC Channel API for painless usage of threads with GTK in Rust

A very common question that comes up on IRC or elsewhere by people trying to use the gtk-rs GTK bindings in Rust is how to modify UI state, or more specifically GTK widgets, from another thread.

Due to GTK only allowing access to its UI state from the main thread and Rust actually enforcing this, unlike other languages, this is less trivial than one might expect. To make this as painless as possible, while also encouraging a more robust threading architecture based on message-passing instead of shared state, I’ve added some new API to the glib-rs bindings: An MPSC (multi-producer/single-consumer) channel very similar to (and based on) the one in the standard library but integrated with the GLib/GTK main loop.

While I’ll mostly write about this in the context of GTK here, this can also be useful in other cases when working with a GLib main loop/context from Rust to have a more structured means of communication between different threads than shared mutable state.

This will be part of the next release and you can find some example code making use of this at the very end. But first I’ll take this opportunity to also explain why it’s not so trivial in Rust first and also explain another solution.

Table of Contents

  1. The Problem
  2. One Solution: Safely working around the type system
  3. A better solution: Message passing via channels

The Problem

Let’s consider the example of an application that has to perform a complicated operation and would like to do this from another thread (as it should to not block the UI!) and in the end report back the result to the user. For demonstration purposes let’s take a thread that simply sleeps for a while and then wants to update a label in the UI with a new value.

Naively we might start with code like the following

let label = gtk::Label::new("not finished");
[...]
// Clone the label so we can also have it available in our thread.
// Note that this behaves like an Rc and only increases the
// reference count.
let label_clone = label.clone();
thread::spawn(move || {
    // Let's sleep for 10s
    thread::sleep(time::Duration::from_secs(10));

    label_clone.set_text("finished");
});

This does not compile and the compiler tells us (between a wall of text containing all the details) that the label simply can’t be sent safely between threads. Which is absolutely correct.

error[E0277]: <code>{{EJS73}}</code> cannot be sent between threads safely
  --> src/main.rs:28:5
   |
28 |     thread::spawn(move || {
   |     ^^^^^^^^^^^^^ <code>{{EJS74}}</code> cannot be sent between threads safely
   |
   = help: within <code>{{EJS75}}</code>, the trait <code>{{EJS76}}</code> is not implemented for <code>{{EJS77}}</code>
   = note: required because it appears within the type <code>{{EJS78}}</code>
   = note: required because it appears within the type <code>{{EJS79}}</code>
   = note: required because it appears within the type <code>{{EJS80}}</code>
   = note: required because it appears within the type <code>{{EJS81}}</code>
   = note: required by <code>{{EJS82}}</code>

In, e.g. C, this would not be a problem at all, the compiler does not know about GTK widgets and generally all GTK API to be only safely usable from the main thread, and would happily compile the above. It would the our (the programmer’s) job then to ensure that nothing is ever done with the widget from the other thread, other than passing it around. Among other things, it must also not be destroyed from that other thread (i.e. it must never have the last reference to it and then drop it).

One Solution: Safely working around the type system

So why don’t we do the same as we would do in C and simply pass around raw pointers to the label and do all the memory management ourselves? Well, that would defeat one of the purposes of using Rust and would require quite some unsafe code.

We can do better than that and work around Rust’s type system with regards to thread-safety and instead let the relevant checks (are we only ever using the label from the main thread?) be done at runtime instead. This allows for completely safe code, it might just panic at any time if we accidentally try to do something from wrong thread (like calling a function on it, or dropping it) and not just pass the label around.

The fragile crate provides a type called Fragile for exactly this purpose. It’s a wrapper type like Box, RefCell, Rc, etc. but it allows for any contained type to be safely sent between threads and on access does runtime checks if this is done correctly. In our example this would look like this

let label = gtk::Label::new("not finished");
[...]
// We wrap the label clone in the Fragile type here
// and move that into the new thread instead.
let label_clone = fragile::Fragile::new(label.clone());
thread::spawn(move || {
    // Let's sleep for 10s
    thread::sleep(time::Duration::from_secs(10));

    // To access the contained value, get() has
    // to be called and this is where the runtime
    // checks are happening
    label_clone.get().set_text("finished");
});

Not many changes to the code and it compiles… but at runtime we of course get a panic because we’re accessing the label from the wrong thread

thread '<unnamed>' panicked at 'trying to access wrapped value in fragile container from incorrect thread.', ~/.cargo/registry/src/github.com-1ecc6299db9ec823/fragile-0.3.0/src/fragile.rs:57:13

What we instead need to do here is to somehow defer the change of the label to the main thread, and GLib provides various API for doing exactly that. We’ll make use of the first one here but it’s mostly a matter of taste (and trait bounds: the former takes a FnOnce closure while the latter can be called multiple times and only takes FnMut because of that).

let label = gtk::Label::new("not finished");
[...]
// We wrap the label clone in the Fragile type here
// and move that into the new thread instead.
let label_clone = fragile::Fragile::new(label.clone());
thread::spawn(move || {
    // Let's sleep for 10s
    thread::sleep(time::Duration::from_secs(10));

    // Defer the label update to the main thread.
    // For this we get the default main context,
    // the one used by GTK on the main thread,
    // and use invoke() on it. The closure also
    // takes ownership of the label_clone and drops
    // it at the end. From the correct thread!
    glib::MainContext::default().invoke(move || {
        label_clone.get().set_text("finished");
    });
});

So far so good, this compiles and actually works too. But it feels kind of fragile, and that’s not only because of the name of the crate we use here. The label passed around in different threads is like a landmine only waiting to explode when we use it in the wrong way.

It’s also not very nice because now we conceptually share mutable state between different threads, which is the underlying cause for many thread-safety issues and generally increases complexity of the software considerable.

Let’s try to do better, Rust is all about fearless concurrency after all.

A better solution: Message passing via channels

As the title of this post probably made clear, the better solution is to use channels to do message passing. That’s also a pattern that is generally preferred in many other languages that focus a lot on concurrency, ranging from Erlang to Go, and is also the the recommended way of doing this according to the Rust Book.

So how would this look like? We first of all would have to create a Channel for communicating with our main thread.

As the main thread is running a GLib main loop with its corresponding main context (the loop is the thing that actually is… a loop, and the context is what keeps track of all potential event sources the loop has to handle), we can’t make use of the standard library’s MPSC channel. The Receiver blocks or we would have to poll in intervals, which is rather inefficient.

The futures MPSC channel doesn’t have this problem but requires a futures executor to run on the thread where we want to handle the messages. While the GLib main context also implements a futures executor and we could actually use it, this would pull in the futures crate and all its dependencies and might seem like too much if we only ever use it for message passing anyway. Otherwise, if you use futures also for other parts of your code, go ahead and use the futures MPSC channel instead. It basically works the same as what follows.

For creating a GLib main context channel, there are two functions available: glib::MainContext::channel() and glib::MainContext::sync_channel(). The latter takes a bound for the channel, after which sending to the Sender part will block until there is space in the channel again. Both are returning a tuple containing the Sender and Receiver for this channel, and especially the Sender is working exactly like the one from the standard library. It can be cloned, sent to different threads (as long as the message type of the channel can be) and provides basically the same API.

The Receiver works a bit different, and closer to the for_each() combinator on the futures Receiver. It provides an attach() function that attaches it to a specific main context, and takes a closure that is called from that main context whenever an item is available.

The other part that we need to define on our side then is how the messages should look like that we send through the channel. Usually some kind of enum with all the different kinds of messages you want to handle is a good choice, in our case it could also simply be () as we only have a single kind of message and without payload. But to make it more interesting, let’s add the new string of the label as payload to our messages.

This is how it could look like for example

enum Message {
    UpdateLabel(String),
}
[...]
let label = gtk::Label::new("not finished");
[...]
// Create a new sender/receiver pair with default priority
let (sender, receiver) = glib::MainContext::channel(glib::PRIORITY_DEFAULT);

// Spawn the thread and move the sender in there
thread::spawn(move || {
    thread::sleep(time::Duration::from_secs(10));

    // Sending fails if the receiver is closed
    let _ = sender.send(Message::UpdateLabel(String::from("finished")));
});

// Attach the receiver to the default main context (None)
// and on every message update the label accordingly.
let label_clone = label.clone();
receiver.attach(None, move |msg| {
    match msg {
        Message::UpdateLabel(text) => label_clone.set_text(text.as_str()),
    }

    // Returning false here would close the receiver
    // and have senders fail
    glib::Continue(true)
});

While this is a bit more code than the previous solution, it will also be more easy to maintain and generally allows for clearer code.

We keep all our GTK widgets inside the main thread now, threads only get access to a sender over which they can send messages to the main thread and the main thread handles these messages in whatever way it wants. There is no shared mutable state between the different threads here anymore, apart from the channel itself.

GStreamer Rust bindings 0.12 and GStreamer Plugin 0.3 release

After almost 6 months, a new release of the GStreamer Rust bindings and the GStreamer plugin writing infrastructure for Rust is out. As usual this was coinciding with the release of all the gtk-rs crates to make use of all the new features they contain.

Thanks to all the contributors of both gtk-rs and the GStreamer bindings for all the nice changes that happened over the last 6 months!

And as usual, if you find any bugs please report them and if you have any questions let me know.

GStreamer Bindings

For the full changelog check here.

Most changes this time were internally, especially because many user-facing changes (like Debug impls for various types) were already backported to the minor releases in the 0.11 release series.

WebRTC

The biggest change this time is probably the inclusion of bindings for the GStreamer WebRTC library.

This allows using building all kinds of WebRTC applications outside the browser (or providing a WebRTC implementation for a browser), and while not as full-featured as Google’s own implementation, this interoperates well with the various browsers and generally works much better on embedded devices.

A small example application in Rust is available here.

Serde

Optionally, serde trait implementations for the Serialize and Deserialize trait can be enabled for various fundamental GStreamer types, including caps, buffers, events, messages and tag lists. This allows serializing them into any format that can be handled by serde (which are many!), and deserializing them back to normal Rust structs.

Generic Tag API

Previously only a strongly-typed tag API was exposed that made it impossible to use the wrong data type for a specific tag, e.g. code that tries to store a string for the track number or an integer for the title would simply not compile:

let mut tags = gst::TagList::new();
{
    let tags = tags.get_mut().unwrap();
    tags.add::<Title>(&"some title", gst::TagMergeMode::Append);
    tags.add::<TrackNumber>(&12, gst::TagMergeMode::Append);
}

While this is convenient, it made it rather complicated to work with tag lists if you only wanted to handle them in a generic way. For example by iterating over the tag list and simply checking what kind of tags are available. To solve that, a new generic API was added in addition. This works on glib::Values, which can store any kind of type, and using the wrong type for a specific tag would simply cause an error at runtime instead of compile-time.

let mut tags = gst::TagList::new();
{
    let tags = tags.get_mut().unwrap();
    tags.add_generic(&gst::tags::TAG_TITLE, &"some title", gst::TagMergeMode::Append)
.expect("wrong type for title tag");
    tags.add_generic(&gst::tags::TAG_TRACK_NUMBER, &12, gst::TagMergeMode::Append)
.expect("wrong type for track number tag");
}

This also greatly simplified the serde serialization/deserialization for tag lists.

GStreamer Plugins

For the full changelog check here.

gobject-subclass

The main change this time is that all the generic GObject subclassing infrastructure was moved out of the gst-plugin crate and moved to its own gobject-subclass crate as part of the gtk-rs organization.

As part of this, some major refactoring has happened that allows subclassing more different types but also makes it simpler to add new types. There are also experimental crates for adding some subclassing support to gio and gtk, and a PR for autogenerating part of the code via the gir code generator.

More classes!

The other big addition this time is that it’s now possible to subclass GStreamer Pads and GhostPads, to implement the ChildProxy interface and to subclass the Aggregator and AggregatorPad class.

This now allows to write custom mixer/muxer-style elements (or generally elements that have multiple sink pads) in Rust via the Aggregator base class, and to have custom pad types for elements to allow for setting custom properties on the pads (e.g. to control the opacity of a single video mixer input).

There is currently no example for such an element, but I’ll add a very simple video mixer to the repository some time in the next weeks and will also write a blog post about it for explaining all the steps.

GLib/GIO async operations and Rust futures + async/await

Unfortunately I was not able to attend the Rust+GNOME hackfest in Madrid last week, but I could at least spend some of my work time at Centricular on implementing one of the things I wanted to work on during the hackfest. The other one, more closely related to the gnome-class work, will be the topic of a future blog post once I actually have something to show.

So back to the topic. With the latest GIT version of the Rust bindings for GLib, GTK, etc it is now possible to make use of the Rust futures infrastructure for GIO async operations and various other functions. This should make writing of GNOME, and in general GLib-using, applications in Rust quite a bit more convenient.

For the impatient, the summary is that you can use Rust futures with GLib and GIO now, that it works both on the stable and nightly version of the compiler, and with the nightly version of the compiler it is also possible to use async/await. An example with the latter can be found here, and an example just using futures without async/await here.

Table of Contents

  1. Futures
    1. Futures in Rust
    2. Async/Await
    3. Tokio
  2. Futures & GLib/GIO
    1. Callbacks
    2. GLib Futures
    3. GIO Asynchronous Operations
    4. Async/Await
  3. The Future

Futures

First of all, what are futures and how do they work in Rust. In a few words, a future (also called promise elsewhere) is a value that represents the result of an asynchronous operation, e.g. establishing a TCP connection. The operation itself (usually) runs in the background, and only once the operation is finished (or fails), the future resolves to the result of that operation. There are all kinds of ways to combine futures, e.g. to execute some other (potentially async) code with the result once the first operation has finished.

It’s a concept that is also widely used in various other programming languages (e.g. C#, JavaScript, Python, …) for asynchronous programming and can probably be considered a proven concept at this point.

Futures in Rust

In Rust, a future is basically an implementation of relatively simple trait called Future. The following is the definition as of now, but there are discussions to change/simplify/generalize it currently and to also move it to the Rust standard library:

pub trait Future {
    type Item;
    type Error;

    fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error>;
}

Anything that implements this trait can be considered an asynchronous operation that resolves to either an Item or an Error. Consumers of the future would call the poll method to check if the future has resolved already (to a result or error), or if the future is not ready yet. In case of the latter, the future itself would at a later point, once it is ready to proceed, notify the consumer about that. It would get a way for notifications from the Context that is passed, and proceeding does not necessarily mean that the future will resolve after this but it could just advance its internal state closer to the final resolution.

Calling poll manually is kind of inconvenient, so generally this is handled by an Executor on which the futures are scheduled and which is running them until their resolution. Equally, it’s inconvenient to have to implement that trait directly so for most common operations there are combinators that can be used on futures to build new futures, usually via closures in one way or another. For example the following would run the passed closure with the successful result of the future, and then have it return another future (Ok(()) is converted via IntoFuture to the future that always resolves successfully with ()), and also maps any errors to ()

fn our_future() -> impl Future<Item = (), Err = ()> {
    some_future
        .and_then(|res| {
            do_something(res);
            Ok(())
        })
        .map_err(|_| ())
}

A future represents only a single value, but there is also a trait for something producing multiple values: a Stream. For more details, best to check the documentation.

Async/Await

The above way of combining futures via combinators and closures is still not too great, and is still close to callback hell. In other languages (e.g. C#, JavaScript, Python, …) this was solved by introducing new features to the language: async for declaring futures with normal code flow, and await for suspending execution transparently and resuming at that point in the code with the result of a future.

Of course this was also implemented in Rust. Currently based on procedural macros, but there are discussions to actually move this also directly into the language and standard library.

The above example would look something like the following with the current version of the macros

#[async]
fn our_future() -> Result<(), ()> {
    let res = await!(some_future)
        .map_err(|_| ())?;

    do_something(res);
    Ok(())
}

This looks almost like normal, synchronous code but is internally converted into a future and completely asynchronous.

Unfortunately this is currently only available on the nightly version of Rust until various bits and pieces get stabilized.

Tokio

Most of the time when people talk about futures in Rust, they implicitly also mean Tokio. Tokio is a pure Rust, cross-platform asynchronous IO library and based on the futures abstraction above. It provides a futures executor and various types for asynchronous IO, e.g. sockets and socket streams.

But while Tokio is a great library, we’re not going to use it here and instead implement a futures executor around GLib. And on top of that implement various futures, also around GLib’s sister library GIO, which is providing lots of API for synchronous and asynchronous IO.

Just like all IO operations in Tokio, all GLib/GIO asynchronous operations are dependent on running with their respective event loop (i.e. the futures executor) and while it’s possible to use both in the same process, each operation has to be scheduled on the correct one.

Futures & GLib/GIO

Asynchronous operations and generally everything event related (timeouts, …) are based on callbacks that you have to register, and are running via a GMainLoop that is executing events from a GMainContext. The latter is just something that stores everything that is scheduled and provides API for polling if something is ready to be executed now, while the former does exactly that: executing.

Callbacks

The callback based API is also available via the Rust bindings, and would for example look as follows

glib::timeout_add(20, || {
    do_something_after_20ms();
    glib::Continue(false) // don't call again
});

glib::idle_add(|| {
    do_something_from_the_main_loop();
    glib::Continue(false) // don't call again
});

some_async_operation(|res| {
    match res {
        Err(err) => report_error_somehow(),
        Ok(res) => {
            do_something_with_result(res);
            some_other_async_operation(|res| {
                do_something_with_other_result(res);
            });
        }
    }
});

As can be seen here already, the callback-based approach leads to quite non-linear code and deep indentation due to all the closures. Also error handling becomes quite tricky due to somehow having handle them from a completely different call stack.

Compared to C this is still far more convenient due to actually having closures that can capture their environment, but we can definitely do better in Rust.

The above code also assumes that somewhere a main loop is running on the default main context, which could be achieved with the following e.g. inside main()

let ctx = glib::MainContext::default();
let l = glib::MainLoop::new(Some(&ctx), false);
ctx.push_thread_default();

// All operations here would be scheduled on this main context
do_things(&l);

// Run everything until someone calls l.quit()
l.run();
ctx.pop_thread_default();

It is also possible to explicitly select for various operations on which main context they should run, but that’s just a minor detail.

GLib Futures

To make this situation a bit nicer, I’ve implemented support for futures in the Rust bindings. This means, that the GLib MainContext is now a futures executor (and arbitrary futures can be scheduled on it), all the GSource related operations in GLib (timeouts, UNIX signals, …) have futures- or stream-based variants and all the GIO asynchronous operations also come with futures variants now. The latter are autogenerated with the gir bindings code generator.

For enabling usage of this, the futures feature of the glib and gio crates have to be enabled, but that’s about it. It is currently still hidden behind a feature gate because the futures infrastructure is still going to go through some API incompatible changes in the near future.

So let’s take a look at how to use it. First of all, setting up the main context and executing a trivial future on it

let c = glib::MainContext::default();
let l = glib::MainLoop::new(Some(&c), false);

c.push_thread_default();

// Spawn a future that is called from the main context
// and after printing something just quits the main loop
let l_clone = l.clone();
c.spawn(futures::lazy(move |_| {
    println!("we're called from the main context");
    l_clone.quit();
    Ok(())
});

l.run();

c.pop_thread_default();

Apart from spawn(), there is also a spawn_local(). The former can be called from any thread but requires the future to implement the Send trait (that is, it must be safe to send it to other threads) while the latter can only be called from the thread that owns the main context but it allows any kind of future to be spawned. In addition there is also a block_on() function on the main context, which allows to run non-static futures up to their completion and returns their result. The spawn functions only work with static futures (i.e. they have no references to any stack frame) and requires the futures to be infallible and resolve to ().

The above code already showed one of the advantages of using futures: it is possible to use all generic futures (that don’t require a specific executor), like futures::lazy or the mpsc/oneshot channels with GLib now. And any of the combinators that are available on futures

let c = MainContext::new();
                                                                                                       
let res = c.block_on(timeout_future(20)
    .and_then(move |_| {
        // Called after 20ms
        Ok(1)
    })
);

assert_eq!(res, Ok(1));

This example also shows the block_on functionality to return an actual value from the future (1 in this case).

GIO Asynchronous Operations

Similarly, all asynchronous GIO operations are now available as futures. For example to open a file asynchronously and getting a gio::InputStream to read from, the following could be done

let file = gio::File::new_for_path("Cargo.toml");

let l_clone = l.clone();
c.spawn_local(
    // Try to open the file
    file.read_async_future(glib::PRIORITY_DEFAULT)
        .map_err(|(_file, err)| {
            format!("Failed to open file: {}", err)
        })
        .and_then(move |(_file, strm)| {
            // Here we could now read from the stream, but
            // instead we just quit the main loop
            l_clone.quit();

            Ok(())
        })
);

A bigger example can be found in the gtk-rs examples repository here. This example is basically reading a file asynchronously in 64 byte chunks and printing it to stdout, then closing the file.

In the same way, network operations or any other asynchronous operation can be handled via futures now.

Async/Await

Compared to a callback-based approach, that bigger example is already a lot nicer but still quite heavy to read. With the async/await extension that I mentioned above already, the code looks much nicer in comparison and really almost like synchronous code. Except that it is not synchronous.

#[async]
fn read_file(file: gio::File) -> Result<(), String> {
    // Try to open the file
    let (_file, strm) = await!(file.read_async_future(glib::PRIORITY_DEFAULT))
        .map_err(|(_file, err)| format!("Failed to open file: {}", err))?;

    Ok(())
}

fn main() {
    [...]
    let future = async_block! {
        match await!(read_file(file)) {
            Ok(()) => (),
            Err(err) => eprintln!("Got error: {}", err),
        }
        l_clone.quit();
        Ok(())
    };

    c.spawn_local(future);
    [...]
}

For compiling this code, the futures-nightly feature has to be enabled for the glib crate, and a nightly compiler must be used.

The bigger example from before with async/await can be found here.

With this we’re already very close in Rust to having the same convenience as in other languages with asynchronous programming. And also it is very similar to what is possible in Vala with GIO asynchronous operations.

The Future

For now this is all finished and available from GIT of the glib and gio crates. This will have to be updated in the future whenever the futures API is changing, but it is planned to stabilize all this in Rust until the end of this year.

In the future it might also make sense to add futures variants for all the GObject signal handlers, so that e.g. handling a click on a GTK+ button could be done similarly from a future (or rather from a Stream as a signal can be emitted multiple times). If this is in the end more convenient than the callback-based approach that is currently used, is to be seen. Some experimentation would be necessary here. Also how to handle return values of signal handlers would have to be figured out.

Improving GStreamer performance on a high number of network streams by sharing threads between elements with Rust’s tokio crate

For one of our customers at Centricular we were working on a quite interesting project. Their use-case was basically to receive an as-high-as-possible number of audio RTP streams over UDP, transcode them, and then send them out via UDP again. Due to how GStreamer usually works, they were running into some performance issues.

This blog post will describe the first set of improvements that were implemented for this use-case, together with a minimal benchmark and the results. My colleague Mathieu will follow up with one or two other blog posts with the other improvements and a more full-featured benchmark.

The short version is that CPU usage decreased by about 65-75%, i.e. allowing 3-4x more streams with the same CPU usage. Also parallelization works better and usage of different CPU cores is more controllable, allowing for better scalability. And a fixed, but configurable number of threads is used, which is independent of the number of streams.

The code for this blog post can be found here.

Table of Contents

  1. GStreamer & Threads
  2. Thread-Sharing GStreamer Elements
  3. Available Elements
  4. Little Benchmark
  5. Conclusion

GStreamer & Threads

In GStreamer, by default each source is running from its own OS thread. Additionally, for receiving/sending RTP, there will be another thread in the RTP jitterbuffer, yet another thread for receiving RTCP (another source) and a last thread for sending RTCP at the right times. And RTCP has to be received and sent for the receiver and sender side part of the pipeline, so the number of threads doubles. In the sum this gives at least 1 + 1 + (1 + 1) * 2 = 6 threads per RTP stream in this scenario. In a normal audio scenario, there will be one packet received/sent e.g. every 20ms on each stream, and every now and then an RTCP packet. So most of the time all these threads are only waiting.

Apart from the obvious waste of OS resources (1000 streams would be 6000 threads), this also brings down performance as all the time threads are being woken up. This means that context switches have to happen basically all the time.

To solve this we implemented a mechanism to share threads, and in the end as a result we have a fixed, but configurable number of threads that is independent from the number of streams. And can run e.g. 500 streams just fine on a single thread with a single core, which was completely impossible before. In addition we also did some work to reduce the number of allocations for each packet, so that after startup no additional allocations happen per packet anymore for buffers. See Mathieu’s upcoming blog post for details.

In this blog post, I’m going to write about a generic mechanism for sources, queues and similar elements to share their threads between each other. For the RTP related bits (RTP jitterbuffer and RTCP timer) this was not used due to reuse of existing C codebases.

Thread-Sharing GStreamer Elements

The code in question can be found here, a small benchmark is in the examples directory and it is going to be used for the results later. A full-featured benchmark will come in Mathieu’s blog post.

This is a new GStreamer plugin, written in Rust and around the Tokio crate for asynchronous IO and generally a “task scheduler”.

While this could certainly also have been written in C around something like libuv, doing this kind of work in Rust is simply more productive and fun due to its safety guarantees and the strong type system, which definitely reduced the amount of debugging a lot. And in addition “modern” language features like closures, which make working with futures much more ergonomic.

When using these elements it is important to have full control over the pipeline and its elements, and the dataflow inside the pipeline has to be carefully considered to properly configure how to share threads. For example the following two restrictions should be kept in mind all the time:

  1. Downstream of such an element, the streaming thread must never ever block for considerable amounts of time. Otherwise all other elements inside the same thread-group would be blocked too, even if they could do any work now
  2. This generally all works better in live pipelines, where media is produced in real-time and not as fast as possible

Available Elements

So this repository currently contains the generic infrastructure (see the src/iocontext.rs source file) and a couple of elements:

  • an UDP source: ts-udpsrc, a replacement for udpsrc
  • an app source: ts-appsrc, a replacement for appsrc to inject packets into the pipeline from the application
  • a queue: ts-queue, a replacement for queue that is useful for adding buffering to a pipeline part. The upstream side of the queue will block if not called from another thread-sharing element, but if called from another thread-sharing element it will pause the current task asynchronously. That is, stop the upstream task from producing more data.
  • a proxysink/src element: ts-proxysrc, ts-proxysink, replacements for proxysink/proxysrc for connecting two pipelines with each other. This basically works like the queue, but split into two elements.
  • a tone generator source around spandsp: ts-tonesrc, a replacement for tonegeneratesrc. This also contains some minimal FFI bindings for that part of the spandsp C library.

All these elements have more or less the same API as their non-thread-sharing counterparts.

API-wise, each of these elements has a set of properties for controlling how it is sharing threads with other elements, and with which elements:

  • context: A string that defines in which group this element is. All elements with the same context are running on the same thread or group of threads,
  • context-threads: Number of threads to use in this context. -1 means exactly one thread, 1 and above used N+1 threads (1 thread for polling fds, N worker threads) and 0 sets N to the number of available CPU cores. As long as no considerable work is done in these threads, -1 has shown to be the most efficient. See also this tokio GitHub issue
  • context-wait: Number of milliseconds that the threads will wait on each iteration. This allows to reduce CPU usage even further by handling all events/packets that arrived during that timespan to be handled all at once instead of waking up the thread every time a little event happens, thus reducing context switches again

The elements are all pushing data downstream from a tokio thread whenever data is available, assuming that downstream does not block. If downstream is another thread-sharing element and it would have to block (e.g. a full queue), it instead returns a new future to upstream so that upstream can asynchronously wait on that future before producing more output. By this, back-pressure is implemented between different GStreamer elements without ever blocking any of the tokio threads. All this is implemented around the normal GStreamer data-flow mechanisms, there is no “tokio fast-path” between elements.

Little Benchmark

As mentioned above, there’s a small benchmark application in the examples directory. This basically sets up a configurable number of streams and directly connects them to a fakesink, throwing away all packets. Additionally there is another thread that is sending all these packets. As such, this is really the most basic benchmark and not very realistic but nonetheless it shows the same performance improvement as the real application. Again, see Mathieu’s upcoming blog post for a more realistic and complete benchmark.

When running it, make sure that your user can create enough fds. The benchmark will just abort if not enough fds can be allocated. You can control this with ulimit -n SOME_NUMBER, and allowing a couple of thousands is generally a good idea. The benchmarks below were running with 10000.

After running cargo build –release to build the plugin itself, you can run the benchmark with:

cargo run --release --example udpsrc-benchmark -- 1000 ts-udpsrc -1 1 20

and in another shell the UDP sender with

cargo run --release --example udpsrc-benchmark-sender -- 1000

This runs 1000 streams, uses ts-udpsrc (alternative would be udpsrc), configures exactly one thread -1, 1 context, and a wait time of 20ms. See above for what these settings mean. You can check CPU usage with e.g. top. Testing was done on an Intel i7-4790K, with Rust 1.25 and GStreamer 1.14. One packet is sent every 20ms for each stream.

Source Streams Threads Contexts Wait CPU
udpsrc 1000 1000 x x 44%
ts-udpsrc 1000 -1 1 0 18%
ts-udpsrc 1000 -1 1 20 13%
ts-udpsrc 1000 -1 2 20 15%
ts-udpsrc 1000 2 1 20 16%
ts-udpsrc 1000 2 2 20 27%
Source Streams Threads Contexts Wait CPU
udpsrc 2000 2000 x x 95%
ts-udpsrc 2000 -1 1 20 29%
ts-udpsrc 2000 -1 2 20 31%
Source Streams Threads Contexts Wait CPU
ts-udpsrc 3000 -1 1 20 36%
ts-udpsrc 3000 -1 2 20 47%

Results for 3000 streams for the old udpsrc are not included as starting up that many threads needs too long.

The best configuration is apparently a single thread per context (see this tokio GitHub issue) and waiting 20ms for every iterations. Compared to the old udpsrc, CPU usage is about one third in that setting, and generally it seems to parallelize well. It’s not clear to me why the last test has 11% more CPU with two contexts, while in every other test the number of contexts does not really make a difference, and also not for that many streams in the real test-case.

The waiting does not reduce CPU usage a lot in this benchmark, but on the real test-case it does. The reason is most likely that this benchmark basically sends all packets at once, then waits for the remaining time, then sends the next packets.

Take these numbers with caution, the real test-case in Mathieu’s blog post will show the improvements in the bigger picture, where it was generally a quarter of CPU usage and almost perfect parallelization when increasing the number of contexts.

Conclusion

Generally this was a fun exercise and we’re quite happy with the results, especially the real results. It took me some time to understand how tokio works internally so that I can implement all kinds of customizations on top of it, but for normal usage of tokio that should not be required and the overall design makes a lot of sense to me, as well as the way how futures are implemented in Rust. It requires some learning and understanding how exactly the API can be used and behaves, but once that point is reached it seems like a very productive and performant solution for asynchronous IO. And modelling asynchronous IO problems based on the Rust-style futures seems a nice and intuitive fit.

The performance measurements also showed that GStreamer’s default usage of threads is not always optimal, and a model like in upipe or pipewire (or rather SPA) can provide better performance. But as this also shows, it is possible to implement something like this on top of GStreamer and for the common case, using threads like in GStreamer reduces the cognitive load on the developer a lot.

For a future version of GStreamer, I don’t think we should make the threading “manual” like in these two other projects, but instead provide some API additions that make it nicer to implement thread-sharing elements and to add ways in the GStreamer core to make streaming threads non-blocking. All this can be implemented already, but it could be nicer.

All this “only” improved the number of threads, and thus the threading and context switching overhead. Many other optimizations in other areas are still possible on top of this, for example optimizing receive performance and reducing the number of memory copies inside the pipeline even further. If that’s something you would be interested in, feel free to get in touch.

And with that: Read Mathieu’s upcoming blog posts about the other parts, RTP jitterbuffer / RTCP timer thread sharing, and no allocations, and the full benchmark.

GStreamer Rust bindings 0.11 / plugin writing infrastructure 0.2 release

Following the GStreamer 1.14 release and the new round of gtk-rs releases, there are also new releases for the GStreamer Rust bindings (0.11) and the plugin writing infrastructure (0.2).

Thanks also to all the contributors for making these releases happen and adding lots of valuable changes and API additions.

GStreamer Rust Bindings

The main changes in the Rust bindings were the update to GStreamer 1.14 (which brings in quite some new API, like GstPromise), a couple of API additions (GstBufferPool specifically) and the addition of the GstRtspServer and GstPbutils crates. The former allows writing a full RTSP server in a couple of lines of code (with lots of potential for customizations), the latter provides access to the GstDiscoverer helper object that allows inspecting files and streams for their container format, codecs, tags and all kinds of other metadata.

The GstPbutils crate will also get other features added in the near future, like encoding profile bindings to allow using the encodebin GStreamer element (a helper element for automatically selecting/configuring encoders and muxers) from Rust.

But the biggest changes in my opinion is some refactoring that was done to the Event, Message and Query APIs. Previously you would have to use a view on a newly created query to be able to use the type-specific functions on it

let mut q = gst::Query::new_position(gst::Format::Time);
if pipeline.query(q.get_mut().unwrap()) {
    match q.view() {
        QueryView::Position(ref p) => Some(p.get_result()),
        _ => None,
    }
} else {
    None
}

Now you can directly use the type-specific functions on a newly created query

let mut q = gst::Query::new_position(gst::Format::Time);
if pipeline.query(&mut q) {
    Some(q.get_result())
} else {
    None
}

In addition, the views can now dereference directly to the event/message/query itself and provide access to their API, which simplifies some code even more.

Plugin Writing Infrastructure

While the plugin writing infrastructure did not see that many changes apart from a couple of bugfixes and updating to the new versions of everything else, this does not mean that development on it stalled. Quite the opposite. The existing code works very well already and there was just no need for adding anything new for the projects I and others did on top of it, most of the required API additions were in the GStreamer bindings.

So the status here is the same as last time, get started writing GStreamer plugins in Rust. It works well!

How to write GStreamer Elements in Rust Part 2: A raw audio sine wave source

A bit later than anticipated, this is now part two of the blog post series about writing GStreamer elements in Rust. Part one can be found here, and I’ll assume that everything written there is known already.

In this part, a raw audio sine wave source element is going to be written. It will be similar to the one Mathieu was writing in his blog post about writing such a GStreamer element in Python. Various details will be different though, but more about that later.

The final code can be found here.

Table of Contents

  1. Boilerplate
  2. Caps Negotiation
  3. Query Handling
  4. Buffer Creation
  5. (Pseudo) Live Mode
  6. Unlocking
  7. Seeking

Boilerplate

The first part here will be all the boilerplate required to set up the element. You can safely skip this if you remember all this from the previous blog post.

Our sine wave element is going to produce raw audio, with a number of channels and any possible sample rate with both 32 bit and 64 bit floating point samples. It will produce a simple sine wave with a configurable frequency, volume/mute and number of samples per audio buffer. In addition it will be possible to configure the element in (pseudo) live mode, meaning that it will only produce data in real-time according to the pipeline clock. And it will be possible to seek to any time/sample position on our source element. It will basically be a more simply version of the audiotestsrc element from gst-plugins-base.

So let’s get started with all the boilerplate. This time our element will be based on the BaseSrc base class instead of BaseTransform.

use glib;
use gst;
use gst::prelude::*;
use gst_base::prelude::*;
use gst_audio;

use byte_slice_cast::*;

use gst_plugin::properties::*;
use gst_plugin::object::*;
use gst_plugin::element::*;
use gst_plugin::base_src::*;

use std::{i32, u32};
use std::sync::Mutex;
use std::ops::Rem;

use num_traits::float::Float;
use num_traits::cast::NumCast;

// Default values of properties
const DEFAULT_SAMPLES_PER_BUFFER: u32 = 1024;
const DEFAULT_FREQ: u32 = 440;
const DEFAULT_VOLUME: f64 = 0.8;
const DEFAULT_MUTE: bool = false;
const DEFAULT_IS_LIVE: bool = false;

// Property value storage
#[derive(Debug, Clone, Copy)]
struct Settings {
    samples_per_buffer: u32,
    freq: u32,
    volume: f64,
    mute: bool,
    is_live: bool,
}

impl Default for Settings {
    fn default() -> Self {
        Settings {
            samples_per_buffer: DEFAULT_SAMPLES_PER_BUFFER,
            freq: DEFAULT_FREQ,
            volume: DEFAULT_VOLUME,
            mute: DEFAULT_MUTE,
            is_live: DEFAULT_IS_LIVE,
        }
    }
}

// Metadata for the properties
static PROPERTIES: [Property; 5] = [
    Property::UInt(
        "samples-per-buffer",
        "Samples Per Buffer",
        "Number of samples per output buffer",
        (1, u32::MAX),
        DEFAULT_SAMPLES_PER_BUFFER,
        PropertyMutability::ReadWrite,
    ),
    Property::UInt(
        "freq",
        "Frequency",
        "Frequency",
        (1, u32::MAX),
        DEFAULT_FREQ,
        PropertyMutability::ReadWrite,
    ),
    Property::Double(
        "volume",
        "Volume",
        "Output volume",
        (0.0, 10.0),
        DEFAULT_VOLUME,
        PropertyMutability::ReadWrite,
    ),
    Property::Boolean(
        "mute",
        "Mute",
        "Mute",
        DEFAULT_MUTE,
        PropertyMutability::ReadWrite,
    ),
    Property::Boolean(
        "is-live",
        "Is Live",
        "(Pseudo) live output",
        DEFAULT_IS_LIVE,
        PropertyMutability::ReadWrite,
    ),
];

// Stream-specific state, i.e. audio format configuration
// and sample offset
struct State {
    info: Option<gst_audio::AudioInfo>,
    sample_offset: u64,
    sample_stop: Option<u64>,
    accumulator: f64,
}

impl Default for State {
    fn default() -> State {
        State {
            info: None,
            sample_offset: 0,
            sample_stop: None,
            accumulator: 0.0,
        }
    }
}

// Struct containing all the element data
struct SineSrc {
    cat: gst::DebugCategory,
    settings: Mutex<Settings>,
    state: Mutex<State>,
}

impl SineSrc {
    // Called when a new instance is to be created
    fn new(element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
        // Initialize live-ness and notify the base class that
        // we'd like to operate in Time format
        element.set_live(DEFAULT_IS_LIVE);
        element.set_format(gst::Format::Time);

        Box::new(Self {
            cat: gst::DebugCategory::new(
                "rssinesrc",
                gst::DebugColorFlags::empty(),
                "Rust Sine Wave Source",
            ),
            settings: Mutex::new(Default::default()),
            state: Mutex::new(Default::default()),
        })
    }

    // Called exactly once when registering the type. Used for
    // setting up metadata for all instances, e.g. the name and
    // classification and the pad templates with their caps.
    //
    // Actual instances can create pads based on those pad templates
    // with a subset of the caps given here. In case of basesrc,
    // a "src" and "sink" pad template are required here and the base class
    // will automatically instantiate pads for them.
    //
    // Our element here can output f32 and f64
    fn class_init(klass: &mut BaseSrcClass) {
        klass.set_metadata(
            "Sine Wave Source",
            "Source/Audio",
            "Creates a sine wave",
            "Sebastian Dröge <sebastian@centricular.com>",
        );

        // On the src pad, we can produce F32/F64 with any sample rate
        // and any number of channels
        let caps = gst::Caps::new_simple(
            "audio/x-raw",
            &[
                (
                    "format",
                    &gst::List::new(&[
                        &gst_audio::AUDIO_FORMAT_F32.to_string(),
                        &gst_audio::AUDIO_FORMAT_F64.to_string(),
                    ]),
                ),
                ("layout", &"interleaved"),
                ("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
                ("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
            ],
        );
        // The src pad template must be named "src" for basesrc
        // and specific a pad that is always there
        let src_pad_template = gst::PadTemplate::new(
            "src",
            gst::PadDirection::Src,
            gst::PadPresence::Always,
            &caps,
        );
        klass.add_pad_template(src_pad_template);

        // Install all our properties
        klass.install_properties(&PROPERTIES);
    }
}

impl ObjectImpl<BaseSrc> for SineSrc {
    // Called whenever a value of a property is changed. It can be called
    // at any time from any thread.
    fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) {
        let prop = &PROPERTIES[id as usize];
        let element = obj.clone().downcast::<BaseSrc>().unwrap();

        match *prop {
            Property::UInt("samples-per-buffer", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let samples_per_buffer = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing samples-per-buffer from {} to {}",
                    settings.samples_per_buffer,
                    samples_per_buffer
                );
                settings.samples_per_buffer = samples_per_buffer;
                drop(settings);

                let _ =
                    element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
            }
            Property::UInt("freq", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let freq = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing freq from {} to {}",
                    settings.freq,
                    freq
                );
                settings.freq = freq;
            }
            Property::Double("volume", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let volume = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing volume from {} to {}",
                    settings.volume,
                    volume
                );
                settings.volume = volume;
            }
            Property::Boolean("mute", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let mute = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing mute from {} to {}",
                    settings.mute,
                    mute
                );
                settings.mute = mute;
            }
            Property::Boolean("is-live", ..) => {
                let mut settings = self.settings.lock().unwrap();
                let is_live = value.get().unwrap();
                gst_info!(
                    self.cat,
                    obj: &element,
                    "Changing is-live from {} to {}",
                    settings.is_live,
                    is_live
                );
                settings.is_live = is_live;
            }
            _ => unimplemented!(),
        }
    }

    // Called whenever a value of a property is read. It can be called
    // at any time from any thread.
    fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
        let prop = &PROPERTIES[id as usize];

        match *prop {
            Property::UInt("samples-per-buffer", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.samples_per_buffer.to_value())
            }
            Property::UInt("freq", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.freq.to_value())
            }
            Property::Double("volume", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.volume.to_value())
            }
            Property::Boolean("mute", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.mute.to_value())
            }
            Property::Boolean("is-live", ..) => {
                let settings = self.settings.lock().unwrap();
                Ok(settings.is_live.to_value())
            }
            _ => unimplemented!(),
        }
    }
}

// Virtual methods of gst::Element. We override none
impl ElementImpl<BaseSrc> for SineSrc { }

impl BaseSrcImpl<BaseSrc> for SineSrc {
    // Called when starting, so we can initialize all stream-related state to its defaults
    fn start(&self, element: &BaseSrc) -> bool {
        // Reset state
        *self.state.lock().unwrap() = Default::default();

        gst_info!(self.cat, obj: element, "Started");

        true
    }

    // Called when shutting down the element so we can release all stream-related state
    fn stop(&self, element: &BaseSrc) -> bool {
        // Reset state
        *self.state.lock().unwrap() = Default::default();

        gst_info!(self.cat, obj: element, "Stopped");

        true
    }
}

struct SineSrcStatic;

// The basic trait for registering the type: This returns a name for the type and registers the
// instance and class initializations functions with the type system, thus hooking everything
// together.
impl ImplTypeStatic<BaseSrc> for SineSrcStatic {
    fn get_name(&self) -> &str {
        "SineSrc"
    }

    fn new(&self, element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
        SineSrc::new(element)
    }

    fn class_init(&self, klass: &mut BaseSrcClass) {
        SineSrc::class_init(klass);
    }
}

// Registers the type for our element, and then registers in GStreamer under
// the name "sinesrc" for being able to instantiate it via e.g.
// gst::ElementFactory::make().
pub fn register(plugin: &gst::Plugin) {
    let type_ = register_type(SineSrcStatic);
    gst::Element::register(plugin, "rssinesrc", 0, type_);
}

If any of this needs explanation, please see the previous blog post and the comments in the code. The explanation for all the structs fields and what they’re good for will follow in the next sections.

With all of the above and a small addition to src/lib.rs this should compile now.

mod sinesrc;
[...]

fn plugin_init(plugin: &gst::Plugin) -> bool {
    [...]
    sinesrc::register(plugin);
    true
}

Also a couple of new crates have to be added to Cargo.toml and src/lib.rs, but you best check the code in the repository for details.

Caps Negotiation

The first part that we have to implement, just like last time, is caps negotiation. We already notified the base class about any caps that we can potentially handle via the caps in the pad template in class_init but there are still two more steps of behaviour left that we have to implement.

First of all, we need to get notified whenever the caps that our source is configured for are changing. This will happen once in the very beginning and then whenever the pipeline topology or state changes and new caps would be more optimal for the new situation. This notification happens via the BaseTransform::set_caps virtual method.

    fn set_caps(&self, element: &BaseSrc, caps: &gst::CapsRef) -> bool {
        use std::f64::consts::PI;

        let info = match gst_audio::AudioInfo::from_caps(caps) {
            None => return false,
            Some(info) => info,
        };

        gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);

        element.set_blocksize(info.bpf() * (*self.settings.lock().unwrap()).samples_per_buffer);

        let settings = *self.settings.lock().unwrap();
        let mut state = self.state.lock().unwrap();

        // If we have no caps yet, any old sample_offset and sample_stop will be
        // in nanoseconds
        let old_rate = match state.info {
            Some(ref info) => info.rate() as u64,
            None => gst::SECOND_VAL,
        };

        // Update sample offset and accumulator based on the previous values and the
        // sample rate change, if any
        let old_sample_offset = state.sample_offset;
        let sample_offset = old_sample_offset
            .mul_div_floor(info.rate() as u64, old_rate)
            .unwrap();

        let old_sample_stop = state.sample_stop;
        let sample_stop =
            old_sample_stop.map(|v| v.mul_div_floor(info.rate() as u64, old_rate).unwrap());

        let accumulator =
            (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (info.rate() as f64));

        *state = State {
            info: Some(info),
            sample_offset: sample_offset,
            sample_stop: sample_stop,
            accumulator: accumulator,
        };

        drop(state);

        let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());

        true
    }

In here we parse the caps into a AudioInfo and then store that in our internal state, while updating various fields. We tell the base class about the number of bytes each buffer is usually going to hold, and update our current sample position, the stop sample position (when a seek with stop position happens, we need to know when to stop) and our accumulator. This happens by scaling both positions by the old and new sample rate. If we don’t have an old sample rate, we assume nanoseconds (this will make more sense once seeking is implemented). The scaling is done with the help of the muldiv crate, which implements scaling of integer types by a fraction with protection against overflows by doing up to 128 bit integer arithmetic for intermediate values.

The accumulator is the updated based on the current phase of the sine wave at the current sample position.

As a last step we post a new LATENCY message on the bus whenever the sample rate has changed. Our latency (in live mode) is going to be the duration of a single buffer, but more about that later.

BaseSrc is by default already selecting possible caps for us, if there are multiple options. However these defaults might not be (and often are not) ideal and we should override the default behaviour slightly. This is done in the BaseSrc::fixate virtual method.

    fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps {
        // Fixate the caps. BaseSrc will do some fixation for us, but
        // as we allow any rate between 1 and MAX it would fixate to 1. 1Hz
        // is generally not a useful sample rate.
        //
        // We fixate to the closest integer value to 48kHz that is possible
        // here, and for good measure also decide that the closest value to 1
        // channel is good.
        let mut caps = gst::Caps::truncate(caps);
        {
            let caps = caps.make_mut();
            let s = caps.get_mut_structure(0).unwrap();
            s.fixate_field_nearest_int("rate", 48_000);
            s.fixate_field_nearest_int("channels", 1);
        }

        // Let BaseSrc fixate anything else for us. We could've alternatively have
        // called Caps::fixate() here
        element.parent_fixate(caps)
    }

Here we take the caps that are passed in, truncate them (i.e. remove all but the very first Structure) and then manually fixate the sample rate to the closest value to 48kHz. By default, caps fixation would result in the lowest possible sample rate but this is usually not desired.

For good measure, we also fixate the number of channels to the closest value to 1, but this would already be the default behaviour anyway. And then chain up to the parent class’ implementation of fixate, which for now basically does the same as Caps::fixate(). After this, the caps are fixated, i.e. there is only a single Structure left and all fields have concrete values (no ranges or sets).

Query Handling

As our source element will work by generating a new audio buffer from a specific offset, and especially works in Time format, we want to notify downstream elements that we don’t want to run in Pull mode, only in Push mode. In addition would prefer sequential reading. However we still allow seeking later. For a source that does not know about Time, e.g. a file source, the format would be configured as Bytes. Other values than Time and Bytes generally don’t make any sense.

The main difference here is that otherwise the base class would ask us to produce data for arbitrary Byte offsets, and we would have to produce data for that. While possible in our case, it’s a bit annoying and for other audio sources it’s not easily possible at all.

Downstream elements will try to query this very information from us, so we now have to override the default query handling of BaseSrc and handle the SCHEDULING query differently. Later we will also handle other queries differently.

    fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
        use gst::QueryView;

        match query.view_mut() {
            // We only work in Push mode. In Pull mode, create() could be called with
            // arbitrary offsets and we would have to produce for that specific offset
            QueryView::Scheduling(ref mut q) => {
                q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
                q.add_scheduling_modes(&[gst::PadMode::Push]);
                return true;
            }
            _ => (),
        }
        BaseSrcBase::parent_query(element, query)
    }

To handle the SCHEDULING query specifically, we first have to match on a view (mutable because we want to modify the view) of the query check the type of the query. If it indeed is a scheduling query, we can set the SEQUENTIAL flag and specify that we handle only Push mode, then return true directly as we handled the query already.

In all other cases we fall back to the parent class’ implementation of the query virtual method.

Buffer Creation

Now we have everything in place for a working element, apart from the virtual method to actually generate the raw audio buffers with the sine wave. From a high-level BaseSrc works by calling the create virtual method over and over again to let the subclass produce a buffer until it returns an error or signals the end of the stream.

Let’s first talk about how to generate the sine wave samples themselves. As we want to operate on 32 bit and 64 bit floating point numbers, we implement a generic function for generating samples and storing them in a mutable byte slice. This is done with the help of the num_traits crate, which provides all kinds of useful traits for abstracting over numeric types. In our case we only need the Float and NumCast traits.

Instead of writing a generic implementation with those traits, it would also be possible to do the same with a simple macro that generates a function for both types. Which approach is nicer is a matter of taste in the end, the compiler output should be equivalent for both cases.

    fn process<F: Float + FromByteSlice>(
        data: &mut [u8],
        accumulator_ref: &mut f64,
        freq: u32,
        rate: u32,
        channels: u32,
        vol: f64,
    ) {
        use std::f64::consts::PI;

        // Reinterpret our byte-slice as a slice containing elements of the type
        // we're interested in. GStreamer requires for raw audio that the alignment
        // of memory is correct, so this will never ever fail unless there is an
        // actual bug elsewhere.
        let data = data.as_mut_slice_of::<F>().unwrap();

        // Convert all our parameters to the target type for calculations
        let vol: F = NumCast::from(vol).unwrap();
        let freq = freq as f64;
        let rate = rate as f64;
        let two_pi = 2.0 * PI;

        // We're carrying a accumulator with up to 2pi around instead of working
        // on the sample offset. High sample offsets cause too much inaccuracy when
        // converted to floating point numbers and then iterated over in 1-steps
        let mut accumulator = *accumulator_ref;
        let step = two_pi * freq / rate;

        for chunk in data.chunks_mut(channels as usize) {
            let value = vol * F::sin(NumCast::from(accumulator).unwrap());
            for sample in chunk {
                *sample = value;
            }

            accumulator += step;
            if accumulator >= two_pi {
                accumulator -= two_pi;
            }
        }

        *accumulator_ref = accumulator;
    }

This function takes the mutable byte slice from our buffer as argument, as well as the current value of the accumulator and the relevant settings for generating the sine wave.

As a first step, we “cast” the byte slice to one of the target type (f32 or f64) with the help of the byte_slice_cast crate. This ensures that alignment and sizes are all matching and returns a mutable slice of our target type if successful. In case of GStreamer, the buffer alignment is guaranteed to be big enough for our types here and we allocate the buffer of a correct size later.

Now we convert all the parameters to the types we will use later, and store them together with the current accumulator value in local variables. Then we iterate over the whole floating point number slice in chunks with all channels, and fill each channel with the current value of our sine wave.

The sine wave itself is calculated by val = volume * sin(2 * PI * frequency * (i + accumulator) / rate), but we actually calculate it by simply increasing the accumulator by 2 * PI * frequency / rate for every sample instead of doing the multiplication for each sample. We also make sure that the accumulator always stays between 0 and 2 * PI to prevent any inaccuracies from floating point numbers to affect our produced samples.

Now that this is done, we need to implement the BaseSrc::create virtual method for actually allocating the buffer, setting timestamps and other metadata and it and calling our above function.

    fn create(
        &self,
        element: &BaseSrc,
        _offset: u64,
        _length: u32,
    ) -> Result<gst::Buffer, gst::FlowReturn> {
        // Keep a local copy of the values of all our properties at this very moment. This
        // ensures that the mutex is never locked for long and the application wouldn't
        // have to block until this function returns when getting/setting property values
        let settings = *self.settings.lock().unwrap();

        // Get a locked reference to our state, i.e. the input and output AudioInfo
        let mut state = self.state.lock().unwrap();
        let info = match state.info {
            None => {
                gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
                return Err(gst::FlowReturn::NotNegotiated);
            }
            Some(ref info) => info.clone(),
        };

        // If a stop position is set (from a seek), only produce samples up to that
        // point but at most samples_per_buffer samples per buffer
        let n_samples = if let Some(sample_stop) = state.sample_stop {
            if sample_stop <= state.sample_offset {
                gst_log!(self.cat, obj: element, "At EOS");
                return Err(gst::FlowReturn::Eos);
            }

            sample_stop - state.sample_offset
        } else {
            settings.samples_per_buffer as u64
        };

        // Allocate a new buffer of the required size, update the metadata with the
        // current timestamp and duration and then fill it according to the current
        // caps
        let mut buffer =
            gst::Buffer::with_size((n_samples as usize) * (info.bpf() as usize)).unwrap();
        {
            let buffer = buffer.get_mut().unwrap();

            // Calculate the current timestamp (PTS) and the next one,
            // and calculate the duration from the difference instead of
            // simply the number of samples to prevent rounding errors
            let pts = state
                .sample_offset
                .mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
                .unwrap()
                .into();
            let next_pts: gst::ClockTime = (state.sample_offset + n_samples)
                .mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
                .unwrap()
                .into();
            buffer.set_pts(pts);
            buffer.set_duration(next_pts - pts);

            // Map the buffer writable and create the actual samples
            let mut map = buffer.map_writable().unwrap();
            let data = map.as_mut_slice();

            if info.format() == gst_audio::AUDIO_FORMAT_F32 {
                Self::process::<f32>(
                    data,
                    &mut state.accumulator,
                    settings.freq,
                    info.rate(),
                    info.channels(),
                    settings.volume,
                );
            } else {
                Self::process::<f64>(
                    data,
                    &mut state.accumulator,
                    settings.freq,
                    info.rate(),
                    info.channels(),
                    settings.volume,
                );
            }
        }
        state.sample_offset += n_samples;
        drop(state);

        gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);

        Ok(buffer)
    }

Just like last time, we start with creating a copy of our properties (settings) and keeping a mutex guard of the internal state around. If the internal state has no AudioInfo yet, we error out. This would mean that no caps were negotiated yet, which is something we can’t handle and is not really possible in our case.

Next we calculate how many samples we have to generate. If a sample stop position was set by a seek event, we have to generate samples up to at most that point. Otherwise we create at most the number of samples per buffer that were set via the property. Then we allocate a buffer of the corresponding size, with the help of the bpf field of the AudioInfo, and then set its metadata and fill the samples.

The metadata that is set is the timestamp (PTS), and the duration. The duration is calculated from the difference of the following buffer’s timestamp and the current buffer’s. By this we ensure that rounding errors are not causing the next buffer’s timestamp to have a different timestamp than the sum of the current’s and its duration. While this would not be much of a problem in GStreamer (inaccurate and jitterish timestamps are handled just fine), we can prevent it here and do so.

Afterwards we call our previously defined function on the writably mapped buffer and fill it with the sample values.

With all this, the element should already work just fine in any GStreamer-based application, for example gst-launch-1.0. Don’t forget to set the GST_PLUGIN_PATH environment variable correctly like last time. Before running this, make sure to turn down the volume of your speakers/headphones a bit.

export GST_PLUGIN_PATH=<code>{{EJS104}}</code>/target/debug
gst-launch-1.0 rssinesrc freq=440 volume=0.9 ! audioconvert ! autoaudiosink

You should hear a 440Hz sine wave now.

(Pseudo) Live Mode

Many audio (and video) sources can actually only produce data in real-time and data is produced according to some clock. So far our source element can produce data as fast as downstream is consuming data, but we optionally can change that. We simulate a live source here now by waiting on the pipeline clock, but with a real live source you would only ever be able to have the data in real-time without any need to wait on a clock. And usually that data is produced according to a different clock than the pipeline clock, in which case translation between the two clocks is needed but we ignore this aspect for now. For details check the GStreamer documentation.

For working in live mode, we have to add a few different parts in various places. First of all, we implement waiting on the clock in the create function.

    fn create(...
        [...]
        state.sample_offset += n_samples;
        drop(state);

        // If we're live, we are waiting until the time of the last sample in our buffer has
        // arrived. This is the very reason why we have to report that much latency.
        // A real live-source would of course only allow us to have the data available after
        // that latency, e.g. when capturing from a microphone, and no waiting from our side
        // would be necessary..
        //
        // Waiting happens based on the pipeline clock, which means that a real live source
        // with its own clock would require various translations between the two clocks.
        // This is out of scope for the tutorial though.
        if element.is_live() {
            let clock = match element.get_clock() {
                None => return Ok(buffer),
                Some(clock) => clock,
            };

            let segment = element
                .get_segment()
                .downcast::<gst::format::Time>()
                .unwrap();
            let base_time = element.get_base_time();
            let running_time = segment.to_running_time(buffer.get_pts() + buffer.get_duration());

            // The last sample's clock time is the base time of the element plus the
            // running time of the last sample
            let wait_until = running_time + base_time;
            if wait_until.is_none() {
                return Ok(buffer);
            }

            let id = clock.new_single_shot_id(wait_until).unwrap();

            gst_log!(
                self.cat,
                obj: element,
                "Waiting until {}, now {}",
                wait_until,
                clock.get_time()
            );
            let (res, jitter) = id.wait();
            gst_log!(
                self.cat,
                obj: element,
                "Waited res {:?} jitter {}",
                res,
                jitter
            );
        }

        gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);

        Ok(buffer)
    }

To be able to wait on the clock, we first of all need to calculate the clock time until when we want to wait. In our case that will be the clock time right after the end of the last sample in the buffer we just produced. Simply because you can’t capture a sample before it was produced.

We calculate the running time from the PTS and duration of the buffer with the help of the currently configured segment and then add the base time of the element on this to get the clock time as result. Please check the GStreamer documentation for details, but in short the running time of a pipeline is the time since the start of the pipeline (or the last reset of the running time) and the running time of a buffer can be calculated from its PTS and the segment, which provides the information to translate between the two. The base time is the clock time when the pipeline went to the Playing state, so just an offset.

Next we wait and then return the buffer as before.

Now we also have to tell the base class that we’re running in live mode now. This is done by calling set_live(true) on the base class before changing the element state from Ready to Paused. For this we override the Element::change_state virtual method.

impl ElementImpl<BaseSrc> for SineSrc {
    fn change_state(
        &self,
        element: &BaseSrc,
        transition: gst::StateChange,
    ) -> gst::StateChangeReturn {
        // Configure live'ness once here just before starting the source
        match transition {
            gst::StateChange::ReadyToPaused => {
                element.set_live(self.settings.lock().unwrap().is_live);
            }
            _ => (),
        }

        element.parent_change_state(transition)
    }
}

And as a last step, we also need to notify downstream elements about our latency. Live elements always have to report their latency so that synchronization can work correctly. As the clock time of each buffer is equal to the time when it was created, all buffers would otherwise arrive late in the sinks (they would appear as if they should’ve been played already at the time when they were created). So all the sinks will have to compensate for the latency that it took from capturing to the sink, and they have to do that in a coordinated way (otherwise audio and video would be out of sync if both have different latencies). For this the pipeline is querying each sink for the latency on its own branch, and then configures a global latency on all sinks according to that.

This querying is done with the LATENCY query, which we will now also have to handle.

    fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
        use gst::QueryView;

        match query.view_mut() {
            // We only work in Push mode. In Pull mode, create() could be called with
            // arbitrary offsets and we would have to produce for that specific offset
            QueryView::Scheduling(ref mut q) => {
                [...]
            }
            // In Live mode we will have a latency equal to the number of samples in each buffer.
            // We can't output samples before they were produced, and the last sample of a buffer
            // is produced that much after the beginning, leading to this latency calculation
            QueryView::Latency(ref mut q) => {
                let settings = *self.settings.lock().unwrap();
                let state = self.state.lock().unwrap();

                if let Some(ref info) = state.info {
                    let latency = gst::SECOND
                        .mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64)
                        .unwrap();
                    gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
                    q.set(settings.is_live, latency, gst::CLOCK_TIME_NONE);
                    return true;
                } else {
                    return false;
                }
            }
            _ => (),
        }
        BaseSrcBase::parent_query(element, query)
    }

The latency that we report is the duration of a single audio buffer, because we’re simulating a real live source here. A real live source won’t be able to output the buffer before the last sample of it is captured, and the difference between when the first and last sample were captured is exactly the latency that we add here. Other elements further downstream that introduce further latency would then add their own latency on top of this.

Inside the latency query we also signal that we are indeed a live source, and additionally how much buffering we can do (in our case, infinite) until data would be lost. The last part is important if e.g. the video branch has a higher latency, causing the audio sink to have to wait some additional time (so that audio and video stay in sync), which would then require the whole audio branch to buffer some data. As we have an artificial live source, we can always generate data for the next time but a real live source would only have a limited buffer and if no data is read and forwarded once that runs full, data would get lost.

You can test this again with e.g. gst-launch-1.0 by setting the is-live property to true. It should write in the output now that the pipeline is live.

In Mathieu’s blog post this was implemented without explicit waiting and the usage of the get_times virtual method, but as this is only really useful for pseudo live sources like this one I decided to explain how waiting on the clock can be achieved correctly and even more important how that relates to the next section.

Unlocking

With the addition of the live mode, the create function is now blocking and waiting on the clock for some time. This is suboptimal as for example a (flushing) seek would have to wait now until the clock waiting is done, or when shutting down the application would have to wait.

To prevent this, all waiting/blocking in GStreamer streaming threads should be interruptible/cancellable when requested. And for example the ClockID that we got from the clock for waiting can be cancelled by calling unschedule() on it. We only have to do it from the right place and keep it accessible. The right place is the BaseSrc::unlock virtual method.

struct ClockWait {
    clock_id: Option<gst::ClockId>,
    flushing: bool,
}

struct SineSrc {
    cat: gst::DebugCategory,
    settings: Mutex<Settings>,
    state: Mutex<State>,
    clock_wait: Mutex<ClockWait>,
}

[...]

    fn unlock(&self, element: &BaseSrc) -> bool {
        // This should unblock the create() function ASAP, so we
        // just unschedule the clock it here, if any.
        gst_debug!(self.cat, obj: element, "Unlocking");
        let mut clock_wait = self.clock_wait.lock().unwrap();
        if let Some(clock_id) = clock_wait.clock_id.take() {
            clock_id.unschedule();
        }
        clock_wait.flushing = true;

        true
    }

We store the clock ID in our struct, together with a boolean to signal whether we’re supposed to flush already or not. And then inside unlock unschedule the clock ID and set this boolean flag to true.

Once everything is unlocked, we need to reset things again so that data flow can happen in the future. This is done in the unlock_stop virtual method.

    fn unlock_stop(&self, element: &BaseSrc) -> bool {
        // This signals that unlocking is done, so we can reset
        // all values again.
        gst_debug!(self.cat, obj: element, "Unlock stop");
        let mut clock_wait = self.clock_wait.lock().unwrap();
        clock_wait.flushing = false;

        true
    }

To make sure that this struct is always initialized correctly, we also call unlock from stop, and unlock_stop from start.

Now as a last step, we need to actually make use of the new struct we added around the code where we wait for the clock.

            // Store the clock ID in our struct unless we're flushing anyway.
            // This allows to asynchronously cancel the waiting from unlock()
            // so that we immediately stop waiting on e.g. shutdown.
            let mut clock_wait = self.clock_wait.lock().unwrap();
            if clock_wait.flushing {
                gst_debug!(self.cat, obj: element, "Flushing");
                return Err(gst::FlowReturn::Flushing);
            }

            let id = clock.new_single_shot_id(wait_until).unwrap();
            clock_wait.clock_id = Some(id.clone());
            drop(clock_wait);

            gst_log!(
                self.cat,
                obj: element,
                "Waiting until {}, now {}",
                wait_until,
                clock.get_time()
            );
            let (res, jitter) = id.wait();
            gst_log!(
                self.cat,
                obj: element,
                "Waited res {:?} jitter {}",
                res,
                jitter
            );
            self.clock_wait.lock().unwrap().clock_id.take();

            // If the clock ID was unscheduled, unlock() was called
            // and we should return Flushing immediately.
            if res == gst::ClockReturn::Unscheduled {
                gst_debug!(self.cat, obj: element, "Flushing");
                return Err(gst::FlowReturn::Flushing);
            }

The important part in this code is that we first have to check if we are already supposed to unlock, before even starting to wait. Otherwise we would start waiting without anybody ever being able to unlock. Then we need to store the clock id in the struct and make sure to drop the mutex guard so that the unlock function can take it again for unscheduling the clock ID. And once waiting is done, we need to remove the clock id from the struct again and in case of ClockReturn::Unscheduled we directly return FlowReturn::Flushing instead of the error.

Similarly when using other blocking APIs it is important that they are woken up in a similar way when unlock is called. Otherwise the application developer’s and thus user experience will be far from ideal.

Seeking

As a last feature we implement seeking on our source element. In our case that only means that we have to update the sample_offset and sample_stop fields accordingly, other sources might have to do more work than that.

Seeking is implemented in the BaseSrc::do_seek virtual method, and signalling whether we can actually seek in the is_seekable virtual method.

    fn is_seekable(&self, _element: &BaseSrc) -> bool {
        true
    }

    fn do_seek(&self, element: &BaseSrc, segment: &mut gst::Segment) -> bool {
        // Handle seeking here. For Time and Default (sample offset) seeks we can
        // do something and have to update our sample offset and accumulator accordingly.
        //
        // Also we should remember the stop time (so we can stop at that point), and if
        // reverse playback is requested. These values will all be used during buffer creation
        // and for calculating the timestamps, etc.

        if segment.get_rate() < 0.0 {
            gst_error!(self.cat, obj: element, "Reverse playback not supported");
            return false;
        }

        let settings = *self.settings.lock().unwrap();
        let mut state = self.state.lock().unwrap();

        // We store sample_offset and sample_stop in nanoseconds if we
        // don't know any sample rate yet. It will be converted correctly
        // once a sample rate is known.
        let rate = match state.info {
            None => gst::SECOND_VAL,
            Some(ref info) => info.rate() as u64,
        };

        if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
            use std::f64::consts::PI;

            let sample_offset = segment
                .get_start()
                .unwrap()
                .mul_div_floor(rate, gst::SECOND_VAL)
                .unwrap();

            let sample_stop = segment
                .get_stop()
                .map(|v| v.mul_div_floor(rate, gst::SECOND_VAL).unwrap());

            let accumulator =
                (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (rate as f64));

            gst_debug!(
                self.cat,
                obj: element,
                "Seeked to {}-{:?} (accum: {}) for segment {:?}",
                sample_offset,
                sample_stop,
                accumulator,
                segment
            );

            *state = State {
                info: state.info.clone(),
                sample_offset: sample_offset,
                sample_stop: sample_stop,
                accumulator: accumulator,
            };

            true
        } else if let Some(segment) = segment.downcast_ref::<gst::format::Default>() {
            use std::f64::consts::PI;

            if state.info.is_none() {
                gst_error!(
                    self.cat,
                    obj: element,
                    "Can only seek in Default format if sample rate is known"
                );
                return false;
            }

            let sample_offset = segment.get_start().unwrap();
            let sample_stop = segment.get_stop().0;

            let accumulator =
                (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (rate as f64));

            gst_debug!(
                self.cat,
                obj: element,
                "Seeked to {}-{:?} (accum: {}) for segment {:?}",
                sample_offset,
                sample_stop,
                accumulator,
                segment
            );

            *state = State {
                info: state.info.clone(),
                sample_offset: sample_offset,
                sample_stop: sample_stop,
                accumulator: accumulator,
            };

            true
        } else {
            gst_error!(
                self.cat,
                obj: element,
                "Can't seek in format {:?}",
                segment.get_format()
            );

            false
        }
    }

Currently no support for reverse playback is implemented here, that is left as an exercise for the reader. So as a first step we check if the segment has a negative rate, in which case we just fail and return false.

Afterwards we again take a copy of the settings, keep a mutable mutex guard of our state and then start handling the actual seek.

If no caps are known yet, i.e. the AudioInfo is None, we assume a rate of 1 billion. That is, we just store the time in nanoseconds for now and let the set_caps function take care of that (which we already implemented accordingly) once the sample rate is known.

Then, if a Time seek is performed, we convert the segment start and stop position from time to sample offsets and save them. And then update the accumulator in a similar way as in the set_caps function. If a seek is in Default format (i.e. sample offsets for raw audio), we just have to store the values and update the accumulator but only do so if the sample rate is known already. A sample offset seek does not make any sense until the sample rate is known, so we just fail here to prevent unexpected surprises later.

Try the following pipeline for testing seeking. You should be able to seek the current time drawn over the video, and with the left/right cursor key you can seek. Also this shows that we create a quite nice sine wave.

gst-launch-1.0 rssinesrc ! audioconvert ! monoscope ! timeoverlay ! navseek ! glimagesink

And with that all features are implemented in our sine wave raw audio source.