Pierre Cochard, Tanguy Risset
- Introduction
- Using Threads and Closures to Run Code Simultaneously
- Sharing data safely between threads
- Asynchronous programming
Introduction
Modern CPU architectures offer many ways to run high-performance programs. The most obvious is threading: because most CPUs have multiple cores, threads can be executed in parallel. A thread is a standard abstraction available in most programming languages and managed by the operating system. The Rust thread API is presented in Section 2, and the tools commonly used to share data safely between threads are presented in Section 3.
Another way to handle parallel tasks is concurrency, i.e., having a single process or thread switch between different tasks, so that no time is wasted when a task is idle. Rust provides the async mode to support concurrency. In async mode, Rust launches a runtime (sometimes called an executor) that can interrupt idle tasks and schedule tasks that are ready to run. The async model may use threading if the runtime is multithreaded, but this is transparent to the programmer: conceptually, the program runs sequentially, driven by a hidden state machine that regularly polls suspended tasks to determine whether they can resume. Several runtimes support the async model; the most widely used is Tokio, which is introduced in Section 4.
Using Threads and Closures to Run Code Simultaneously
Course: Closures
Closures in Rust are anonymous functions (sometimes called lambda functions) that can be stored in variables or passed to other functions as arguments. They appear in many places in the language, enabling functional-style programming, behavior customization, and concurrent/parallel code execution. They are defined with the syntax |arguments| -> return_type { body }, for instance:
#![allow(unused)]
fn main() {
    // Define a closure and store it in a variable:
    let my_closure = |x: i32| -> i32 {
        println!("my_closure");
        x * 2
    };
    // Call the closure just like a regular function:
    let y = my_closure(2);
    println!("{y}");
}
Borrowed captures
Just like in C++, a closure can capture the environment it originates in and use external data in its own internal scope. By default, captured data, whether mutable or not, is borrowed:
#![allow(unused)]
fn main() {
    let x = 2;
    let my_closure = || -> i32 {
        // Use a borrowed (immutable) capture of 'x',
        // and return two times its value:
        x * 2
    };
    println!("{}", my_closure());
}
#![allow(unused)]
fn main() {
    // Same, but this time, we modify 'x' directly in the closure:
    let mut x = 2;
    // The closure itself also has to be made 'mutable'
    // in the case of a mutable borrow:
    let mut my_closure_mut = || x *= 2;
    my_closure_mut();
    println!("{x}");
}
Moved captures
Instead of being borrowed, data can also be moved into a closure's scope, using the move keyword before the declaration. The semantics are the same as for assignment: if the data's type implements the Copy trait, the data is copied by value (as shown below for the i32 type); if it does not, the data is moved (as shown in the next question).
#![allow(unused)]
fn main() {
    let mut x = 2;
    // Capturing 'x' by value. Here, it is made with a simple copy:
    let mut my_closure_mut = move || {
        x *= 2;
        println!("x (closure): {x}");
    };
    my_closure_mut();
    println!("x: {x}");
}
Why is the following code invalid? How can we solve the issue?
#![allow(unused)]
fn main() {
    let mut x = vec![31, 47, 27, 16];
    let mut my_closure_mut = move || {
        x.push(32);
        println!("{:?}", x);
    };
    my_closure_mut();
    println!("{:?}", x);
}
Correction
A Vec object does not implement the Copy trait, so it is moved into the closure. The last println! macro call therefore refers to a value that has been moved out of scope. Depending on what we eventually want to do with this code, there are two ways to fix it:
1. Remove the move keyword, so that the Vec is (mutably) borrowed instead;
#![allow(unused)]
fn main() {
    let mut x = vec![31, 47, 27, 16];
    let mut my_closure_mut = || x.push(32);
    my_closure_mut();
    println!("{:?}", x);
}
2. Clone (deep copy) the Vec object and move the clone into the closure. This way, the original remains valid in the main scope.
#![allow(unused)]
fn main() {
    let mut x = vec![31, 47, 27, 16];
    let mut x2 = x.clone();
    let my_closure_mut = move || {
        x2.push(32);
        x2
    };
    x = my_closure_mut();
    println!("{:?}", x);
}
Course: Passing closures as objects or arguments
One important specificity of closures is that each has a unique, anonymous type that cannot be written out. This can be demonstrated by running the following piece of code:
#![allow(unused)]
fn main() {
    // Utility function that prints out the type of a variable:
    fn print_type_of<T>(_: &T) {
        println!("Type of my closure: {}", std::any::type_name::<T>());
    }
    let my_closure = |x: i32| -> i32 { x * 2 };
    print_type_of(&my_closure);
}
Therefore, passing a closure to a function by naming its concrete type is not possible in Rust. Instead, one has to use a trait. Indeed, all closures implement one or several of the following traits, depending on their nature and properties:
- FnOnce: applies to closures that can be called once. All closures implement this trait.
- Fn: applies to closures that don't move captured values out of their body and that don't mutate captured values, as well as closures that capture nothing from their environment. These closures can be called more than once without mutating their environment, which is important in cases such as calling a closure multiple times concurrently.
- FnMut: applies to closures that don't move captured values out of their body, but that might mutate the captured values. These closures can be called more than once.
In general:
- If a closure moves a captured value out of its body: we use FnOnce.
- If a closure modifies a captured value: we use FnMut.
- If it only reads: we simply use Fn.
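As a quick illustration (a minimal sketch, not part of the original course; all names are arbitrary), here is a closure of each kind:

fn main() {
    let s = String::from("hello");

    // Moves the captured String out of its body: FnOnce only.
    let consume = move || s;
    let s2 = consume();

    // Mutates a captured variable: FnMut (callable multiple times).
    let mut count = 0;
    let mut increment = || count += 1;
    increment();
    increment();

    // Only reads its captures: Fn.
    let read = || println!("s2 = {s2}, count = {count}");
    read();
}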
A closure can then be passed as an argument the same way we pass other trait-implementing objects, using the Fn/FnMut/FnOnce(argument_types) -> return_type syntax:
#![allow(unused)]
fn main() {
    // All of the 'Fn' traits have the format:
    // 'Fn(argument_types) -> return_type'
    fn exec_closure(x: i32, closure: impl Fn(i32) -> i32) -> i32 {
        closure(x)
    }
    let c = |x: i32| x + 27;
    let r = exec_closure(31, c);
    println!("{r}");
}
The generic form also works (and is usually preferable):
#![allow(unused)]
fn main() {
    fn exec_closure<T>(x: i32, closure: T) -> i32
    where
        T: Fn(i32) -> i32,
    {
        closure(x)
    }
    let c = |x: i32| x + 27;
    let r = exec_closure(31, c);
    println!("{r}");
}
Using the generic form, store the following chirp_fn closure as a member of the struct Bird, with a valid type signature:
let chirp_fn = |times: i32| {
for _ in 0..times {
println!("chirp!");
}
};
// The struct to implement (use generics!):
struct Bird<...> {
chirp: ...
}
// Create a new instance of 'Bird' with the chirp_fn closure:
let bird = Bird { chirp: chirp_fn };
// Call the 'chirp' closure from inside the struct:
// Operator precedence rules can be found here: https://doc.rust-lang.org/reference/expressions.html
(bird.chirp)(10);
Correction
#![allow(unused)]
fn main() {
    struct Bird<F>
    where
        F: Fn(i32) -> (),
    {
        chirp: F,
    }
    let chirp_fn = |times: i32| {
        for _ in 0..times {
            println!("chirp!");
        }
    };
    let bird = Bird { chirp: chirp_fn };
    (bird.chirp)(10);
}
Course: Spawning Threads using Closures
A std::thread object in Rust executes a given closure in an independent (potentially parallel) context of execution. In the following example, the std::thread::spawn call returns as soon as the thread has been created, not when the thread has finished executing:
#![allow(unused)]
fn main() {
    // Spawn a new thread which will execute its given closure:
    std::thread::spawn(|| {
        println!("Thread 1: chirp!");
    });
    // At the 'same time', print something from the main thread:
    println!("Main thread: chirp chirp!");
}
In this case, the main function returns before the independent thread's println! call happens, which is why we only see the "main thread" output. Usually, a thread is bound to a local variable and waited upon before the parent context of execution finishes. This is done by calling the .join() method on the thread handle:
#![allow(unused)]
fn main() {
    // Bind the thread's "handle" to a variable:
    let th = std::thread::spawn(|| {
        println!("Thread 1: chirp!");
    });
    println!("Main thread: chirp chirp!");
    // Wait for 'th' to finish executing, and re-synchronise both threads:
    th.join().unwrap();
}
The following code prints a modified value of the variable var from 3 different threads, running independently from one another. Is the code safe? Can you guess what the resulting output will be?
use std::thread;
let mut var = 32;
let t1 = thread::spawn(move || {
var += 1;
println!("Thread 1: reading value {}!", var);
});
let t2 = thread::spawn(move || {
var += 2;
println!("Thread 2: reading value {}!", var);
});
var += 3;
println!("Main thread: reading value {}", var);
// Re-synchronise both threads:
t1.join().unwrap();
t2.join().unwrap();
Correction
Since we capture var by value in the two additional threads t1 and t2, a local copy of the variable is made in their respective scopes. This means that there is no actual concurrency at play in this code, which is perfectly safe. The output will be (in no particular order):
Main thread: reading value 35
Thread 1: reading value 33!
Thread 2: reading value 34!
What would happen if we removed all the move keywords from the code?
Correction
The move captures would be replaced by borrow captures, which the compiler rejects: otherwise, three different threads would be simultaneously overwriting the same data, causing a data race.
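For reference, here is a minimal sketch of that borrowing version (not shown in the original text); rustc rejects it before any race can happen, though the exact error message may vary between compiler versions:

use std::thread;

fn main() {
    let mut var = 32;
    // Without 'move', the closure tries to borrow 'var'; rustc
    // rejects this because the thread might outlive 'var':
    let t1 = thread::spawn(|| {
        var += 1;
        println!("Thread 1: reading value {}!", var);
    });
    t1.join().unwrap();
}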
Sharing data safely between threads
Using Shared-State Concurrency
Course: Exclusive access with Mutexes
Mutual exclusion, or mutex, is a mechanism that prevents multiple threads from accessing the same data at the same time. It relies on a locking system: a thread must first acquire the mutex's lock before it can access the underlying protected data. The lock is a data structure that keeps track of which thread currently has exclusive access to the data. If the mutex happens to be locked when a thread tries to access the data, that thread will stall until the lock is eventually released and the data is free to acquire.
#![allow(unused)]
fn main() {
    use std::sync::Mutex;
    // Instantiate a new Mutex<i32> instance with the value '32':
    let var: Mutex<i32> = Mutex::new(32);
    {
        // Acquire the mutex's lock (and panic in case of failure):
        let mut v = var.lock().unwrap();
        // Modify the value safely:
        *v += 32;
    }
    // Print the result:
    println!("var = {var:?}");
}
In the following example, we want to use a Mutex so that both threads access and modify the same var, but the compiler doesn't allow the straightforward attempt. What is the underlying issue here?
#![allow(unused)]
fn main() {
    use std::thread;
    let mut var = 32;
    let t1 = thread::spawn(move || {
        var += 1;
    });
    let t2 = thread::spawn(move || {
        var += 2;
    });
    // Re-synchronise both threads:
    t1.join().unwrap();
    t2.join().unwrap();
    println!("Result: {var}");
}
Correction
Here, Rust's move and ownership semantics apply, and using a Mutex makes no exception: in the attempted solution below, the compiler complains that mtx has already been moved into t1's closure, and therefore cannot be moved into a second closure.
#![allow(unused)]
fn main() {
    use std::thread;
    use std::sync::Mutex;
    let mtx = Mutex::new(32);
    let t1 = thread::spawn(move || {
        let mut v = mtx.lock().unwrap();
        *v += 1;
    });
    let t2 = thread::spawn(move || {
        let mut v = mtx.lock().unwrap();
        *v += 2;
    });
    // Re-synchronise both threads:
    t1.join().unwrap();
    t2.join().unwrap();
    println!("Result: {mtx:?}");
}
Course: Reference Counted Mutexes
As we saw in the previous example, a Mutex by itself is not sufficient to implement viable thread-safe data sharing:
- First, there is the issue of ownership, which can be solved using, for instance, a shared pointer.
- Second, there is the issue of concurrency when accessing this shared pointer from multiple threads simultaneously.
In the Rust programming language, the atomically reference-counted pointer Arc<T>, which can be seen as a thread-safe shared pointer, is designed to remedy this very problem. It solves the ownership issue by being reference-counted, just like a standard Rc object, but also solves the concurrency issue by being atomic: its reference-count updates are guaranteed to execute as single, indivisible transactions. When an atomic operation is executed on an object by one thread, no other thread can read or modify the object while the operation is in progress. In other words, other threads only ever see the object before or after the operation; there is no intermediary state.
Our previous example can then be replaced by the following:
#![allow(unused)]
fn main() {
    use std::thread;
    use std::sync::{Arc, Mutex};
    // We wrap the Mutex in an Atomically Reference-Counted object:
    let arc = Arc::new(Mutex::new(32));
    // We prepare two clones, for moving into the two distinct closures:
    let rc1 = Arc::clone(&arc);
    let rc2 = Arc::clone(&arc);
    let t1 = thread::spawn(move || {
        let mut v = rc1.lock().unwrap();
        *v += 1;
    });
    let t2 = thread::spawn(move || {
        let mut v = rc2.lock().unwrap();
        *v += 2;
    });
    // Re-synchronise both threads:
    t1.join().unwrap();
    t2.join().unwrap();
    println!("Result: {arc:?}");
}
If an Arc object is sufficient to provide multiple ownership and thread-safe access to data, why do we still need a Mutex guarding our data?
Correction
The atomic nature of an Arc means that the underlying reference count (which solves the ownership issue, just as Rc does) is updated in a thread-safe way, but that guarantee does not extend to the data the pointer refers to. We still need a Mutex to guarantee thread-safe read/write access to the data itself. As we will see in the next subsection, we could also make the data itself atomic.
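To make this concrete, here is a minimal sketch (not from the original course) of why an Arc alone does not grant mutable access to its contents:

use std::sync::Arc;
use std::thread;

fn main() {
    let arc = Arc::new(32);
    let rc1 = Arc::clone(&arc);
    let t = thread::spawn(move || {
        // *rc1 += 1; // rejected: Arc only hands out shared
        //            // (immutable) references to its contents
        println!("read-only access is fine: {}", *rc1);
    });
    t.join().unwrap();
}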
Course: Lock-free data sharing with Atomics
Another way of making data thread-safe is to use atomic data structures directly. The Rust standard library provides several of them in its std::sync::atomic module. The main difference with Mutexes is that atomics are lock-free: a thread operating on an atomic never has to be put to sleep by the operating system, because each operation executes as a single indivisible hardware instruction. (Busy-waiting constructs built on top of atomics, which check for data availability in a continuous loop, are called spinlocks.) In real-time contexts, where putting a thread to sleep is not an option, lock-free data structures are generally preferable.
Our previous example would for instance look like this with atomics:
#![allow(unused)]
fn main() {
    use std::thread;
    use std::sync::Arc;
    use std::sync::atomic::AtomicI32;
    use std::sync::atomic::Ordering;
    // We replace 'Mutex::new()' with the following:
    let arc = Arc::new(AtomicI32::new(32));
    let rc1 = Arc::clone(&arc);
    let rc2 = Arc::clone(&arc);
    let t1 = thread::spawn(move || {
        // Acquire the underlying data as a copy:
        let mut v = rc1.load(Ordering::Acquire);
        // Modify the copy:
        v += 1;
        // Store the new value (note: the load-modify-store sequence
        // as a whole is not atomic; see fetch_add below):
        rc1.store(v, Ordering::Release);
    });
    let t2 = thread::spawn(move || {
        // Another (more compact, and truly atomic) way of doing this:
        rc2.fetch_add(2, Ordering::AcqRel);
    });
    // Re-synchronise both threads:
    t1.join().unwrap();
    t2.join().unwrap();
    println!("Result: {arc:?}");
}
Using Message passing to transfer data between threads
Another approach to multiple ownership and thread-safety in Rust is to use an mpsc::channel or mpsc::sync_channel: FIFO queues (asynchronous/unbounded and synchronous/bounded, respectively) through which successive values can be sent from one thread to another.
An mpsc::channel() call returns a tuple containing a Sender object and a Receiver object, which are by convention respectively named tx and rx. These two objects are positioned at each end of a FIFO that tunnels data between the two:
#![allow(unused)]
fn main() {
    // Create a 'data channel' between a Sender `tx` and a Receiver `rx`:
    let (tx, rx) = std::sync::mpsc::channel::<i32>();
    // Send the values `32` and then `16` through the channel:
    tx.send(32).unwrap();
    tx.send(16).unwrap();
    // Poll the channel, read data if available:
    println!("n = {}", rx.recv().unwrap());
    println!("n = {}", rx.recv().unwrap());
}
Example of sending from a separate thread:
#![allow(unused)]
fn main() {
    use std::thread;
    use std::sync::mpsc;
    let (tx, rx) = mpsc::channel();
    // Do the same thing in a separate thread:
    let th = thread::spawn(move || {
        tx.send(32).unwrap();
        tx.send(16).unwrap();
    });
    th.join().unwrap();
    // Poll the channel, read data if available:
    println!("n = {}", rx.recv().unwrap());
    println!("n = {}", rx.recv().unwrap());
}
Or from multiple threads simultaneously:
#![allow(unused)]
fn main() {
    use std::thread;
    use std::sync::mpsc;
    let (tx, rx) = mpsc::channel();
    let mut vec = Vec::new();
    for n in 0..8 {
        // Clone the Sender `tx` for each thread:
        let tx = tx.clone();
        vec.push(thread::spawn(move || {
            tx.send(n).unwrap();
        }));
    }
    for t in vec {
        t.join().unwrap();
    }
    for _ in 0..8 {
        // Consume the FIFO value-by-value:
        let value = rx.recv().unwrap();
        println!("Received value: {}", value);
    }
}
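The text above also mentions mpsc::sync_channel; here is a minimal sketch (not from the original course) of its bounded, blocking behavior:

use std::sync::mpsc;
use std::thread;

fn main() {
    // A sync_channel has a bounded buffer (here, a single slot);
    // send() blocks while the buffer is full:
    let (tx, rx) = mpsc::sync_channel::<i32>(1);
    let th = thread::spawn(move || {
        tx.send(32).unwrap(); // fits in the buffer
        tx.send(16).unwrap(); // blocks until the receiver reads 32
    });
    println!("n = {}", rx.recv().unwrap());
    println!("n = {}", rx.recv().unwrap());
    th.join().unwrap();
}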
Using an mpsc::channel, an Arc<T> and a Mutex (or an Atomic), implement a program which creates and runs two independent threads:
- A producer thread which continuously counts from 0 to infinity.
- A consumer thread which continuously reads and prints the count produced by the producer thread.
- The two threads should run for 5 seconds and then stop.
- Hint: you can use thread::sleep to pause a thread for a certain amount of time.
Correction
#![allow(unused)]
fn main() {
    use std::thread;
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::sync::atomic::Ordering;
    use std::sync::atomic::AtomicBool;
    use std::time::Duration;
    let (tx, rx) = mpsc::channel();
    // Shared flag used to stop both threads after 5 seconds:
    let running = Arc::new(AtomicBool::new(true));
    let run_arc1 = running.clone();
    let run_arc2 = running.clone();
    // Producer thread:
    let pth = thread::spawn(move || {
        let mut n = 0;
        while run_arc1.load(Ordering::Acquire) {
            n += 1;
            tx.send(n).unwrap();
            println!("Sending value: {n}");
        }
    });
    // Consumer thread:
    let cth = thread::spawn(move || {
        while run_arc2.load(Ordering::Acquire) {
            match rx.try_recv() {
                Ok(v) => println!("Received value: {v}"),
                Err(..) => (),
            }
        }
    });
    thread::sleep(Duration::from_secs(5));
    running.store(false, Ordering::Release);
    pth.join().unwrap();
    cth.join().unwrap();
}
Asynchronous programming
While programming with threads is a perfectly valid way of implementing concurrent programs, it also has a few disadvantages, such as having to rely on operating system scheduling, and sometimes making the code difficult to reuse or modify. To address these issues, programmers came up with a different way of structuring a program as a set of tasks (independent, concurrent, or sequential), called asynchronous programming.
Code within a thread is written in a sequential style, and the operating system executes threads concurrently. With asynchronous programming, concurrency happens entirely within the program: the operating system is not involved, making context switches faster and memory overhead lower. Being natively integrated into the programming language, as is the case in Rust, also makes control flow more flexible and expressive.
Futures and await: the Async syntax
Rust relies on two keywords async and await, as well as a few underlying concepts (such as futures) to implement asynchronous programming within the language.
Futures and .await
A future is a placeholder data structure for a value that is not yet ready or accessible at the time the future is created. In Rust, the Future trait is provided as the building block for implementing async operations.
#![allow(unused)]
fn main() {
    // In the std::future module:
    pub trait Future {
        type Output;
        // Required method:
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
    }
}
Its core method, poll(), is called in an attempt to resolve the future into a final value. The method is non-blocking: if the value is not yet ready, it returns immediately rather than stalling the scheduler, and the future is rescheduled to be polled again later. A Poll result has two possible states:
- Poll::Pending: the future is, for now, still unresolved.
- Poll::Ready(value): the future is resolved and its value is accessible.
There are two other types associated with the poll(..) method: Pin<&mut Self>, which ensures that the object will not be moved in memory, and Context, which provides the information needed to decide when to reschedule a poll(). Both are linked to the underlying async runtime's context of execution and task scheduling. Fortunately, Rust provides an easier way to handle all of that, through the .await suffix keyword; everything else is hidden behind the scenes:
#![allow(unused)]
fn main() {
    let result = some_future.await;
}
would be roughly expanded to something like this:
#![allow(unused)]
fn main() {
    let result = match unsafe { Pin::new_unchecked(&mut some_future) }.poll(cx) {
        Poll::Ready(val) => val,
        Poll::Pending => return Poll::Pending,
    };
}
Under the hood, the process is the following:
- The future is passed to an executor (such as tokio).
- The executor calls poll(cx) with its underlying Context instance.
- If Poll::Ready, the future is completed and its value is retrieved.
- If Poll::Pending, the executor stops polling.
- When an external event occurs (I/O readiness, timers, etc.), the future is woken up.
- The executor polls the future again.
- The process repeats until Ready.
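To make this polling cycle concrete, here is a minimal hand-written future (a toy sketch, not from the original course, driven by the tokio runtime introduced later in this section; a real future would register its waker with an I/O event source instead of waking itself immediately):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// A toy future that resolves only after being polled three times:
struct CountdownFuture {
    remaining: u32,
}

impl Future for CountdownFuture {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            // The value is ready; the executor retrieves it:
            Poll::Ready("done!")
        } else {
            self.remaining -= 1;
            // Ask the executor to poll us again:
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let result = CountdownFuture { remaining: 2 }.await;
    println!("{result}");
}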
Async execution contexts
await expressions are only valid within certain contexts of execution: they can be used within blocks and functions marked with the async keyword, which specifies that they can be interrupted and resumed.
#![allow(unused)]
fn main() {
    async fn some_fn() {
        let result = some_future.await;
    }
}
When the Rust compiler encounters a block marked with async, it compiles it into a unique, anonymous data type that implements the Future trait. When it sees a function marked with async, it compiles it into a non-async function whose body is an async block. An async function's return type is the type of the anonymous data type the compiler creates for that async block.
Thus, writing async fn is equivalent to writing a function that returns a future of the return type.
#![allow(unused)]
fn main() {
    fn some_fn() -> impl Future<Output = ()> {
        async move {
            let result = some_future.await;
        }
    }
}
Async runtimes
While the async/await features are fully integrated into the Rust programming language and fully understood by its compiler, Rust doesn't provide a standard runtime to execute an async program, and the main function cannot itself be marked async. Thus, calling our function from main this way wouldn't compile:
async fn some_fn() {
    let result = some_future.await;
}

async fn main() {
    some_fn().await;
}
Instead, Rust relies on third-party implementations, such as:
- tokio: the most popular and widely used.
- smol: more lightweight and easy to understand.
- embassy: for embedded systems.
- glommio: for I/O-bound workloads.
For the rest of this course, we will use the tokio framework as our async runtime.
async fn some_fn() {
    // async code...
}

// The main function has to be annotated
// in order to be async compatible:
#[tokio::main]
async fn main() {
    some_fn().await;
}
Try to implement the "equivalent" program with async/await:

use std::thread::{sleep, spawn};
use std::time::Duration;

fn count_to(N: i32) {
    for n in 1..=N {
        println!("{n}");
        sleep(Duration::from_secs(1));
    }
}

fn main() {
    let t1 = spawn(|| count_to(5));
    t1.join().unwrap();
    println!("joined!");
}
Correction
use tokio::{spawn, time::{sleep, Duration}};

async fn count_to(N: i32) {
    for n in 1..=N {
        println!("{n}");
        sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    count_to(5).await;
    println!("joined!");
}
Spawning concurrent tasks
The example above works with a single task, but is not really interesting as it is.
To get several tasks running concurrently, tokio provides the spawn() function, which takes a future (typically an async block) as argument:
#[tokio::main]
async fn main() {
    tokio::spawn(async move {
        my_async_fn("first task").await;
    });
    tokio::spawn(async move {
        my_async_fn("second parallel task").await;
    });
}
async blocks and closures allow the move keyword, much like normal closures. An async move block will take ownership of the variables it references, allowing it to outlive the current scope, but giving up the ability to share those variables with other code.
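A minimal sketch (not from the original course) of what async move changes:

#[tokio::main]
async fn main() {
    let name = String::from("task data");
    // 'async move' takes ownership of 'name', so the spawned task
    // can outlive the scope that created it:
    let handle = tokio::spawn(async move {
        println!("owned by the task: {name}");
    });
    handle.await.unwrap();
    // 'name' is no longer accessible here: it was moved into the block.
}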
Now try to wrap our previous count_to(5).await function call inside tokio::spawn, as an async block. Notice anything? What is the problem here? How can we fix it?
Correction
The tokio::spawn function returns a JoinHandle, which is itself a future, so we still need to join the task by calling .await (and .unwrap()) on it:
use tokio::{spawn, time::{sleep, Duration}};

async fn count_to(N: i32) {
    for n in 1..=N {
        println!("{n}");
        sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    tokio::spawn(async move {
        count_to(5).await;
    }).await.unwrap();
    println!("joined!");
}
Let's now improve our count_to function by making the sleep duration a parameter, and add a start offset as well. We could do this, for instance, by using a std::ops::Range instead of an i32 for the parameter N:

#![allow(unused)]
fn main() {
    async fn count_to(R: Range<i32>, sleep_dur: Duration) { ... }
}

Implement the modified version of the function, and its usage inside the main function. Try to also spawn another function call with different parameters. Are the two tasks now running in parallel?
Correction
No, they're still running sequentially. We need something extra to make them parallel, like the join! macro:
use std::ops::Range;
use tokio::{join, spawn, time::{Duration, sleep}};

async fn count_to(R: Range<i32>, sleep_dur: Duration) {
    for n in R {
        println!("{n}");
        sleep(sleep_dur).await;
    }
}

#[tokio::main]
async fn main() {
    // We remove the .await.unwrap() calls,
    // so that the tasks can be joined together afterwards:
    let task_1 = tokio::spawn(async move {
        count_to(0..5, Duration::from_secs(2)).await;
    });
    let task_2 = tokio::spawn(async move {
        count_to(5..10, Duration::from_secs(1)).await;
    });
    let (t1, t2) = join!(task_1, task_2);
    t1.unwrap();
    t2.unwrap();
    println!("joined!");
}
Shared states & Message passing
Sharing state and passing messages in async contexts is similar to what we saw above for threads, with a few specificities that can depend on the runtime being used. tokio, for instance, provides its own primitives, such as an async Mutex and mpsc channels (multiple-producer, single-consumer); the tokio tutorial explains how to use them in detail.
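As a quick illustration before the exercise, here is a minimal sketch (not from the original course) using tokio's async Mutex, whose lock() method is awaited instead of blocking:

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));
    let c1 = Arc::clone(&counter);
    let task = tokio::spawn(async move {
        // Unlike std::sync::Mutex, acquiring the lock is awaited,
        // so the runtime can schedule other tasks in the meantime:
        let mut v = c1.lock().await;
        *v += 1;
    });
    task.await.unwrap();
    println!("counter = {}", *counter.lock().await);
}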
Transform the counter example by using tokio (with a mutex or with channels) to share the count of one task with another.
Correction
use tokio::{join, sync::mpsc, time::{Duration, sleep}};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<i32>(10);
    let task_1 = tokio::spawn(async move {
        for n in 0..5 {
            tx.send(n).await.unwrap();
            sleep(Duration::from_secs(1)).await;
        }
    });
    let task_2 = tokio::spawn(async move {
        loop {
            match rx.recv().await {
                Some(n) => println!("Counting: {n}"),
                // recv() returns None once the Sender is dropped:
                None => break,
            }
        }
    });
    let (t1, t2) = join!(task_1, task_2);
    t1.unwrap();
    t2.unwrap();
    println!("joined!");
}
Async and networking
Networking is an ideal candidate for async-based programs, since there is usually no certainty about the timing of inputs and the overall order of events. tokio provides in its libraries everything needed for basic TCP and UDP exchanges:
use std::error::Error;
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = "127.0.0.1:8047";
    // Bind a TCP listener to addr:
    let listener = TcpListener::bind(addr).await?;
    let streams = tokio::spawn(async move {
        // Connect to the same addr:
        TcpStream::connect(addr)
            .await
            .unwrap();
        println!("Connected!");
    });
    // Wait for a client to connect:
    let (..) = listener.accept().await?;
    streams.await?;
    Ok(())
}
With help from the example above and the tokio tutorial and documentation, write a simple echo example between a TCP server and a single client.
Correction
use std::error::Error;
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = "127.0.0.1:8047";
    // Bind a TCP listener to addr:
    let listener = TcpListener::bind(addr).await?;
    let streams = tokio::spawn(async move {
        let mut stream = TcpStream::connect(addr)
            .await
            .unwrap();
        let mut buf = vec![0; 128];
        loop {
            let n = stream.read(&mut buf).await.unwrap();
            if n == 0 {
                break;
            }
            println!("Received {:?}", String::from_utf8_lossy(&buf[..n]));
        }
    });
    // Wait for a client to connect:
    let (mut socket, ..) = listener.accept().await?;
    tokio::spawn(async move {
        socket.write_all(b"echo!\n").await.unwrap();
    });
    streams.await?;
    Ok(())
}