1brc

2024-01-19

The 1brc introduces a simple task, but takes it to large proportions. The challenge is to read 1 billion rows of data on weather station temperatures as fast as possible. Each row consists of a station name and a temperature value. Temperatures range from -99.9 to 99.9.

Originally created as a task for Java developers, the 1brc has been taken on by developers in other languages as well. I decided to use Rust, because the parsing capabilities in its std library are great, and Rust allows fine-grained control over threads. It's also a statically typed language that I am familiar with.

A brief on the implementation

Throughout my iterations of the program, some things were able to stay consistent. One such item was the StationData struct.

#[derive(PartialEq, Debug, Clone)]
pub struct StationData {
    count: usize,
    min: i64,
    max: i64,
    sum: i64,
}

impl StationData {
    pub fn new(temp: i64) -> Self {
        Self {
            count: 1,
            min: temp,
            max: temp,
            sum: temp,
        }
    }

    pub fn add_temp_data(&mut self, temperature: i64) {
        self.min = self.min.min(temperature);
        self.max = self.max.max(temperature);
        self.sum += temperature;
        self.count += 1;
    }

    // integer mean in tenths of a degree; the divide-by-ten happens in Display.
    fn calculate_mean(&self) -> i64 {
        return self.sum / self.count as i64;
    }
}

impl Display for StationData {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{:.1}/{:.1}/{:.1}",
            self.min as f32 / 10.,
            self.max as f32 / 10.,
            self.calculate_mean() as f32 / 10.
        )
    }
}

The strategy for this struct is simple: temperatures are represented as integer values (tenths of a degree), and only turned into their decimal representation after all the stations are accounted for. Most hardware is optimized for float arithmetic, so once the string has been parsed to a number the gains are minimal, perhaps even slightly negative on some hardware. The real benefit of this strategy is converting a string to an integer, which is much faster than converting a string to a float.
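
As a quick sanity check of the struct above (a sketch, assuming the StationData code is in scope), the reading 12.3 is stored as the integer 123 and is only divided by ten inside Display:

let mut station = StationData::new(123); // one reading of 12.3
station.add_temp_data(-45); // one reading of -4.5
// count = 2, sum = 78, so the integer mean is 39
assert_eq!(station.to_string(), "-4.5/12.3/3.9");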

Another thing that remained consistent was the line parser. The parser I implemented looks like the following.

Line parsing

struct Reader<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> Reader<'a> {
    fn new(buf: &'a [u8]) -> Self {
        return Self { buf, pos: 0 };
    }

    fn read_str(&mut self) -> &str {
        let mut last = self.pos;

        while last < self.buf.len() && self.buf[last] != b';' {
            last += 1;
        }

        // SAFETY: the input is expected to be valid UTF-8; skip validation for speed.
        let str = unsafe { str::from_utf8_unchecked(&self.buf[self.pos..last]) };
        self.pos = last + 1;
        return str;
    }

    fn read_temp(&mut self) -> i64 {
        let mut temp = 0;
        let neg: bool;

        if self.has_remaining() && self.buf[self.pos] == b'-' {
            self.pos += 1;
            neg = true;
        } else {
            neg = false;
        }

        // scan to the end of the line (0xA == b'\n')
        while self.has_remaining() && self.buf[self.pos] != 0xA {
            debug_assert!(self.pos < self.buf.len());
            // skip the '.'; otherwise accumulate the digit. ASCII digit & 15
            // yields its numeric value, so "12.3" parses as 123.
            if self.buf[self.pos] != b'.' {
                temp = temp * 10 + (self.buf[self.pos] & 15) as i64;
            }
            self.pos += 1;
        }
        self.pos += 1;
        if neg {
            temp = -1 * temp;
        }
        return temp;
    }

    fn has_remaining(&self) -> bool {
        return self.pos < self.buf.len();
    }
}

I played around with changing the read_str() function, trying it with just extracting a &[u8] (a byte slice) instead of the string slice shown. However, there did not seem to be any performance gains. Probably because the string is produced with an unchecked conversion (from_utf8_unchecked) and never actually validated or copied until it is printed to the terminal, but that is speculation.
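
For reference, the byte-slice variant I experimented with looked roughly like this (a sketch of what I tested, not the code I kept):

// inside impl<'a> Reader<'a>
fn read_bytes(&mut self) -> &[u8] {
    // same scan as read_str, but the caller gets raw bytes and any UTF-8
    // handling is deferred until print time.
    let mut last = self.pos;
    while last < self.buf.len() && self.buf[last] != b';' {
        last += 1;
    }
    let bytes = &self.buf[self.pos..last];
    self.pos = last + 1;
    return bytes;
}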

Buffer parsing

Our last consistent component is a parse_data() function. This function takes in a buffer and aggregates each record into a hash map.

fn parse_data(reader: &mut Reader, stations: &mut HashMap<String, StationData>) {
    while reader.has_remaining() {
        let station_name = reader.read_str();
        if let Some(station) = stations.get_mut(station_name) {
            let temp = reader.read_temp();
            station.add_temp_data(temp);
        } else {
            let station_name = station_name.to_string();
            let temp = reader.read_temp();
            let station = StationData::new(temp);
            stations.insert(station_name, station);
        }
    }
}

While there are some changes to this function throughout my iterations, it remained functionally the same and only changed to comply with Rust's borrow checker.

Single threaded attempt

This brings us to my first attempt at parsing the entire data set. My first 'real', semi-respectable attempt was single threaded and synchronous.

fn main() {
    let start = Instant::now();
    let path = get_data_path();
    let mut file = File::open(path).unwrap();
    let mut offset = 0;
    let mut stations = HashMap::new();

    let mut buf = [0; CHUNK_SIZE];
    loop {
        file.seek(std::io::SeekFrom::Start(offset)).unwrap();
        let read = file.read(&mut buf).unwrap();
        let mut last = 0;

        // walk backwards to find the last newline so no record is split
        // across two reads; the next read resumes just past it.
        for (idx, ch) in buf.iter().rev().enumerate() {
            if ch == &0xA {
                last = CHUNK_SIZE - idx;
                offset = offset + last as u64;
                break;
            }
        }

        let mut reader = Reader::new(&buf[0..last]);

        while reader.has_remaining() {
            parse_data(&mut reader, &mut stations);
        }

        if read != CHUNK_SIZE {
            break;
        }
    }

    file.seek(std::io::SeekFrom::Start(offset)).unwrap();
    let read = file.read(&mut buf).unwrap();
    let mut reader = Reader::new(&buf[..read]);
    while reader.has_remaining() {
        parse_data(&mut reader, &mut stations);
    }

    let mut all: Vec<_> = stations.into_iter().collect();
    all.sort_unstable_by(|a, b| a.0.cmp(&b.0));

    for (station_name, station_data) in all.iter() {
        println!("{}={}", station_name, station_data);
    }

    let end = Instant::now();

    println!("time elapsed {}", end.duration_since(start).as_millis());
}

The thought process is pretty simple. The file is nearly 16 GB, the entirety of my RAM, so let's break the file into chunks, parse one chunk at a time to update the hash map, then sort the data and print it to the console.

Sadly, but expectedly, the program takes about 22 seconds to run. Thankfully, there are a lot of performance optimizations left on the table.

Multi threaded approach

To reap the benefits of a multithreaded approach, we first need to tell each thread what work it needs to do. An initial read of the file is done to obtain an alignment that splits the file into chunks on line boundaries. This will later be used to hand each thread the work it needs to accomplish.

async fn align_chunks(file: &mut File, file_len: u64) -> Vec<u64> {
    let mut offset = CHUNK_SIZE;
    let mut scan_buf = [0; PEEK];
    let mut chunk_offsets = vec![0];

    while offset < file_len {
        file.seek(std::io::SeekFrom::Start(offset)).unwrap();
        file.read(&mut scan_buf).unwrap();

        let mut found_delimeter = false;
        for (idx, ch) in scan_buf.iter().rev().enumerate() {
            // branch on line feed char
            if *ch == 0xA {
                found_delimeter = true;
                // advance the offset to just past the newline
                offset += (PEEK - idx) as u64;

                // record the aligned chunk boundary.
                chunk_offsets.push(offset);

                // advance the file
                offset += CHUNK_SIZE;
                break;
            }
        }
        assert!(found_delimeter);
    }

    let mut prev = chunk_offsets[0];

    // turn the delimiter offsets into an iterator of distances.
    let mut out: Vec<u64> = chunk_offsets
        .iter()
        .skip(1)
        .map(move |curr| {
            let test = curr - prev;
            prev = *curr;
            return test;
        })
        .collect();

    // if we failed to align the last chunk, which is likely, push it to the output.
    if offset != file_len {
        let last = chunk_offsets[chunk_offsets.len() - 1];
        out.push(file_len - last);
    }

    // reset the file back to start.
    file.seek(SeekFrom::Start(0)).unwrap();

    return out;
}

Then, once we are in the async runtime, we need to load the chunks in from the file.

async fn read_chunks<'a>(
    file: &mut File,
    delims_iter: &mut Peekable<Box<std::slice::Iter<'a, u64>>>,
) -> Vec<RefCell<Reader>> {
    let mut chunks: Vec<RefCell<Reader>> = Vec::new();

    for _ in 0..THREAD_COUNT {
        let len: usize;
        if let Some(delim) = delims_iter.next() {
            len = *delim as usize;
        } else {
            break;
        }

        let mut buf = vec![0; len];

        file.read_exact(&mut buf).unwrap();
        let reader = Reader::new(buf);
        chunks.push(RefCell::new(reader));
    }

    return chunks;
}

Once we have the chunks read in, we are ready to parse them.

async fn parse_chunks(
    chunks: &mut Vec<RefCell<Reader>>,
    stations: &Arc<RwLock<HashMap<Vec<u8>, Arc<Mutex<StationData>>>>>,
) {
    let mut futs = Vec::new();

    for chunk in chunks.iter() {
        let stations = stations.clone();
        let mut chunk = chunk.take();

        let handle = std::thread::spawn(move || {
            let mut t_stations = HashMap::new();

            parse_chunk(&mut chunk, &mut t_stations);

            let mut stations_read = stations.read().unwrap();

            for (name, data) in t_stations {
                if let Some(station) = stations_read.get(&name) {
                    let mut station = station.lock().unwrap();
                    station.combine(&data);
                } else {
                    drop(stations_read);
                    stations
                        .write()
                        .unwrap()
                        .insert(name, Arc::new(Mutex::new(data)));
                    stations_read = stations.read().unwrap();
                };
            }
        });

        futs.push(handle);
    }

    for fut in futs {
        fut.join().unwrap();
    }
}

The main flaw in this strategy is that we first need to read in each thread's chunk before we can spawn the thread to do work. This loses a lot of time waiting on I/O while the threads could already be running.

The main function remained relatively similar, but wrapped the stations map in a RwLock and each station's data in a Mutex.

#[tokio::main]
async fn main() {
    let path = get_data_path();
    let start = Instant::now();

    let mut file = File::open(path.clone()).unwrap();

    let file_len = file.metadata().unwrap().len();

    let stations: Arc<RwLock<HashMap<Vec<u8>, Arc<Mutex<StationData>>>>> =
        Arc::new(RwLock::new(HashMap::new()));

    let delims = align_chunks(&mut file, file_len).await;

    let mut delims_iter = Box::new(delims.iter()).peekable();

    let mut total = 0;

    while delims_iter.peek().is_some() {
        let mut chunks = read_chunks(&mut file, &mut delims_iter).await;

        total += chunks.len();

        parse_chunks(&mut chunks, &stations).await;
    }

    print_out(stations);

    println!(
        "total chunks: {}, processed chunks: {}",
        delims.len(),
        total
    );

    println!(
        "Elapsed: {} ms",
        Instant::now().duration_since(start).as_millis()
    );
}

The execution for this implementation is consistently in the low 9 second range. Better! But there is still more that can be improved on... I also took a performance hit because the Reader in the line parser changed to own a Vec instead of borrowing a slice. I did that to keep the borrow checker quiet while working through the implementation and porting to an async runtime. This had a huge performance impact that we will come back to a little later on.
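
The difference is just who owns the buffer. Roughly (names are hypothetical, for comparison only):

// borrowed: a zero-copy view into a buffer owned by the caller
struct BorrowedReader<'a> {
    buf: &'a [u8],
    pos: usize,
}

// owned: every chunk pays for its own Vec allocation plus a copy of the data
struct OwnedReader {
    buf: Vec<u8>,
    pos: usize,
}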

Threads that dispatch immediately

So here is where my knowledge of async Rust starts to show its weak points. Namely, trying to implement an iterator that can be shared across threads. It is similar to an SPMC (single-producer, multi-consumer) channel in a sense, except that once it is created it can no longer be written to.

#[derive(Clone)]
struct AlignmentStream {
    inner: Pin<Vec<Alignment>>,
    idx: Arc<Mutex<usize>>,
}

impl AlignmentStream {
    fn new(inner: Vec<Alignment>) -> Self {
        return Self {
            inner: Pin::new(inner),
            idx: Arc::new(Mutex::new(0)),
        };
    }
}

impl Stream for AlignmentStream {
    type Item = Alignment;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let mut idx = self.idx.lock().unwrap();

        if *idx < self.inner.len() {
            let out = self.inner[*idx].clone();
            *idx += 1;
            return Poll::Ready(Some(out));
        } else {
            return Poll::Ready(None);
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        return (*self.idx.lock().unwrap(), Some(self.inner.len()));
    }
}

This is a little bit hacky, and could probably be done without the futures crate's Stream trait (see the sketch after the Alignment struct below). I originally set out to build an actual SPMC channel, but decided the benefit did not outweigh the effort: chunking the file only took 40 ms, so any time saved would be marginal at best.

If you look closely, we also have an Alignment struct inside the inner field. That struct just holds the bounds of a chunk and is there for readability.

#[derive(Clone)]
struct Alignment {
    start: u64,
    end: u64,
}

impl Alignment {
    fn start(&self) -> SeekFrom {
        return SeekFrom::Start(self.start);
    }

    fn end(&self) -> SeekFrom {
        return SeekFrom::Start(self.end);
    }
}
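
For what it's worth, the Stream-free alternative would look roughly like a plain shared-index handle over the same Vec. A minimal sketch, assuming Arc and Mutex from std::sync (not the code I shipped):

#[derive(Clone)]
struct SharedChunks {
    inner: Arc<Vec<Alignment>>,
    idx: Arc<Mutex<usize>>,
}

impl SharedChunks {
    // every clone pops the next chunk under the shared index; no polling involved.
    fn next(&self) -> Option<Alignment> {
        let mut idx = self.idx.lock().unwrap();
        let out = self.inner.get(*idx).cloned();
        *idx += 1;
        return out;
    }
}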

Now we have a tool to hand work to the threads. We just need a tool for the threads to communicate back to the main thread. For that, I decided on an MPSC (multi-producer, single-consumer) channel.
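
The messages on that channel are a small enum: either one chunk's parsed results, or a marker that a thread has run out of work. It is the same ChannelSignal that appears in the full listing at the end:

enum ChannelSignal {
    // a thread found no more chunks to claim and is shutting down
    End,
    // a thread finished a chunk and is sending its per-thread map back
    Data(StationMap),
}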

fn dispatch_thread(
    chunks: &AlignmentStream,
    tx: &SyncSender<ChannelSignal>,
    path: String,
) -> JoinHandle<()> {
    let tx = tx.clone();
    let path = path.clone();
    let mut chunks = chunks.clone();
    tokio::spawn(async move {
        loop {
            let mut buf: Vec<u8>;
            if let Some(chunk) = chunks.next().await {
                let mut file = File::open(&path).await.unwrap();
                file.seek(chunk.start()).await.unwrap();
                let size = (chunk.end - chunk.start) as usize;
                buf = vec![0; size];
                file.read_exact(&mut buf).await.unwrap();
            } else {
                tx.send(ChannelSignal::End).unwrap();
                return;
            }

            let mut reader = Reader::new(buf);
            let mut stations = StationMap::new();
            parse_chunk(&mut reader, &mut stations);
            tx.send(ChannelSignal::Data(stations)).unwrap();
        }
    })
}

#[tokio::main]
async fn main() {
    let path = get_data_path();
    let start = Instant::now();

    let mut file = File::open(path.clone()).await.unwrap();

    let file_len = file.metadata().await.unwrap().len();

    let mut stations = StationMap::new();
    let delims = align_chunks(&mut file, file_len).await;

    let chunks = AlignmentStream::new(delims);

    let (tx, rx) = std::sync::mpsc::sync_channel(THREAD_COUNT * 4);
    let mut thread_pool = Vec::new();

    for _ in 0..THREAD_COUNT {
        thread_pool.push(dispatch_thread(&chunks, &tx, path.clone()));
    }

    let mut ackd = 0;

    while ackd < THREAD_COUNT {
        let data = rx.recv().unwrap();
        match data {
            ChannelSignal::Data(data) => {
                stations.combine(data);
            }
            ChannelSignal::End => {
                ackd += 1;
            }
        }
    }

    print_out(stations);

    println!(
        "Elapsed: {} ms",
        Instant::now().duration_since(start).as_millis()
    );
}

This change gave us a significant boost, down to a ~6.5 second run.

Last tweaks

Remember that I changed the string slice into a vector to get the borrow checker to leave me alone while I ported to an async runtime? Well, let's undo that change, migrate onto Tokio (a Rust async runtime), and perform some other minor changes for clarity. The Tokio migration did not get us any real performance benefit, but some of the utility functions Tokio gives you are really nice to work with. The full final code looks like this:

use std::fmt::{Display, Formatter};
use std::io::{Read, Seek};
use std::sync::{mpsc::SyncSender, Arc, Mutex};
use std::{io::SeekFrom, pin::Pin, str, task::Poll, time::Instant, usize};

use futures::{Stream, StreamExt};

use hashbrown::HashMap;

use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::{fs::File, runtime::Builder, task::JoinHandle};

const CHUNK_SIZE: u64 = 1024 * 1024 * 32;
const THREAD_COUNT: usize = 12;
const PEEK: usize = 100;

fn main() {
    let mut ackd = 0;
    let mut total = 0;
    let mut chunks_processed = 0;

    let path = get_data_path();
    let start = Instant::now();

    let mut file = std::fs::File::open(&path).unwrap();
    let file_len = file.metadata().unwrap().len();

    let mut stations = StationMap::new();
    let chunks = AlignmentStream::new(align_chunks(&mut file, file_len));

    let (tx, rx) = std::sync::mpsc::sync_channel(THREAD_COUNT * 4);
    let mut thread_pool = Vec::new();

    let mut builder = Builder::new_multi_thread();
    builder.worker_threads(THREAD_COUNT);
    builder.enable_all();

    let rt = builder.build().unwrap();

    rt.block_on(async {
        for _ in 0..THREAD_COUNT {
            thread_pool.push(dispatch_thread(&chunks, &tx, &path).await);
        }
    });

    while ackd < THREAD_COUNT {
        let data = rx.recv().unwrap();
        match data {
            ChannelSignal::Data(data) => {
                for station in &data.inner {
                    total += station.1.count;
                }
                chunks_processed += 1;
                stations.combine(data);
            }
            ChannelSignal::End => {
                ackd += 1;
            }
        }
    }
    print_out(stations);
    println!(
        "\r\nTotal Elapsed time: {} ms",
        Instant::now().duration_since(start).as_millis()
    );

    if total != 1_000_000_000 {
        println!("\r\n\t*** WARNING: Did not parse all 1bn rows. If you're not testing on the full data set, disreguard.\r\trows parsed: {total}\r\n")
    }

    assert_eq!(chunks_processed, chunks.inner.len());
}

async fn dispatch_thread(
    chunks: &AlignmentStream,
    tx: &SyncSender<ChannelSignal>,
    path: &str,
) -> JoinHandle<()> {
    let tx = tx.clone();
    let mut file = File::open(&path).await.unwrap();
    let mut chunks = chunks.clone();
    tokio::spawn(async move {
        loop {
            let mut buf: Vec<u8>;
            // read in a chunk if one is queued. else, return END signal
            if let Some(chunk) = chunks.next().await {
                file.seek(chunk.start()).await.unwrap();
                let size = (chunk.end - chunk.start) as usize;
                buf = vec![0; size];
                file.read_exact(&mut buf).await.unwrap();
            } else {
                tx.send(ChannelSignal::End).unwrap();
                return;
            }

            let mut reader = Reader::new(buf);
            let mut stations = StationMap::new();
            parse_chunk(&mut reader, &mut stations);
            tx.send(ChannelSignal::Data(stations)).unwrap();
        }
    })
}

fn align_chunks(file: &mut std::fs::File, file_len: u64) -> Vec<Alignment> {
    let mut offset = CHUNK_SIZE;
    let mut scan_buf = [0; PEEK];
    let mut chunk_offsets = vec![0];

    while offset < file_len {
        file.seek(std::io::SeekFrom::Start(offset)).unwrap();
        file.read(&mut scan_buf).unwrap();

        let mut found_delimeter = false;
        for (idx, ch) in scan_buf.iter().rev().enumerate() {
            // branch on line feed char
            if *ch == 0xA {
                found_delimeter = true;
                // advance the offset to just past the newline
                offset += (PEEK - idx) as u64;

                // record the aligned chunk boundary.
                chunk_offsets.push(offset);

                // advance the file
                offset += CHUNK_SIZE;
                break;
            }
        }
        assert!(found_delimeter);
    }

    let mut prev = chunk_offsets[0];

    // turn the delimiter offsets into a list of chunk Alignments.
    let mut out: Vec<Alignment> = chunk_offsets
        .iter()
        .skip(1)
        .map(|curr| {
            let align = Alignment {
                start: prev,
                end: *curr,
            };
            prev = *curr;
            return align;
        })
        .collect();

    // if we failed to align the last chunk, which is likely, push it to the output.
    if offset != file_len {
        let last = chunk_offsets[chunk_offsets.len() - 1];
        out.push(Alignment {
            start: last,
            end: file_len,
        })
    }

    // reset the file back to start.
    file.seek(SeekFrom::Start(0)).unwrap();

    return out;
}

fn parse_chunk(reader: &mut Reader, stations: &mut StationMap) {
    while reader.has_remaining() {
        let station_name = reader.read_station_name();

        if let Some(station) = stations.get_mut(station_name) {
            let temp = reader.read_temp();
            station.add_temp_data(temp);
        } else {
            let station_name = station_name.to_owned();
            let station = StationData::new(reader.read_temp());
            stations.insert(station_name, station);
        }
    }
}

fn print_out(stations: StationMap) {
    let all = stations.inner;
    let mut all: Vec<_> = all.into_iter().map(|x| (x.0, x.1)).collect();

    all.sort_unstable_by(|a, b| a.0.cmp(&b.0));

    for (station_name, station_data) in all.into_iter() {
        println!("{}={station_data}", unsafe {
            str::from_utf8_unchecked(&station_name)
        })
    }
}

pub fn get_data_path() -> String {
    let args = std::env::args().collect::<Vec<String>>();
    let source = args.get(1).unwrap_or_else(|| {
        println!("Please provide a file path");
        std::process::exit(1);
    });
    return "data/".to_string() + source + ".txt";
}

#[derive(Clone)]
struct AlignmentStream {
    inner: Pin<Vec<Alignment>>,
    idx: Arc<Mutex<usize>>,
}

impl AlignmentStream {
    fn new(inner: Vec<Alignment>) -> Self {
        return Self {
            inner: Pin::new(inner),
            idx: Arc::new(Mutex::new(0)),
        };
    }
}

impl Stream for AlignmentStream {
    type Item = Alignment;

    // hacky solution to get an iterator that is Send + Sync
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let mut idx = self.idx.lock().unwrap();

        if *idx < self.inner.len() {
            let out = self.inner[*idx];
            *idx += 1;
            return Poll::Ready(Some(out));
        } else {
            return Poll::Ready(None);
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        return (*self.idx.lock().unwrap(), Some(self.inner.len()));
    }
}

enum ChannelSignal {
    End,
    Data(StationMap),
}

#[derive(Copy, Clone)]
struct Alignment {
    start: u64,
    end: u64,
}

impl Alignment {
    fn start(&self) -> SeekFrom {
        return SeekFrom::Start(self.start);
    }
}

#[derive(Default)]
struct Reader {
    buf: Vec<u8>,
    pos: usize,
}

impl Reader {
    fn new(buf: Vec<u8>) -> Self {
        return Self { buf, pos: 0 };
    }

    fn read_station_name(&mut self) -> &[u8] {
        let mut last = self.pos;
        // scan to the field separator (0x3B == b';'); guard on `last`, which is what advances
        while last < self.buf.len() && self.buf[last] != 0x3B {
            last += 1;
        }

        let str = &self.buf[self.pos..last];
        self.pos = last + 1;
        return str;
    }

    fn read_temp(&mut self) -> i64 {
        let mut temp = 0;
        let neg: bool;

        assert!(self.has_remaining());
        if self.buf[self.pos] == b'-' {
            self.pos += 1;
            neg = true;
        } else {
            neg = false;
        }

        while self.has_remaining() && self.buf[self.pos] != 0xA {
            if self.buf[self.pos] != b'.' {
                temp = temp * 10 + (self.buf[self.pos] & 15) as i64;
            }
            self.pos += 1;
        }

        self.pos += 1;
        if neg {
            temp = -1 * temp;
        }
        return temp;
    }

    fn has_remaining(&self) -> bool {
        return self.pos < self.buf.len();
    }
}

#[derive(PartialEq, Debug)]
pub struct StationData {
    count: usize,
    min: i64,
    max: i64,
    sum: i64,
}

struct StationMap {
    inner: HashMap<Vec<u8>, StationData>,
}

impl StationMap {
    fn new() -> Self {
        return Self {
            inner: HashMap::new(),
        };
    }

    fn get_mut(&mut self, key: &[u8]) -> Option<&mut StationData> {
        return self.inner.get_mut(key);
    }

    fn combine(&mut self, other: Self) {
        for (name, data) in other.inner {
            if let Some(station) = self.inner.get_mut(&name) {
                station.combine(&data);
            } else {
                self.insert(name, data);
            }
        }
    }

    fn insert(&mut self, name: Vec<u8>, data: StationData) {
        self.inner.insert(name, data);
    }
}

impl StationData {
    pub fn new(temp: i64) -> Self {
        Self {
            count: 1,
            min: temp,
            max: temp,
            sum: temp,
        }
    }

    pub fn add_temp_data(&mut self, temperature: i64) {
        self.count += 1;
        self.min = self.min.min(temperature);
        self.max = self.max.max(temperature);
        self.sum += temperature;
    }

    fn calculate_mean(&self) -> i64 {
        return self.sum / self.count as i64;
    }

    fn combine(&mut self, other: &Self) {
        self.count += other.count;
        self.min = self.min.min(other.min);
        self.max = self.max.max(other.max);
        self.sum += other.sum;
    }
}

impl Display for StationData {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{:.1}/{:.1}/{:.1}",
            self.min as f32 / 10.,
            self.max as f32 / 10.,
            self.calculate_mean() as f32 / 10.,
        )
    }
}

There are some things I could clean up, and sure, I over-engineered a couple of spots, but hey, I am doing this as a hobby. I want it to stay fun, not feel like a chore. The final time is ~3.5 seconds.

Summary

Allocations are expensive, channels are fast, and get threads working as soon as possible.