Concurrent Data Structures

Refer to the comparative benchmarks of concurrent HashMaps⮳.

dashmap

dashmap dashmap-crates.io dashmap-github dashmap-lib.rscat-algorithms cat-concurrency cat-data-structures

dashmap⮳ is a fast concurrent HashMap i.e. a concurrent associative array.

dashmap⮳ tries to be a direct replacement for RwLock<HashMap<K, V>>.

It allows multiple threads to concurrently read and write to the map with minimal contention, using a technique called "shard-based" or "bucket-based" concurrency. This makes dashmap a good choice when you need a hash map that can be accessed frequently by multiple threads without significant performance bottlenecks.

//! Demonstrates the use of `DashMap` for concurrent data access and
//! modification.
//!
//! This example showcases how to create a shared `DashMap`, access and modify
//! it from multiple threads, and perform various operations like insertion,
//! retrieval, modification, and removal.

use std::sync::Arc;
use std::thread;

use dashmap::DashMap;

fn main() {
    // Create a shared DashMap with an Arc.
    let map: Arc<DashMap<&str, i32, _>> = Arc::new(DashMap::new());
    // Alternatively, you can use `DashMap::with_capacity(20)` to pre-allocate
    // space.

    // Create multiple threads.
    let mut threads = Vec::new();
    for i in 0..4 {
        let map_clone = map.clone();
        let thread_id = i;
        threads.push(thread::spawn(move || {
            // Access and modify the map from each thread.
            match thread_id {
                0 => {
                    map_clone.insert("key1", thread_id);
                    println!("Thread {} inserted key1", thread_id);
                }
                1 => {
                    map_clone.insert("key2", thread_id);
                    println!("Thread {} inserted key2", thread_id);
                }
                2 => match map_clone.get("key1") {
                    Some(value) => {
                        println!("Thread {} read key1: {}", thread_id, *value);
                    }
                    _ => {
                        println!("Thread {} couldn't find key1", thread_id);
                    }
                },
                3 => match map_clone.get_mut("key2") {
                    Some(mut value) => {
                        *value += 10;
                        println!(
                            "Thread {} incremented key2 value to {}",
                            thread_id, *value
                        );
                    }
                    _ => {
                        println!("Thread {} couldn't find key2", thread_id);
                    }
                },
                _ => panic!("Unknown thread ID"),
            }
        }));
    }

    // Wait for all threads to finish.
    for thread in threads {
        thread.join().unwrap();
    }

    // Remove "key1" and assert its value.
    assert_eq!(map.remove("key1").unwrap().1, 0); // `remove` returns Option<(K, V)>.

    // Check if "key2" exists.
    assert!(map.contains_key("key2"));

    // Remove "key2" if its value is 11.
    map.remove_if("key2", |_, val| *val == 11);

    // Access the final state of the map from the main thread.
    println!("final count: {}", map.iter().count());
}

Bounded Multi-producer Multi-consumer Queue

crossbeam-queue-website crossbeam-queue crossbeam-queue-crates.io crossbeam-queue-github crossbeam-queue-lib.rs cat-concurrency cat-data-structures cat-no-std

crossbeam-queue provides various concurrent queue implementations in Rust, designed for efficient and safe communication between threads. It offers different queue types optimized for various use cases, including single-producer/single-consumer, multi-producer/multi-consumer, and bounded/unbounded queues. These queues are essential for building concurrent data structures and message-passing systems, enabling threads to exchange data without race conditions or memory safety issues.

use crossbeam_queue::ArrayQueue;

/// Demonstrates the basic usage of `ArrayQueue`.
fn main() {
    // Create a new `ArrayQueue` with a capacity of 2.
    let q = ArrayQueue::new(2);
    // Push 'a' onto the queue. This should succeed.
    assert_eq!(q.push('a'), Ok(()));
    // Push 'b' onto the queue. This should also succeed.
    assert_eq!(q.push('b'), Ok(()));
    assert_eq!(q.push('c'), Err('c'));
    assert_eq!(q.pop(), Some('a'));
    println!("{:?}", q.pop());
}

flurry

flurry flurry-crates.io flurry-github flurry-lib.rs cat-concurrency cat-data-structures

flurry is a concurrent hash table designed for high performance. It allows fully concurrent reads and highly concurrent updates. Its main type is functionally very similar to std::collections::HashMap. Its implementation is closely based on Java's java.util.concurrent.ConcurrentHashMap. Even though all operations on the map are thread-safe and operate on shared references, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access (doc).

use std::sync::Arc;
use std::thread;

use flurry::HashMap;

/// This example demonstrates the basic usage of the `flurry::HashMap` for
/// concurrent data access. It showcases how to insert, read, remove, and
/// iterate over the map, both sequentially and concurrently.
fn main() {
    // Create a new Flurry HashMap.
    // Wrap in an Arc, so that it can be shared between threads
    let map = Arc::new(HashMap::new());
    {
        // Many of the access methods take a reference to a Guard.
        // You obtain a Guard using `HashMap::guard`, and you can
        // use references to the same guard to make one or more API
        // calls.
        let guard = map.guard();
        map.insert("key0".to_string(), 0, &guard);
        // Keep in mind that for as long as you hold onto this Guard, you are
        // preventing the collection of garbage generated by the map.
    }
    {
        // Alternatively, get a reference to the map with the current thread
        // pinned.
        assert!(!map.pin().is_empty());
        // Keep in mind that for as long as you hold onto this, you are
        // preventing the collection of garbage generated by the map.

        // You can also re-use a map pin like so:
        let mref = map.pin();

        // Insert key-value pair into the map.
        // If the map did not have this key present, None is returned.
        // If the map did have this key present, the value is updated,
        // and the old value is returned. The key is left unchanged.
        mref.insert("key1".to_string(), 1);
        mref.insert("key2".to_string(), 2);
        mref.insert("key3".to_string(), 3);

        // Read values.
        // Retrieval operations generally do not block, so may overlap with
        // update operations (including insert). Retrievals reflect the
        // results of the most recently completed update operations
        // holding upon their onset.
        println!("Value for key1: {:?}", mref.get(&"key1".to_string())); // Output: Some(1)
        println!("Value for key4: {:?}", mref.get(&"key4".to_string())); // Output: None

        println!("\nRemoving key0");
        mref.remove(&"key0".to_string());
        println!(
            "Value for key0 after removal: {:?}",
            mref.get(&"key0".to_string())
        );

        println!("\nChecking if key2 exists");
        println!("{}", mref.contains_key(&"key2".to_string()));

        // Operations that inspect the map as a whole, rather than a single key,
        // operate on a snapshot of the underlying table. For example, iterators
        // return elements reflecting the state of the hash table at some point
        // at or since the creation of the iterator.

        // Iterate over the map (read-only).
        println!("Iterating over the map:");
        for (key, value) in mref.iter() {
            println!("{}: {}", key, value);
        }

        // Aggregate status methods like `len` are typically useful only when a
        // map is not undergoing concurrent updates in other threads.
        assert!(mref.len() == 3);
    }

    let m = map.clone();
    let writer_thread = thread::spawn(move || {
        for i in 4..=5 {
            m.pin().insert(format!("key{}", i), i);
            // Simulate some work
            thread::sleep(std::time::Duration::from_millis(10));
        }
    });

    let m2 = map.clone();
    let reader_thread = thread::spawn(move || {
        for _ in 1..=2 {
            // Read multiple times
            println!("Reading from concurrent reader:");
            for (key, value) in m2.pin().iter() {
                println!("{}: {}", key, value);
            }
            thread::sleep(std::time::Duration::from_millis(10));
        }
    });

    writer_thread.join().unwrap();
    reader_thread.join().unwrap();

    // Final state of the map.
    println!("\nFinal state of the map:");
    for (key, value) in map.pin().iter() {
        println!("{}: {}", key, value);
    }
}

papaya

papaya papaya-crates.io papaya-github papaya-lib.rs cat-algorithms cat-concurrency cat-data-structures

papaya offers a fast and ergonomic concurrent hash-table for read-heavy workloads.

  • Ergonomic lock-free API - no more deadlocks!
  • Powerful atomic operations.
  • Seamless usage in async contexts.
  • Extremely scalable, low-latency reads (see performance).
  • Predictable latency across all operations.
  • Efficient memory usage, with garbage collection powered by seize. (doc)
use papaya::HashMap;

// `papaya` is a concurrent hash-table for read-heavy workloads.
// It avoids the overhead and deadlock potential of traditional locks by using
// "pinning." Pins act like lock guards.

// Use a map from multiple threads.
fn main() {
    // Create a map.
    let map = HashMap::new();
    std::thread::scope(|s| {
        // Insert some values.
        s.spawn(|| {
            // Pin the map.
            // Pinning and unpinning the table is relatively cheap but not free.
            // Therefore, pin reuse is encouraged, within reason, noting that,
            // as long as you are holding on to a pin, you
            // are preventing the map from performing garbage collection.
            let m = map.pin();
            for i in 'A'..='Z' {
                // Inserts a key-value pair into the map.
                // If the map did not have this key present, None is returned.
                // If the map did have this key present, the value is updated,
                // and the old value is returned.
                m.insert(i, 1);
            }
        });

        // Remove the values.
        // Note that the map is accessed from multiple threads.
        s.spawn(|| {
            let m = map.pin();
            for i in 'A'..='Z' {
                m.remove(&i);
            }
        });

        // Read the values.
        // Note that
        // - Read and write operations may overlap in time.
        // - There is no support for locking the entire table nor individual
        //   keys
        // to prevent concurrent access, except through external fine-grained
        // locking.
        // - Read operations (such as get) reflect the results of
        // the most-recent write.
        s.spawn(|| {
            for (key, value) in map.pin().iter() {
                println!("{key}: {value}");
            }
        });

        // Atomic operations:
        let m = map.pin();
        // Updates an existing entry or inserts a default value atomically.
        m.update_or_insert('#', |e| e + 1, 1);
        // In this case, the key did not exist and 1 was inserted.
        assert_eq!(m.get(&'#'), Some(&1));
        // Updates an existing entry atomically.
        assert_eq!(m.update('#', |e| e + 1), Some(&2));
    });
    // Example adapted from https://docs.rs/papaya/latest/papaya/
}

Related Topics

  • Async.
  • Async Channels.
  • Data Structures.
  • Global Static.
  • Rust Patterns.
  • Send and Sync.
  • Shared State.