The crates.io ecosystem offers libraries that can improve both our development speed and the performance of our code. In this tutorial, we’ll learn how to use crates from that ecosystem to work with threads in Rust.

This article is an extract from Rust High Performance, authored by Iban Eguia Moraza.

Using non-blocking data structures

One of the issues we saw earlier was that if we wanted to share something more complex than an integer or a Boolean between threads and mutate it, we needed to use a Mutex. This is not entirely true, since one crate, Crossbeam, provides data structures that can be shared and mutated without locking a Mutex. Under contention, they are usually faster and more efficient than a lock-protected equivalent.

When we want to share information between threads, it’s usually a list of tasks that we want to work on cooperatively. Other times, we want to create information in multiple threads and add it to a shared list. It’s therefore unusual for multiple threads to work with exactly the same variables since, as we have seen, that requires synchronization and will be slow.

This is where Crossbeam shows its potential. Crossbeam gives us multithreaded queues and stacks, where we can insert and consume data from different threads. We can, in fact, have some threads doing an initial processing of the data and others performing a second phase of the processing. Let’s see how we can use these features. First, add crossbeam to the dependencies of the crate in the Cargo.toml file. The examples here use the crossbeam::sync module as it existed when the book was written; later crossbeam releases reorganized these types, so check the documentation of the version you install. Then, we start with a simple example:


extern crate crossbeam;
use std::thread;
use std::sync::Arc;

use crossbeam::sync::MsQueue;

fn main() {
    // The queue is shared between the threads through an Arc.
    let queue = Arc::new(MsQueue::new());
    // Spawn 5 threads, each pushing 1,000,000 values.
    let handles: Vec<_> = (1..6)
        .map(|_| {
            let t_queue = queue.clone();
            thread::spawn(move || {
                for _ in 0..1_000_000 {
                    t_queue.push(10);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // All the clones are gone, so we can take the queue out of the Arc.
    let final_queue = Arc::try_unwrap(queue).unwrap();
    let mut sum = 0;
    while let Some(i) = final_queue.try_pop() {
        sum += i;
    }

    println!("Final sum: {}", sum);
}

Let’s first understand what this example does. It will iterate 1,000,000 times in 5 different threads, and each time it will push a 10 to a queue. Queues are FIFO (first in, first out) lists. This means that the first number pushed will be the first one returned by pop(), and the last one pushed will be the last one returned. In this case, all of them are a 10, so it doesn’t matter.

Once the threads finish populating the queue, we iterate over it and add up all the numbers. Five threads pushing 1,000,000 tens each should give a final sum of 5 × 1,000,000 × 10 = 50,000,000. If you run it, that will be the result. If you run it by executing cargo run --release, it will also be fast: on my computer, it took about one second to complete. If you want, try to implement this code with the standard library Mutex and a vector, and compare the performance.
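For that comparison, a minimal sketch of the Mutex-and-vector version could look like the following. It only uses the standard library; the helper name sum_with_mutex and its parameters are ours, not part of the book’s example. No timing is claimed here, so measure both versions on your own machine.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: every thread pushes `pushes_per_thread` tens into a
/// Vec protected by a Mutex, then the values are summed.
fn sum_with_mutex(threads: usize, pushes_per_thread: usize) -> u64 {
    let list = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let t_list = Arc::clone(&list);
            thread::spawn(move || {
                for _ in 0..pushes_per_thread {
                    // Every single push has to take the lock.
                    t_list.lock().unwrap().push(10_u64);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // All the clones are gone, so the Vec can be taken out of the Arc and the Mutex.
    let final_list = Arc::try_unwrap(list).unwrap().into_inner().unwrap();
    final_list.iter().sum()
}

fn main() {
    println!("Final sum: {}", sum_with_mutex(5, 1_000_000));
}
```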

As you can see, we still needed to use an Arc to control the multiple references to the queue. This is needed because the queue itself cannot be duplicated and shared: it has no reference count of its own.

Crossbeam doesn’t only give us FIFO queues. We also have LIFO stacks. LIFO stands for last in, first out, and it means that the last element you pushed onto the stack will be the first one to pop(). Let’s see the difference with a couple of threads:

extern crate crossbeam;
use std::thread;
use std::sync::Arc;
use std::time::Duration;

use crossbeam::sync::{MsQueue, TreiberStack};

fn main() {
    let queue = Arc::new(MsQueue::new());
    let stack = Arc::new(TreiberStack::new());

    let in_queue = queue.clone();
    let in_stack = stack.clone();
    let in_handle = thread::spawn(move || {
        for i in 0..5 {
            in_queue.push(i);
            in_stack.push(i);
            println!("Pushed :D");
            thread::sleep(Duration::from_millis(50));
        }
    });
    let mut final_queue = Vec::new();
    let mut final_stack = Vec::new();

    let mut last_q_failed = 0;
    let mut last_s_failed = 0;
    loop {
        // Get the queue
        match queue.try_pop() {
            Some(i) => {
                final_queue.push(i);
                last_q_failed = 0;
                println!("Something in the queue! :)");
            }
            None => {
                println!("Nothing in the queue :(");
                last_q_failed += 1;
            }
        }
        // Get the stack
        match stack.try_pop() {
            Some(i) => {
                final_stack.push(i);
                last_s_failed = 0;
                println!("Something in the stack! :)");
            }
            None => {
                println!("Nothing in the stack :(");
                last_s_failed += 1;
            }
        }
        // Check if we finished
        if last_q_failed > 1 && last_s_failed > 1 {
            break;
        } else if last_q_failed > 0 || last_s_failed > 0 {
            thread::sleep(Duration::from_millis(100));
        }
    }
    in_handle.join().unwrap();

    println!("Queue: {:?}", final_queue);
    println!("Stack: {:?}", final_stack);
}

As you can see in the code, we have two shared variables: a queue and a stack. The secondary thread will push new values to each of them, in the same order, from 0 to 4. Then, the main thread will try to get them back. It loops and uses the try_pop() method. The pop() method could be used instead, but it blocks the thread if the queue or the stack is empty. That would inevitably happen once all the values have been popped, since no new values are being added, so try_pop() lets the main thread avoid blocking and end gracefully.

The way it checks whether all the values were popped is by counting how many consecutive times it failed to pop a new value. Every time a pop fails, it waits for 100 milliseconds, while the pusher thread only waits for 50 milliseconds between pushes. This means that if it fails to pop a new value twice in a row, the pusher thread should already have finished.

It will add values as they are popped to two vectors and then print the result. In the meantime, it will print messages about pushing and popping new values. This is easiest to follow by running the program and reading its output.

Note that the output can be different in your case, since threads don’t need to be executed in any particular order.

In one example run, the main thread first tries to get something from the queue and the stack, but there is nothing there, so it sleeps. The secondary thread then starts pushing things, two numbers actually. After this, the queue and the stack will both be [0, 1]. Then, the main thread pops the first item from each of them. From the queue, it will pop the 0 and from the stack it will pop the 1 (the last one), leaving the queue as [1] and the stack as [0].

It will go back to sleep and the secondary thread will insert a 2 in each variable, leaving the queue as [1, 2] and the stack as [0, 2]. Then, the main thread will pop two elements from each of them. From the queue, it will pop the 1 and the 2, while from the stack it will pop the 2 and then the 0, leaving both empty.

The main thread then goes to sleep, and for the next two tries, the secondary thread will push one element and the main thread will pop it, twice.
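The walk-through above can be replayed deterministically in a single thread. The following sketch uses the standard library’s VecDeque and Vec as stand-ins for the Crossbeam queue and stack (an assumption made only to keep the example self-contained; they are not lock-free), and the function name replay is ours. It performs the same pushes and pops in the order just described.

```rust
use std::collections::VecDeque;

/// Hypothetical helper: replays the pushes and pops of the example run and
/// returns the order in which values left the queue and the stack.
fn replay() -> (Vec<u32>, Vec<u32>) {
    let mut queue = VecDeque::new(); // FIFO: push at the back, pop at the front
    let mut stack = Vec::new(); // LIFO: push and pop at the same end
    let mut from_queue = Vec::new();
    let mut from_stack = Vec::new();

    // The pusher inserts 0 and 1 before the main thread wakes up.
    for i in 0..2 {
        queue.push_back(i);
        stack.push(i);
    }
    // The main thread pops one element from each.
    from_queue.extend(queue.pop_front());
    from_stack.extend(stack.pop());

    // The pusher inserts 2, then the main thread pops two elements from each.
    queue.push_back(2);
    stack.push(2);
    for _ in 0..2 {
        from_queue.extend(queue.pop_front());
        from_stack.extend(stack.pop());
    }

    // The last two values are pushed and popped one at a time.
    for i in 3..5 {
        queue.push_back(i);
        stack.push(i);
        from_queue.extend(queue.pop_front());
        from_stack.extend(stack.pop());
    }
    (from_queue, from_stack)
}

fn main() {
    let (from_queue, from_stack) = replay();
    println!("Queue: {:?}", from_queue); // Queue: [0, 1, 2, 3, 4]
    println!("Stack: {:?}", from_stack); // Stack: [1, 2, 0, 3, 4]
}
```

The queue returns the values in insertion order, while the order coming out of the stack depends on how pushes and pops interleave.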

It might seem a little bit complex, but the idea is that these queues and stacks can be used efficiently between threads without requiring a Mutex, and they accept any Send type. This means that they are great for complex computations, and even for multi-staged complex computations.

The Crossbeam crate also has some helpers to deal with epochs and even some variants of the mentioned types. For multithreading, Crossbeam also adds a great utility: scoped threads.

Scoped threads

In all our examples, we have used standard library threads. A closure passed to std::thread::spawn() must be 'static, because the new thread may outlive the function that spawned it. This means the thread cannot borrow variables that live on the stack of the main thread: we need to move them into the thread, or use things such as Arc to share non-mutable data. Not only that, each thread has its own stack, so threads also consume memory and can eventually make the system slower if we spawn too many.

Crossbeam gives us some special threads that are allowed to borrow from the stack of the thread that creates them. They are called scoped threads. Using them is pretty simple and the crate documentation explains them well; you just need to create a Scope by calling crossbeam::scope(). You will need to pass a closure that receives the Scope. You can then call spawn() on that scope the same way you would do it in std::thread, but with one difference: the spawned threads can share references to variables of the parent thread, as long as those variables outlive the scope.

This means that for the queues or stacks we just talked about, or for atomic data, you can simply call their methods without requiring an Arc! This avoids the allocation and reference counting that an Arc involves. Let’s see how it works with a simple example:

extern crate crossbeam;

fn main() {
    let all_nums: Vec<_> = (0..1_000_u64).collect();
    let mut results = Vec::new();
    crossbeam::scope(|scope| {
        // all_nums is only borrowed: no Arc and no clone are needed.
        for num in &all_nums {
            results.push(scope.spawn(move || num * num + num * 5 + 250));
        }
    });
    // The scope has ended, so every thread has finished by now.
    let final_result: u64 = results.into_iter().map(|res| res.join()).sum();
    println!("Final result: {}", final_result);
}

Let’s see what this code does. It first creates a vector with all the numbers from 0 to 999. Then, for each of them, in a crossbeam scope, it runs one scoped thread per number and performs a supposedly complex computation. This is just an example, so it simply returns the result of a quadratic function.

Interestingly enough, though, the scope.spawn() method allows returning a result of any type, which is great in our case. The code will add each result to a vector. This won’t directly add the resulting number, since it will be executed in parallel. It will add a result guard, which we will be able to check outside the scope.

Then, after all the threads run and return the results, the scope will end. We can now check all the results, which are guaranteed to be ready for us. For each of them, we just need to call join() and we will get the result. Then, we sum it up to check that they are actual results from the computation.

The join() method can also be called inside the scope to get the results, but if you do it inside the for loop, for example, you will block the loop until each result is generated, which is not efficient. It is better to at least start all the computations first and only then start checking the results. If you want to perform more computations after them, you might find it useful to run the new computation in another loop or iterator inside the crossbeam scope.

But how does crossbeam allow you to use variables from outside the scope freely? Won’t there be data races? Here is where the magic happens. The scope joins all the inner threads before exiting, which means that no further code will be executed in the main thread until all the scoped threads finish. The borrowed variables therefore cannot be dropped while a scoped thread is still using them, so we can safely use the variables of the main thread (the parent stack, since the main thread is the parent of the scope in this case).
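The same join-before-exit guarantee is available in the standard library as std::thread::scope (stable since Rust 1.63). The sketch below is an assumption-laden variant of the example above rather than the book’s code: it uses the standard library instead of crossbeam, the helper name scoped_sum is ours, and it spawns one thread per chunk of 250 numbers instead of one per number.

```rust
use std::thread;

/// Hypothetical helper: computes the sum of n * n + 5 * n + 250 over `nums`,
/// borrowing the slice from the caller instead of wrapping it in an Arc.
fn scoped_sum(nums: &[u64]) -> u64 {
    thread::scope(|scope| {
        // Start every computation first...
        let handles: Vec<_> = nums
            .chunks(250)
            .map(|chunk| {
                scope.spawn(move || chunk.iter().map(|&n| n * n + n * 5 + 250).sum::<u64>())
            })
            .collect();
        // ...and only then collect the results.
        handles.into_iter().map(|handle| handle.join().unwrap()).sum()
    })
    // thread::scope() only returns once all the spawned threads have finished.
}

fn main() {
    let all_nums: Vec<u64> = (0..1_000).collect();
    println!("Final result: {}", scoped_sum(&all_nums));
}
```

Note that, unlike in the crossbeam version shown earlier, the handles here are joined inside the scope, since the standard library ties them to the scope’s lifetime.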

We can actually check what is happening by using the println!() macro. If we remember from previous examples, printing to the console after spawning some threads would usually run even before the spawned threads, due to the time it takes to set them up. In this case, since we have crossbeam preventing it, we won’t see it. Let’s check the example:

extern crate crossbeam;

fn main() {
    let all_nums: Vec<_> = (0..10).into_iter().collect();
    crossbeam::scope(|scope| {
        for num in all_nums {
            scope.spawn(move || {
                println!("Next number is {}", num);
            });
        }
    });

    println!("Main thread continues :)");
}

If you run this code, you will see one "Next number is" line per number, in no particular order, followed by the "Main thread continues :)" message.

As you can see, scoped threads run in no particular order. In one run, for example, it first ran the 1, then the 0, then the 2, and so on. Your output will probably be different. The interesting thing, though, is that the main thread won’t continue executing until all the threads have finished. Therefore, reading and modifying variables in the main thread after the scope is perfectly safe.

There are two main performance advantages with this approach. First, Arc requires a call to malloc() to allocate memory in the heap, which will take time if it’s a big structure and the memory is a bit full. Interestingly enough, that data is already in our stack, so if possible, we should try to avoid duplicating it in the heap. Second, the Arc has a reference counter, as we saw, and it is an atomic reference counter, which means that every time we clone the reference, we need to atomically increment the count. This takes time, more than incrementing a plain integer.
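To make the reference counter visible, here is a small sketch that uses only the standard library. The function name arc_counts is ours; it reports the strong count before cloning, after cloning, and after the thread that owned the clone has been joined.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: returns the Arc strong count at three points in time.
fn arc_counts() -> (usize, usize, usize) {
    // Arc::new() moves the vector into a heap allocation next to the counters.
    let data = Arc::new(vec![1_u64, 2, 3]);
    let before = Arc::strong_count(&data);

    // Cloning does not copy the vector, it atomically increments the counter.
    let t_data = Arc::clone(&data);
    let after_clone = Arc::strong_count(&data);

    let handle = thread::spawn(move || t_data.iter().sum::<u64>());
    let sum = handle.join().unwrap();
    assert_eq!(sum, 6);

    // The clone was dropped when the thread finished, decrementing the counter.
    let after_join = Arc::strong_count(&data);
    (before, after_clone, after_join)
}

fn main() {
    println!("{:?}", arc_counts()); // (1, 2, 1)
}
```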

Most of the time, we might be waiting for some expensive computations to run, and it would be great if they just gave all the results when finished. We can still add some more chained computations, using scoped threads, that will only be executed after the first ones finish, so we should use scoped threads more often than normal threads, if possible.

Using a thread pool

So far, we have seen multiple ways of creating new threads and sharing information between them. However, the ideal number of threads to spawn is usually around the number of virtual processors in the system, which means we should not spawn one thread for each chunk of work. On the other hand, controlling what work each thread does can be complex, since you have to make sure that all threads have work to do at any given point in time.

Here is where thread pooling comes in handy. The threadpool crate lets you iterate over all your work and, for each small chunk, call something similar to thread::spawn(). The interesting thing is that each task will be assigned to an idle thread, and no new thread will be created for each task. The number of threads is configurable, and you can get the number of CPUs with other crates, such as num_cpus. Not only that, if one of the threads panics, the pool will automatically add a new one.

To see an example, first, let’s add threadpool and num_cpus as dependencies in our Cargo.toml file. Then, let’s see some example code:

extern crate num_cpus;
extern crate threadpool;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use threadpool::ThreadPool;

fn main() {
    // One worker thread per logical CPU.
    let pool = ThreadPool::with_name("my worker".to_owned(), num_cpus::get());
    println!("Pool threads: {}", pool.max_count());
    let result = Arc::new(AtomicUsize::new(0));

    for i in 0..1_000_000 {
        let t_result = result.clone();
        // Each addition is queued as a task and run by an idle worker.
        pool.execute(move || {
            t_result.fetch_add(i, Ordering::Relaxed);
        });
    }
    // Wait until every queued task has run.
    pool.join();

    let final_res = Arc::try_unwrap(result).unwrap().into_inner();
    println!("Final result: {}", final_res);
}

This code creates a thread pool with as many threads as there are logical CPUs in your computer. Then, it adds every number from 0 to 999,999 to an atomic usize, just to test parallel processing. Each addition is submitted as a separate task, which is run by whichever pool thread is idle. Doing this with one thread per operation (1,000,000 threads) would be really inefficient. In this case, though, it will use the appropriate number of threads, and the execution will be really fast. There is another crate that gives thread pools an even more interesting parallel processing feature: Rayon.
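To get an idea of what a pool does with those tasks, here is a rough standard-library sketch. It is not how the threadpool crate is implemented: the Job alias and the sum_with_pool helper are hypothetical names, tasks travel through an std::sync::mpsc channel whose receiver is shared behind a Mutex, and a panicking worker is not replaced.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// A task is any closure that can be sent to another thread and run once.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical helper: adds every number in 0..n to an atomic counter,
/// using `workers` long-lived threads instead of one thread per task.
fn sum_with_pool(workers: usize, n: usize) -> usize {
    let (sender, receiver) = mpsc::channel::<Job>();
    let receiver = Arc::new(Mutex::new(receiver));
    let result = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers.max(1))
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // The lock is only held while waiting for a task, not while running it.
                let job = receiver.lock().unwrap().recv();
                match job {
                    Ok(job) => job(),
                    Err(_) => break, // The sender was dropped: no more work.
                }
            })
        })
        .collect();

    for i in 0..n {
        let t_result = Arc::clone(&result);
        sender
            .send(Box::new(move || {
                t_result.fetch_add(i, Ordering::Relaxed);
            }))
            .unwrap();
    }
    // Closing the channel lets the workers exit once the queue is drained.
    drop(sender);
    for handle in handles {
        handle.join().unwrap();
    }
    result.load(Ordering::Relaxed)
}

fn main() {
    // available_parallelism() plays the role of num_cpus::get() here.
    let workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
    println!("Pool threads: {}", workers);
    println!("Final result: {}", sum_with_pool(workers, 100_000));
}
```

The example uses 100,000 tasks instead of 1,000,000 only to keep the sketch quick to run.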

Using parallel iterators

If you can see the big picture in these code examples, you’ll have realized that most of the parallel work has a long loop, giving work to different threads. It happened with simple threads and it happens even more with scoped threads and thread pools. It’s usually the case in real life, too. You might have a bunch of data to process, and you can probably separate that processing into chunks, iterate over them, and hand them over to various threads to do the work for you.

The main issue with that approach is that if you need to use multiple stages to process a given piece of data, you might end up with lots of boilerplate code that can make it difficult to maintain. Not only that, you might find yourself not using parallel processing sometimes due to the hassle of having to write all that code.

Luckily, Rayon has multiple data parallelism primitives around iterators that you can use to parallelize any iterative computation. You can almost forget about the Iterator trait and use Rayon’s ParallelIterator alternative, which is as easy to use as the standard library trait!

Rayon uses a parallel iteration technique called work stealing. For each iteration of the parallel iterator, the new value or values get added to a queue of pending work. Then, when a thread finishes its work, it checks whether there is any pending work to do and if there is, it starts processing it. This, in most languages, is a clear source of data races, but thanks to Rust, this is no longer an issue, and your algorithms can run extremely fast and in parallel.
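The idea of idle threads picking up pending work can be sketched with the standard library alone. The code below is a deliberately simplified scheme and not Rayon’s scheduler: instead of per-thread queues that other threads steal from, it keeps a single atomic counter that hands out the next chunk to whichever thread asks first. The name parallel_double_sum and its parameters are ours.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper: sums `e * 2` over `data`, letting each worker grab
/// the next pending chunk as soon as it finishes the previous one.
fn parallel_double_sum(data: &[u64], workers: usize, chunk_size: usize) -> u64 {
    let chunk_size = chunk_size.max(1);
    // Index of the next chunk that nobody has claimed yet.
    let next_chunk = AtomicUsize::new(0);
    let next_chunk = &next_chunk;

    thread::scope(|scope| {
        let handles: Vec<_> = (0..workers.max(1))
            .map(|_| {
                scope.spawn(move || {
                    let mut local_sum = 0_u64;
                    loop {
                        // Claim a chunk; fetch_add guarantees no two threads get the same one.
                        let start = next_chunk.fetch_add(1, Ordering::Relaxed) * chunk_size;
                        if start >= data.len() {
                            break; // No pending work left.
                        }
                        let end = (start + chunk_size).min(data.len());
                        local_sum += data[start..end].iter().map(|&e| e * 2).sum::<u64>();
                    }
                    local_sum
                })
            })
            .collect();
        handles.into_iter().map(|handle| handle.join().unwrap()).sum()
    })
}

fn main() {
    let data: Vec<u64> = (0..1_000_000).collect();
    println!("Result: {}", parallel_double_sum(&data, 4, 10_000));
}
```

A thread that happens to get fast chunks simply claims more of them, so no thread sits idle while work is still pending.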

Let’s look at how to use it for an example similar to those we have seen in this chapter. First, add rayon to your Cargo.toml file and then let’s start with the code:

extern crate rayon;
use rayon::prelude::*;

fn main() {
    let result = (0..1_000_000_u64)
        .into_par_iter()
        .map(|e| e * 2)
        .sum::<u64>();

    println!("Result: {}", result);
}

As you can see, this reads just like a sequential iterator chain, yet it runs in parallel. Of course, for a computation this simple, running it sequentially will probably be faster than running it in parallel, thanks to compiler optimizations, but when you need to process data from files, for example, or perform very complex mathematical computations, parallelizing the work can give great performance gains.

Rayon implements these parallel iteration traits for ranges and for standard library collections, such as HashMap and Vec. In most cases, if you are using the iter() or into_iter() methods from the standard library in your code, you can simply use par_iter() or into_par_iter() in those calls and your code should now run in parallel.

But beware: parallelizing something doesn’t automatically improve its performance. Take into account that if you need to update some shared information between the threads, they will need to synchronize somehow, and you will lose performance. Therefore, multithreading works best when workloads are completely independent and you can execute one without any dependency on the rest.
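As a rough illustration of that difference, the following standard-library sketch computes the same sum in two ways. In sum_shared, every element is added to one shared atomic, so the threads contend on it constantly. In sum_local, each thread works on its own independent chunk and the partial sums are only combined at the end. All the names are ours, and no timings are claimed; measure on your own workload.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Hypothetical helper: chunk length that splits `len` items among `workers` threads.
fn chunk_len(len: usize, workers: usize) -> usize {
    let workers = workers.max(1);
    ((len + workers - 1) / workers).max(1)
}

/// Every element updates the same shared atomic value.
fn sum_shared(data: &[u64], workers: usize) -> u64 {
    let total = AtomicU64::new(0);
    thread::scope(|scope| {
        for chunk in data.chunks(chunk_len(data.len(), workers)) {
            let total = &total;
            scope.spawn(move || {
                for &e in chunk {
                    total.fetch_add(e * 2, Ordering::Relaxed);
                }
            });
        }
    });
    total.into_inner()
}

/// Each thread sums its own chunk; results are combined once, at the end.
fn sum_local(data: &[u64], workers: usize) -> u64 {
    thread::scope(|scope| {
        let handles: Vec<_> = data
            .chunks(chunk_len(data.len(), workers))
            .map(|chunk| scope.spawn(move || chunk.iter().map(|&e| e * 2).sum::<u64>()))
            .collect();
        handles.into_iter().map(|handle| handle.join().unwrap()).sum()
    })
}

fn main() {
    let data: Vec<u64> = (0..1_000_000).collect();
    println!("Shared: {}", sum_shared(&data, 4));
    println!("Local:  {}", sum_local(&data, 4));
}
```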

If you found this article useful and would like to learn more such tips, head over to pick up this book, Rust High Performance, authored by Iban Eguia Moraza.
