Course 6: Closures, Threads, Channels and Concurrency

Pierre Cochard, Tanguy Risset

Introduction

Using Threads and Closures to Run Code Simultaneously

Course: Closures

Closures in Rust are anonymous functions that can be stored in variables, or passed to other processes as arguments. They can be found in a lot of places in the language, in order to allow functional-style programming, behavior customization or to provide concurrent/parallel code execution. They are defined by the following syntax |arguments| -> return_type { body }, for instance:

#![allow(unused)]
fn main() {
// Define a closure and store it into a variable:
let my_closure = |x: i32| -> i32 {
    println!("my_closure");
    x*2
};

// Execute the closure like you would normally do with a function:
let y = my_closure(2);
println!("{y}");
}

Borrowed captures

Just like in C++, closures can capture the environment it originates in and use external data in its own internal scope. By default, captured data, whether it is mutable or not, will be borrowed:

#![allow(unused)]
fn main() {
let x = 2;

let my_closure = || -> i32 {
    // Use a borrowed (immutable) capture of 'x', 
    // and return two times its value:
    x*2
};

println!("{}", my_closure());
}
#![allow(unused)]
fn main() {
// Same, but this time, we modify 'x' directly in the closure:
let mut x = 2;

// The closure itself also has to be made 'mutable' 
// in the case of a mutable borrow:
let mut my_closure_mut = || x *= 2;

my_closure_mut();
println!("{x}");
}

Moved captures

Instead of being borrowed, data can also be captured by value into a closure scope, using the move keyword before declaration:

#![allow(unused)]
fn main() {
let mut x = 2;

// Capturing 'x' by value. Here, it is made with a simple copy:
let mut my_closure_mut = move || {
    x *= 2;
    println!("x (closure): {x}");
};

my_closure_mut();
println!("x: {x}");
}

Why is the following code invalid? How can we solve the issue?

#![allow(unused)]
fn main() {
let mut x = vec![31, 47, 27, 16];

let mut my_closure_mut = move || {
    x.push(32);
    println!("{:?}", x);
};

my_closure_mut();
println!("{:?}", x);
}

Course: Passing closures as objects or arguments

One big specificity of closures is that they have a unique, anonymous type that cannot be written out. This can for instance be demonstrated by running the following piece of code:

#![allow(unused)]
fn main() {
// Utility function that prints out the type of a variable:
fn print_type_of<T>(_: &T) {
    println!("Type of my closure: {}", std::any::type_name::<T>());
}

let my_closure = |x: i32| -> i32 {
    x*2
};

print_type_of(&my_closure);
}

Therefore, passing a closure as an argument to a function using a specific type is not possible in Rust. Instead, in order to do that, one would have to use a trait. Indeed, all closures implement one or several of the following traits, depending on their nature and properties:

  • FnOnce: applies to closures that can be called once. All closures implement this trait;
  • Fn: applies to closures that don't move captured values out of their body and that don't mutate captured values, as well as closures that capture nothing from their environment. These closures can be called more than once without mutating their environment, which is important in cases such as calling a closure multiple times concurrently.
  • FnMut: applies to closures that don't move captured values out of their body, but that might mutate the captured values. These closures can be called more than once.

A closure can then be passed as an argument the same way we do for passing trait-implementing objects, by using the Fn/FnOnce/FnMut(argument_type) -> return_type:

#![allow(unused)]
fn main() {
// All of the 'Fn' traits have the format:
// 'Fn(argument-types) -> return types'
fn exec_closure(x: i32, closure: impl Fn(i32) -> i32) -> i32 {
    closure(x)
}

let c = |x: i32| x + 27;
let r = exec_closure(31, c);

println!("{r}");
}

The generic form also works (and is usually preferrable):

#![allow(unused)]
fn main() {
fn exec_closure<T>(x: i32, closure: T) -> i32 
where 
    T: Fn(i32) -> i32 
{
    closure(x)
}
let c = |x: i32| x + 27;
let r = exec_closure(31, c);

println!("{r}");
}

Using the generic form, store the following chirp_fn closure as a member of the struct Bird, with a valid type signature:

let chirp_fn = |times: i32| {
    for _ in 0..times {
        println!("chirp!");
    }
}

// The struct to implement (use generics!):
struct Bird<...> { 
    chirp: ... 
}

// Create a new instance of 'Bird' with the chirp_fn closure:
let bird = Bird { chirp: chirp_fn };

// Call the 'chirp' closure from inside the struct:
// operator precedence priority can be found here: https://doc.rust-lang.org/reference/expressions.html
(bird.chirp)(10);

Course: Spawning Threads using Closures

A std::thread object in Rust will execute a given closure in an independent (or parallel) context of execution. In the following example, the std::thread::spawn call will only return when it's done creating the thread, but not when the thread has actually finished executing:

#![allow(unused)]
fn main() {
// Spawn a new thread which will execute its given closure:
std::thread::spawn(|| {
    println!("Thread 1: chirp!");
});
// At the 'same time', print something from the main thread:
println!("Main thread: chirp chirp!");
}

In this case, the main function returns before the independent thread's println! call happens. This is why we can only see the "main thread" print output. Usually, a thread is bound to a local variable, and is waited upon before the parent context of execution finishes. This can be done by calling the .join() method on the thread handle:

#![allow(unused)]
fn main() {
// Bind the thread's "handle" to a variable:
let th = std::thread::spawn(|| {
    println!("Thread 1: chirp!");
});

println!("Main thread: chirp chirp!");

// Wait for 'th' to finish executing, and re-synchronise both threads:
th.join().unwrap();
}

The following code prints a modified value of variable var from 3 different threads, running independently from one another. Is the code safe? Can you guess what will be the resulting output?

use std::thread;
let mut var = 32;

let t1 = thread::spawn(move || {
    var += 1;
    println!("Thread 1: reading value {}!", var);
});

let t2 = thread::spawn(move || {
    var += 2;
    println!("Thread 2: reading value {}!", var);
});

var += 3;
println!("Main thread: reading value {}", var);

// Re-synchronise both threads:
t1.join().unwrap();
t2.join().unwrap();

What would happen if we removed all the move keywords from the code?

Sharing data safely between threads

Using Shared State data sets

Course: Exclusive access with Mutexes

Mutual exclusion, or mutex is a mechanism which prevents accessing the same data from multiple threads running at the same time. It relies on a locking system in order to do so: a thread must first ask to acquire the mutex's lock before being able to access the underlying protected data. The lock is a data structure that keeps track of whichever thread has exclusive access to the data. If the mutex happens to be locked at the time a thread tries to access the data, it will stall until the lock is eventually released, and the data is free to acquire.

#![allow(unused)]
fn main() {
use std::sync::Mutex;
// Instantiate a new Mutex<i32> instance with the value '32':
let var: Mutex<i32> = Mutex::new(32);
{
    // Acquire the mutex's lock (and panic in case of failure):
    let mut v = var.lock().unwrap();
    // Modify the value safely:
    *v += 32;
}
// Print the result:
println!("var = {var:?}");
}

In the following example, we want to try to use a Mutex to get both threads to use and modify var, but the compiler doesn't allow it, what is the underlying issue here?

#![allow(unused)]
fn main() {
use std::thread;

let mut var = 32;

let t1 = thread::spawn(move || {
    var += 1;
});

let t2 = thread::spawn(move || {
    var += 2;
});
// Re-synchronise both threads:
t1.join().unwrap();
t2.join().unwrap();

println!("Result: {var}");
}

Course: Reference Counted Mutexes

As we could see in our previous example, a Mutex in itself is not sufficient to implement viable thread-safe data sharing:

  • First is the issue of ownership, which could be solved using, for instance, a shared pointer.
  • Second would be the issue of concurrency in accessing this shared pointer from multiple threads simultaneously.

In the Rust programming language, Atomic Reference Counting Arc<T>, which can be seen as an atomic shared pointer, is designed to remedy this very specific problem. It firstly solves the ownership issue by being "reference-counted", just like a standard Rc object, but also solves the concurrency issue by being atomic, meaning that is guaranteed to execute as a single unified transaction. When an atomic operation is executed on an object by a specific thread, no other threads can read or modify the object while the atomic operation is in progress. In other words, other threads will only see the object before or after the operation, there would be no intermediary state.

Our previous example can then be replaced by the following:

#![allow(unused)]
fn main() {
use std::thread;
use std::sync::{Arc, Mutex};

// We wrap the Mutex in a Atomically Reference-Counted object:
let arc = Arc::new(Mutex::new(32));

// We prepare two clones, for moving into the two distinct closures:
let rc1 = Arc::clone(&arc);
let rc2 = Arc::clone(&arc);

let t1 = thread::spawn(move || {
    let mut v = rc1.lock().unwrap();
    *v += 1;
});

let t2 = thread::spawn(move || {
    let mut v = rc2.lock().unwrap();
    *v += 2;
});

// Re-synchronise both threads:
t1.join().unwrap();
t2.join().unwrap();

println!("Result: {arc:?}");
}

If an Arc object is sufficient to provide multiple ownership and thread-safe access to data, why do we still need a Mutex guarding our data? TODO: pas sur de savoir expliquer la réponse clairement... tu ne relache jamais les Mutex ou ARC?

Course: Lock-free data sharing with Atomics

Another way of making data thread-safe is by directly using atomic data structures. The Rust standard library provides a few of them in its std::sync::atomic module. The main difference with using Mutexes is that atomics are what we call lock-free: unlike Mutexes, its underlying mechanism never sleeps, but it spins (it will check data availability in a continuous loop), that's why we usually call them spinlocks. In real-time contexts, where thread sleep is not an option, it is always preferrable to use lock-free data structures.

Our previous example would for instance look like this with atomics:

#![allow(unused)]
fn main() {
use std::thread;
use std::sync::Arc;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;

// We replace 'Mutex::new()' with the following:
let arc = Arc::new(AtomicI32::new(32));
let rc1 = Arc::clone(&arc);
let rc2 = Arc::clone(&arc);

let t1 = thread::spawn(move || {
    // Acquire underlying data as a copy:
    let mut v = rc1.load(Ordering::Acquire);
    // Modify the copy:
    v += 1;
    // Update the value atomically, release the lock:
    rc1.store(v, Ordering::Release);
});

let t2 = thread::spawn(move || {
    // Another (more compact) way of doing this:
    rc2.fetch_add(2, Ordering::AcqRel);
});
// Re-synchronise both threads:
t1.join().unwrap();
t2.join().unwrap();

println!("Result: {arc:?}");
}

Using Message passing to transfer data between threads

Another approach to multiple ownership and thread-safety in Rust would be using an mpsc::channel or mpsc::sync_channel, which are asynchronous/synchronous FIFO queues that store all the updated states of a value in a shared infinite buffer.

An mpsc::channel() call will return a tuple containing a handle to a Sender object and a Receiver object, which are by convention respectively named tx and rx. These two objects are positioned at each end of a FIFO which tunnels data between the two:

#![allow(unused)]
fn main() {
// Create a 'data channel' between a Sender `tx`, and a Receiver `rx`:  
let (tx, rx) = std::sync::mpsc::channel::<i32>();

// Send the values `32` and then `16` through the channel:
tx.send(32).unwrap();
tx.send(16).unwrap();

// Poll the channel, read data if available:
println!("n = {}", rx.recv().unwrap());
println!("n = {}", rx.recv().unwrap());
}

Example of sending from a separate thread:

#![allow(unused)]
fn main() {
use std::thread;
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();

// Do the same thing in a separate thread:
let th = thread::spawn(move || {
    tx.send(32).unwrap();
    tx.send(16).unwrap();
});

th.join();

// Poll the channel, read data if available:
println!("n = {}", rx.recv().unwrap());
println!("n = {}", rx.recv().unwrap());
}

Or from multiple threads simultaneously:

#![allow(unused)]
fn main() {
use std::thread;
use std::sync::mpsc;

let (tx, rx) = mpsc::channel();
let mut vec = Vec::new();

for n in 0..8 {
    // Clone the Sender `tx` for each thread:
    let tx = tx.clone();
    vec.push(thread::spawn(move || {
        tx.send(n).unwrap();
    }));
}
for t in vec {
    t.join().unwrap();
}
for _ in 0..8 {
    // Consume the FIFO value-by-value:
    let value = rx.recv().unwrap();
    println!("Received value: {}", value);
}
}

Using an mpsc::channel, an Arc<T> and a Mutex (or an Atomic), implement a program which creates and run two independent threads:

  • A producer thread which continuously counts from 0 to infinity.
  • A consumer thread which continuously reads and prints the count produced by the producer thread.
  • The two threads should run for 5 seconds and then stop.
  • Hint: you can use thread::sleep to pause a thread for a certain amount of time.

Asynchronous programming

While programming with threads is a perfectly valid way of implementing concurrent programming, it also has a few disadvantages, such as having to rely on operating system scheduling, as well as sometimes making the code difficult to re-use or modify. To address these issues, programmers came up with a new way of structuring a program in a different set of tasks (whether they are independent, concurrent, or sequential), which has been called asynchronous programming.

Code within a thread is written in sequential style and the operating system executes them concurrently. With asynchronous programming, concurrency happens entirely within a program: the operating system is not involved, making context switch faster, and memory overhead also lower. By being natively integrated into a programming language, which is the case with Rust, it also makes control flow more flexible and expressive. For instance, a program using threads, like in the following example:

#![allow(unused)]
fn main() {
fn count_to(N: i32) {
    for n in 1..=N {
        println!("{n}");
        std::thread::sleep(std::time::Duration::from_secs(1));
    }
}
let t1 = std::thread::spawn(|| {
    count_to(10)
});
let t2 = std::thread::spawn(|| {
    count_to(20)
});
t1.join().unwrap();
t2.join().unwrap();
}

Could be also described like this using Rust's (with tokio) async and await features:

use tokio::{join, spawn, time::{sleep, Duration}};

async fn count_to(N: i32) {
    for n in 1..=N {
        println!("{n}");
        sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    // Run the following code expressions on a same task:
    join!(count_to(10), count_to(15));
}