Shared-State Concurrency
Recipe | Crates | Categories |
---|---|---|
Maintain a global mutable state | ||
Mutexes | ||
parking_lot | ||
Atomics | ||
arc-swap |
Channels are similar to single ownership, because once you transfer a value down a channel, you should no longer use that value. Shared memory concurrency is like multiple ownership: multiple threads can access the same memory location at the same time.
The Rust standard library provides smart pointer types, such as Mutex<T>
and Arc<T>
, that are safe to use in concurrent contexts.
Maintain a global mutable state
Declare global state using lazy static
. lazy static
⮳ creates a globally available static ref
which requires a std::sync::Mutex
⮳ to allow mutation (also see std::sync::RwLock
⮳). The std::sync::Mutex
⮳ wrap ensures the state cannot be simultaneously accessed by multiple threads, preventing race conditions. A std::sync::MutexGuard
⮳ must be acquired to read or mutate the value stored in a std::sync::Mutex
⮳.
use std::sync::Mutex; use anyhow::Result; use anyhow::anyhow; use lazy_static::lazy_static; lazy_static! { static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new()); } fn insert(fruit: &str) -> Result<()> { let mut db = FRUIT .lock() .map_err(|_| anyhow!("Failed to acquire MutexGuard"))?; db.push(fruit.to_string()); Ok(()) } fn main() -> Result<()> { insert("apple")?; insert("orange")?; insert("peach")?; { let db = FRUIT .lock() .map_err(|_| anyhow!("Failed to acquire MutexGuard"))?; db.iter().enumerate().for_each(|(i, item)| { println!("{}: {}", i, item); }); } insert("grape")?; Ok(()) }
Mutexes
Allow access to data from one thread at a time.
use std::sync::Arc; use std::sync::Mutex; use std::thread; fn main() { // We wrap Mutex in Arc to allow for multiple owners. // Arc<T> is safe to use in concurrent situations. let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { // `clone` is somewhat a misnomer; it creates another pointer to the // same Mutex, increasing the strong reference count. let counter = Arc::clone(&counter); let handle = thread::spawn( move || { let mut num = counter.lock().unwrap(); *num += 1; }, /* Releases the lock automatically when the MutexGuard * goes out of scope. */ ); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
parking_lot
More compact and efficient implementations of the standard synchronization primitives.
parking_lot
⮳ provides implementations of parking_lot::Mutex
⮳, parking_lot::RwLock
⮳, parking_lot::Condvar
⮳ and parking_lot::Once
⮳ that are smaller, faster and more flexible than those in the Rust standard library. It also provides a parking_lot::ReentrantMutex
⮳ type.
std::sync::Mutex
works fine, but parking_lot
is faster.
use parking_lot::Once; use parking_lot::OnceState; // `Once` is a synchronization primitive which can be used to run a one-time // initialization. static INIT: Once = Once::new(); static mut VAL: usize = 0; // This function will only call `expensive_computation` once, and will // otherwise always return the value returned from the first invocation. fn get_cached_val() -> usize { // Accessing a `static mut` is unsafe much of the time, unless we do so // in a synchronized fashion (e.g. write once or read all) unsafe { // The given closure will be executed if this is the first time // call_once has been called. INIT.call_once(|| { VAL = expensive_computation(); println!("This is printed only once!"); }); // A closure has completed successfully. assert_eq!(INIT.state(), OnceState::Done); VAL } } fn expensive_computation() -> usize { // ... 42 } fn main() { // A closure has not been executed yet assert_eq!(INIT.state(), OnceState::New); for _ in 0..3 { assert_eq!(get_cached_val(), 42); } }
use parking_lot::RwLock; fn main() { let lock = RwLock::new(5); { println!("Many reader locks can be held at once"); let r1 = lock.read(); let r2 = lock.read(); assert_eq!(*r1, 5); assert_eq!(*r2, 5); println!("Read locks are dropped at this point"); } { println!("Only one write lock may be held, however"); let mut w = lock.write(); *w += 1; assert_eq!(*w, 6); println!("Write lock is dropped here"); } }
Atomics
Atomic types in std::sync::atomic
⮳ provide primitive shared-memory communication between threads, and are the building blocks of other concurrent types. It defines atomic versions of a select number of primitive types, including std::sync::atomic::AtomicBool
⮳, std::sync::atomic::AtomicIsize
⮳, std::sync::atomic::AtomicUsize
⮳, std::sync::atomic::AtomicI8
⮳, std::sync::atomic::AtomicU16
⮳, etc.
use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; static GLOBAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0); fn main() { let old_thread_count = GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::SeqCst); println!("live threads: {}", old_thread_count + 1); }
The most common way to share an atomic variable is to put it into an std::sync::Arc
⮳ (an atomically-reference-counted shared pointer).
crossbeam
⮳ also offers crossbeam::atomic::AtomicCell
⮳, a thread-safe mutable memory location. This type is equivalent to std::cell::Cell
⮳, except it can also be shared among multiple threads.
use crossbeam_utils::atomic::AtomicCell; fn main() { let a = AtomicCell::new(7); let v = a.into_inner(); assert_eq!(v, 7); println!("{}", v); }
arc-swap
The ArcSwap
type is a container for an Arc
that can be changed atomically. Semantically, it is similar to Atomic<Arc<T>>
(if there was such a thing) or RwLock<Arc<T>>
(but without the need for the locking). It is optimized for read-mostly scenarios, with consistent performance characteristics.