Threadpools

Calculate the SHA256 of ISO files concurrently

threadpool num_cpus walkdir ring cat-concurrency cat-filesystem

This example calculates the SHA256 for every file with the iso extension under /home/user/Downloads, descending into subdirectories. A threadpool generates threads equal to the number of cores present in the system found with num_cpus::get⮳. walkdir::WalkDir::new⮳ iterates the directory tree, and threadpool::ThreadPool::execute⮳ is called for each matching file to perform the operations of reading and computing the SHA256 hash.

use std::fs::File;
use std::io::BufReader;
use std::io::Error;
use std::io::Read;
use std::path::Path;
use std::sync::mpsc::channel;

use ring::digest::Context;
use ring::digest::Digest;
use ring::digest::SHA256;
use threadpool::ThreadPool;
use walkdir::WalkDir;

/// Returns `true` when `entry` has an `iso` extension, compared ASCII case-insensitively.
///
/// Paths without an extension (including names that consist only of a leading dot
/// followed by text, such as `.iso`) yield `false`.
fn is_iso(entry: &Path) -> bool {
    // Comparing the `OsStr` in place avoids the temporary strings that a lossy
    // UTF-8 conversion followed by lowercasing would allocate for every entry.
    entry
        .extension()
        .is_some_and(|ext| ext.eq_ignore_ascii_case("iso"))
}

/// Streams the file at `filepath` through SHA256 and returns the digest together
/// with the path that was passed in.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened or a read fails.
fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<(Digest, P), Error> {
    let file = File::open(&filepath)?;
    let mut reader = BufReader::new(file);
    let mut hasher = Context::new(&SHA256);
    let mut chunk = [0u8; 1024];

    // A read of zero bytes signals end of file; anything else is fed to the hasher.
    loop {
        match reader.read(&mut chunk)? {
            0 => break,
            filled => hasher.update(&chunk[..filled]),
        }
    }

    Ok((hasher.finish(), filepath))
}

/// Walks `/home/user/Downloads`, hashes every `.iso` file on a thread pool sized
/// to the number of CPUs, and prints each digest with its path.
///
/// # Errors
///
/// Returns the first I/O error reported by a hashing job.
fn main() -> Result<(), Error> {
    let workers = ThreadPool::new(num_cpus::get());
    let (sender, receiver) = channel();

    // Entries that cannot be read are skipped; directories and non-iso files are filtered out.
    let iso_files = WalkDir::new("/home/user/Downloads")
        .follow_links(true)
        .into_iter()
        .filter_map(Result::ok)
        .filter(|item| !item.path().is_dir() && is_iso(item.path()));

    for item in iso_files {
        let sender = sender.clone();
        let file = item.path().to_owned();
        workers.execute(move || {
            sender
                .send(compute_digest(file))
                .expect("Could not send data!");
        });
    }

    // Only the clones held by the jobs remain, so the receive loop ends once every job is done.
    drop(sender);
    for outcome in receiver {
        let (sha, path) = outcome?;
        println!("{:?} {:?}", sha, path);
    }
    Ok(())
}

Draw a fractal, dispatching work to a thread pool

threadpool num num_cpus image cat-concurrency cat-science cat-rendering

This example generates an image by drawing a fractal from the Julia set⮳ with a thread pool for distributed computation.

julia-set

Allocate memory for the output image of the given width and height with image::ImageBuffer::new⮳. The wavelength_to_rgb helper calculates the RGB value of each pixel as an image::Rgb⮳. Create a threadpool::ThreadPool⮳ with a thread count equal to the number of cores, found with num_cpus::get⮳. threadpool::ThreadPool::execute⮳ receives each row of pixels as a separate job.

std::sync::mpsc::channel⮳ carries the computed pixels from the worker threads back to the main thread, where std::sync::mpsc::Receiver::recv⮳ retrieves them. image::ImageBuffer::put_pixel⮳ uses the data to set the pixel color. image::ImageBuffer::save⮳ writes the image to temp/output.png.

use std::fs;
use std::sync::mpsc::channel;

use anyhow::Result;
use image::ImageBuffer;
use image::Rgb;
use num::complex::Complex;
use threadpool::ThreadPool;

/// Converts a wavelength-like intensity value into an RGB pixel.
///
/// Values in `380..=780` are mapped onto a colour ramp: red fading out under full
/// blue, green rising under blue, blue fading under green, red rising under green,
/// green fading under red, and finally pure red. Values outside that range map to
/// black. Each channel is passed through `normalize` together with a brightness
/// factor.
fn wavelength_to_rgb(wavelength: u32) -> Rgb<u8> {
    let w = wavelength as f32;

    // Brightness ramps linearly between 0.3 and 1.0 near both ends of the range
    // and stays at 1.0 everywhere else.
    let attenuation = match wavelength {
        380..=419 => 0.3 + 0.7 * (w - 380.) / (420. - 380.),
        701..=780 => 0.3 + 0.7 * (780. - w) / (780. - 700.),
        _ => 1.0,
    };

    let (red, green, blue) = match wavelength {
        380..=439 => ((440. - w) / (440. - 380.), 0.0, 1.0),
        440..=489 => (0.0, (w - 440.) / (490. - 440.), 1.0),
        490..=509 => (0.0, 1.0, (510. - w) / (510. - 490.)),
        510..=579 => ((w - 510.) / (580. - 510.), 1.0, 0.0),
        580..=644 => (1.0, (645. - w) / (645. - 580.), 0.0),
        645..=780 => (1.0, 0.0, 0.0),
        _ => (0.0, 0.0, 0.0),
    };

    Rgb([
        normalize(red, attenuation),
        normalize(green, attenuation),
        normalize(blue, attenuation),
    ])
}
/// Iterates `z = z * z + c` for the pixel `(x, y)` and returns an intensity value.
///
/// The pixel is first mapped into the complex plane: the real part spans 3 units
/// and the imaginary part 2 units, both centred on the middle of the image. The
/// result is the index of the last iteration that was applied before the norm of
/// `z` reached 2 or `max_iter` iterations had run; it is 0 when no iteration was
/// applied.
fn julia(
    c: Complex<f32>,
    x: u32,
    y: u32,
    width: u32,
    height: u32,
    max_iter: u32,
) -> u32 {
    let (w, h) = (width as f32, height as f32);
    // scale and translate the point to image coordinates
    let re = 3.0 * (x as f32 - 0.5 * w) / w;
    let im = 2.0 * (y as f32 - 0.5 * h) / h;
    let mut z = Complex { re, im };

    let mut last_applied = 0;
    for step in 0..max_iter {
        if z.norm() >= 2.0 {
            return last_applied;
        }
        z = z * z + c;
        last_applied = step;
    }
    last_applied
}
/// Scales a colour intensity by `factor`, raises it to the power 0.8, and maps
/// the result onto the `0..=255` byte range.
///
/// The final float-to-`u8` cast saturates, so out-of-range results clamp to 0 or
/// 255 and NaN becomes 0.
fn normalize(color: f32, factor: f32) -> u8 {
    let scaled = color * factor;
    let curved = scaled.powf(0.8);
    (curved * 255.) as u8
}

/// Renders a 1920x1080 Julia-set image on a thread pool sized to the number of
/// CPUs and saves it to `temp/output.png`.
///
/// # Errors
///
/// Returns an error if a pixel cannot be received from the workers, if the
/// `temp` directory cannot be created, or if the image cannot be saved.
fn main() -> Result<()> {
    let (width, height) = (1920, 1080);
    let mut img = ImageBuffer::new(width, height);
    let iterations = 300;

    let c = Complex::new(-0.8, 0.156);

    let pool = ThreadPool::new(num_cpus::get());
    let (tx, rx) = channel();

    // One job per image row; each job sends its pixels back one at a time.
    for y in 0..height {
        let tx = tx.clone();
        pool.execute(move || {
            for x in 0..width {
                let i = julia(c, x, y, width, height, iterations);
                // Spread the iteration count over the 380..=780 range understood
                // by `wavelength_to_rgb`.
                let pixel = wavelength_to_rgb(380 + i * 400 / iterations);
                tx.send((x, y, pixel)).expect("Could not send data!");
            }
        });
    }

    // Drop the original sender so that `recv` reports an error instead of
    // blocking forever if a worker stops before sending all of its pixels.
    drop(tx);

    for _ in 0..(width * height) {
        let (x, y, pixel) = rx.recv()?;
        img.put_pixel(x, y, pixel);
    }

    // `create_dir_all` succeeds when the directory already exists, which avoids
    // the race between a separate existence check and the creation.
    fs::create_dir_all("temp")?;
    img.save("temp/output.png")?;
    println!("Image saved!");
    Ok(())
}