# Concurrent Data Structures
| Recipe | Crates | Categories |
|---|---|---|
| Bounded Multi-producer Multi-consumer Queue | `crossbeam-queue` | |
| `dashmap` | `dashmap` | |
| `flurry` | `flurry` | |
| `papaya` | `papaya` | |
Refer to the comparative benchmarks of concurrent HashMaps⮳.
## `dashmap`

`dashmap`⮳ is a fast concurrent HashMap, i.e. a concurrent associative array. It aims to be a drop-in replacement for `RwLock<HashMap<K, V>>`.

`dashmap`⮳ allows multiple threads to read and write the map concurrently with minimal contention, using a technique called "shard-based" (or "bucket-based") concurrency: the map is split into several independently locked shards, so threads operating on different shards never block each other. This makes `dashmap`⮳ a good choice when you need a hash map that is accessed frequently by multiple threads without significant performance bottlenecks.
```rust
//! Demonstrates the use of `DashMap` for concurrent data access and
//! modification.
//!
//! This example showcases how to create a shared `DashMap`, access and modify
//! it from multiple threads, and perform various operations like insertion,
//! retrieval, modification, and removal.
use std::sync::Arc;
use std::thread;

use dashmap::DashMap;

fn main() {
    // Create a shared DashMap with an Arc.
    let map: Arc<DashMap<&str, i32>> = Arc::new(DashMap::new());
    // Alternatively, you can use `DashMap::with_capacity(20)` to pre-allocate
    // space.

    // Create multiple threads.
    let mut threads = Vec::new();
    for i in 0..4 {
        let map_clone = map.clone();
        let thread_id = i;
        threads.push(thread::spawn(move || {
            // Access and modify the map from each thread.
            match thread_id {
                0 => {
                    map_clone.insert("key1", thread_id);
                    println!("Thread {} inserted key1", thread_id);
                }
                1 => {
                    map_clone.insert("key2", thread_id);
                    println!("Thread {} inserted key2", thread_id);
                }
                2 => match map_clone.get("key1") {
                    Some(value) => {
                        println!("Thread {} read key1: {}", thread_id, *value);
                    }
                    _ => {
                        println!("Thread {} couldn't find key1", thread_id);
                    }
                },
                3 => match map_clone.get_mut("key2") {
                    Some(mut value) => {
                        *value += 10;
                        println!(
                            "Thread {} incremented key2 value to {}",
                            thread_id, *value
                        );
                    }
                    _ => {
                        println!("Thread {} couldn't find key2", thread_id);
                    }
                },
                _ => panic!("Unknown thread ID"),
            }
        }));
    }
    // Wait for all threads to finish.
    for thread in threads {
        thread.join().unwrap();
    }

    // Remove "key1" and assert its value.
    // `remove` returns `Option<(K, V)>`.
    assert_eq!(map.remove("key1").unwrap().1, 0);
    // Check if "key2" exists.
    assert!(map.contains_key("key2"));
    // Remove "key2" if its value is 11.
    map.remove_if("key2", |_, val| *val == 11);
    // Access the final state of the map from the main thread.
    println!("final count: {}", map.iter().count());
}
```
## Bounded Multi-producer Multi-consumer Queue

`crossbeam-queue`⮳ provides concurrent queue implementations designed for efficient and safe communication between threads. It offers queue types optimized for different use cases, including single-producer/single-consumer, multi-producer/multi-consumer, and bounded/unbounded queues. These queues are essential building blocks for concurrent data structures and message-passing systems, enabling threads to exchange data without race conditions or memory-safety issues.
```rust
use crossbeam_queue::ArrayQueue;

/// Demonstrates the basic usage of `ArrayQueue`.
fn main() {
    // Create a new `ArrayQueue` with a capacity of 2.
    let q = ArrayQueue::new(2);
    // Push 'a' onto the queue. This should succeed.
    assert_eq!(q.push('a'), Ok(()));
    // Push 'b' onto the queue. This should also succeed.
    assert_eq!(q.push('b'), Ok(()));
    // The queue is now full: the push fails and returns the rejected value.
    assert_eq!(q.push('c'), Err('c'));
    // Elements come out in FIFO order.
    assert_eq!(q.pop(), Some('a'));
    println!("{:?}", q.pop());
}
```
## `flurry`

`flurry`⮳ is a concurrent hash table designed for high performance. It allows fully concurrent reads and highly concurrent updates. Its main type is functionally very similar to `std::collections::HashMap`⮳, and its implementation is closely based on Java's `java.util.concurrent.ConcurrentHashMap`. Even though all operations on the map are thread-safe and operate on shared references, retrieval operations do not entail locking, and there is no support for locking the entire table in a way that prevents all access (doc).
```rust
use std::sync::Arc;
use std::thread;

use flurry::HashMap;

/// This example demonstrates the basic usage of the `flurry::HashMap` for
/// concurrent data access. It showcases how to insert, read, remove, and
/// iterate over the map, both sequentially and concurrently.
fn main() {
    // Create a new Flurry HashMap.
    // Wrap it in an `Arc`, so that it can be shared between threads.
    let map = Arc::new(HashMap::new());
    {
        // Many of the access methods take a reference to a `Guard`.
        // You obtain a `Guard` using `HashMap::guard`, and you can use
        // references to the same guard to make one or more API calls.
        let guard = map.guard();
        map.insert("key0".to_string(), 0, &guard);
        // Keep in mind that for as long as you hold onto this `Guard`, you
        // are preventing the collection of garbage generated by the map.
    }
    {
        // Alternatively, get a reference to the map with the current thread
        // pinned.
        assert!(!map.pin().is_empty());
        // Keep in mind that for as long as you hold onto this, you are
        // preventing the collection of garbage generated by the map.

        // You can also re-use a map pin like so:
        let mref = map.pin();
        // Insert key-value pairs into the map.
        // If the map did not have this key present, `None` is returned.
        // If the map did have this key present, the value is updated and the
        // old value is returned. The key is left unchanged.
        mref.insert("key1".to_string(), 1);
        mref.insert("key2".to_string(), 2);
        mref.insert("key3".to_string(), 3);

        // Read values.
        // Retrieval operations generally do not block, so they may overlap
        // with update operations (including `insert`). Retrievals reflect the
        // results of the most recently completed update operations holding
        // upon their onset.
        println!("Value for key1: {:?}", mref.get(&"key1".to_string())); // Some(1)
        println!("Value for key4: {:?}", mref.get(&"key4".to_string())); // None

        println!("\nRemoving key0");
        mref.remove(&"key0".to_string());
        println!(
            "Value for key0 after removal: {:?}",
            mref.get(&"key0".to_string())
        );

        println!("\nChecking if key2 exists");
        println!("{}", mref.contains_key(&"key2".to_string()));

        // Operations that inspect the map as a whole, rather than a single
        // key, operate on a snapshot of the underlying table. For example,
        // iterators return elements reflecting the state of the hash table at
        // some point at or since the creation of the iterator.

        // Iterate over the map (read-only).
        println!("Iterating over the map:");
        for (key, value) in mref.iter() {
            println!("{}: {}", key, value);
        }
        // Aggregate status methods like `len` are typically useful only when
        // the map is not undergoing concurrent updates in other threads.
        assert!(mref.len() == 3);
    }

    let m = map.clone();
    let writer_thread = thread::spawn(move || {
        for i in 4..=5 {
            m.pin().insert(format!("key{}", i), i);
            // Simulate some work.
            thread::sleep(std::time::Duration::from_millis(10));
        }
    });

    let m2 = map.clone();
    let reader_thread = thread::spawn(move || {
        // Read multiple times.
        for _ in 1..=2 {
            println!("Reading from concurrent reader:");
            for (key, value) in m2.pin().iter() {
                println!("{}: {}", key, value);
            }
            thread::sleep(std::time::Duration::from_millis(10));
        }
    });

    writer_thread.join().unwrap();
    reader_thread.join().unwrap();

    // Final state of the map.
    println!("\nFinal state of the map:");
    for (key, value) in map.pin().iter() {
        println!("{}: {}", key, value);
    }
}
```
## `papaya`

`papaya`⮳ offers a fast and ergonomic concurrent hash table for read-heavy workloads:

- Ergonomic lock-free API: no more deadlocks!
- Powerful atomic operations.
- Seamless usage in `async` contexts.
- Extremely scalable, low-latency reads (see performance).
- Predictable latency across all operations.
- Efficient memory usage, with garbage collection powered by `seize`⮳ (doc).
```rust
use papaya::HashMap;

// `papaya` is a concurrent hash table for read-heavy workloads.
// It avoids the overhead and deadlock potential of traditional locks by using
// "pinning". Pins act like lock guards.

// Use a map from multiple threads.
fn main() {
    // Create a map.
    let map = HashMap::new();

    std::thread::scope(|s| {
        // Insert some values.
        s.spawn(|| {
            // Pin the map.
            // Pinning and unpinning the table is relatively cheap but not
            // free. Therefore, pin reuse is encouraged, within reason, noting
            // that, as long as you are holding on to a pin, you are
            // preventing the map from performing garbage collection.
            let m = map.pin();
            for i in 'A'..='Z' {
                // Inserts a key-value pair into the map.
                // If the map did not have this key present, `None` is
                // returned. If the map did have this key present, the value
                // is updated, and the old value is returned.
                m.insert(i, 1);
            }
        });

        // Remove the values.
        // Note that the map is accessed from multiple threads.
        s.spawn(|| {
            let m = map.pin();
            for i in 'A'..='Z' {
                m.remove(&i);
            }
        });

        // Read the values.
        // Note that:
        // - Read and write operations may overlap in time.
        // - There is no support for locking the entire table nor individual
        //   keys to prevent concurrent access, except through external
        //   fine-grained locking.
        // - Read operations (such as `get`) reflect the results of the
        //   most-recent write.
        s.spawn(|| {
            for (key, value) in map.pin().iter() {
                println!("{key}: {value}");
            }
        });

        // Atomic operations:
        let m = map.pin();
        // Updates an existing entry or inserts a default value atomically.
        m.update_or_insert('#', |e| e + 1, 1);
        // In this case, the key did not exist and 1 was inserted.
        assert_eq!(m.get(&'#'), Some(&1));
        // Updates an existing entry atomically.
        assert_eq!(m.update('#', |e| e + 1), Some(&2));
    });
    // Example adapted from https://docs.rs/papaya/latest/papaya/
}
```
## Related Topics
- Async.
- Async Channels.
- Data Structures.
- Global Static.
- Rust Patterns.
- Send and Sync.
- Shared State.