The Concurrent State of Rust

A Little About Me

Chris Griffing

Full Stack Web Developer in Bellevue at Fresh Consulting

Rust Novice Alert!!!

I have a small Node.js API that I am rewriting in various backend languages: so far, Rust and Go. Elixir (with Phoenix) and Swift are next on my list.

Tango

Presentation made with Tango

https://github.com/pnkfelix/tango

Tango converts Markdown to .rs files and back. The Rust lives in the Markdown as code blocks; the Markdown lives in the Rust code as specially formatted comments.

Some History

"When Rust first began, it baked channels directly into the language, taking a very opinionated stance on concurrency." -Aaron Turon

https://blog.rust-lang.org/2015/04/10/Fearless-Concurrency.html

"... we had M:N threading for a long time and went to great pains to remove it. ..." -Patrick Walton

https://news.ycombinator.com/item?id=11369170

Some Setup

The Operation

Here is an arbitrary function we will be using to demonstrate how to use the various techniques.

use std::time::Duration;
use std::thread::sleep;

// Simulates work by sleeping for `duration` milliseconds, then hands `handle` back.
pub fn the_operation(handle: &str, duration: u64) -> &str {
    //println!("Starting {}", handle);
    sleep(Duration::from_millis(duration));
    //println!("Finishing {}", handle);
    handle
}

A Quick Test

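// Bencher is unstable: it needs a nightly compiler, with #![feature(test)]
// and `extern crate test;` at the crate root.
use test::Bencher;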

#[test]
fn testing_the_operation() {
    let v1 = the_operation("Hello", 3);
    let v2 = the_operation("World", 2);
    let result = format!("{} {}!", v1, v2);
    assert_eq!("Hello World!", result);
}

#[bench]
fn benchmark_the_operation(b: &mut Bencher) {
    b.iter(|| testing_the_operation());
}

Standard Library

Send/Sync

These traits are the fundamental building blocks of pretty much everything else I will talk about today.

https://doc.rust-lang.org/nomicon/send-and-sync.html

  • "A type is Send if it is safe to send it to another thread."

  • "A type is Sync if it is safe to share between threads (&T is Send)."

  • "... unlike every other trait, if a type is composed entirely of Send or Sync types, then it is Send or Sync."
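As a rough illustration of the last point, here is a minimal sketch of how the compiler derives Send and Sync automatically. The helpers `assert_send` and `assert_sync` and the `Job` struct are hypothetical names made up for this example; they are not part of the standard library.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical helpers: they do nothing at run time, but a call only
// compiles if the type parameter satisfies the trait bound.
pub fn assert_send<T: Send>() {}
pub fn assert_sync<T: Sync>() {}

// A struct composed entirely of Send + Sync fields (String, u64)
// is automatically Send + Sync itself. No impl is written by hand.
pub struct Job {
    pub handle: String,
    pub duration_ms: u64,
}

pub fn check_auto_traits() -> bool {
    assert_send::<Job>();
    assert_sync::<Job>();

    // Arc<Mutex<T>> is Send + Sync whenever T is Send.
    assert_send::<Arc<Mutex<Job>>>();
    assert_sync::<Arc<Mutex<Job>>>();

    // Rc uses a non-atomic reference count, so it is neither Send nor Sync.
    // Uncommenting the next line is a compile error:
    // assert_send::<std::rc::Rc<Job>>();

    true
}
```

All of the checking happens at compile time: if `check_auto_traits` compiles, every assertion in it already holds.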

Threads

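std::thread gives us kernel/OS-level threads: each spawned thread maps 1:1 to an OS thread. How many can actually run in parallel is limited by the threading capabilities of the underlying hardware.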

Sharing Data - Arc and Mutex

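Arc (Atomically Reference Counted) allows us to share ownership of data between threads. However, an Arc only hands out shared references, so the data inside is immutable on its own.

A Mutex (Mutual Exclusion) allows us to lock the data so that only one thread can read or write it at a time.

We can wrap our value in a Mutex and then wrap that in an Arc. Mutexes become tricky if a thread panics while holding the lock: the Mutex is then poisoned, and later calls to lock() return an Err.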
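The Arc-around-Mutex pattern described above can be sketched roughly as follows. The function names `count_in_parallel` and `value_after_poisoning` are made up for this example; the second one shows one possible way to recover the data from a poisoned Mutex, not the only one.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Several threads increment one shared counter.
pub fn count_in_parallel(workers: usize, increments: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));
    let mut handles = Vec::with_capacity(workers);

    for _ in 0..workers {
        // Each thread gets its own Arc handle to the same Mutex.
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..increments {
                // The guard unlocks the Mutex when it goes out of scope.
                let mut guard = counter.lock().unwrap();
                *guard += 1;
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    let total = *counter.lock().unwrap();
    total
}

// A thread panics while holding the lock, which poisons the Mutex.
pub fn value_after_poisoning() -> i32 {
    let data = Arc::new(Mutex::new(1));
    let worker_data = Arc::clone(&data);

    let result: thread::Result<()> = thread::spawn(move || {
        let mut guard = worker_data.lock().unwrap();
        *guard = 2;
        panic!("worker failed while holding the lock");
    })
    .join();
    assert!(result.is_err());

    // lock() now returns Err, but the data is still reachable
    // through the error value if we decide it is safe to use.
    let value = match data.lock() {
        Ok(guard) => *guard,
        Err(poisoned) => *poisoned.into_inner(),
    };
    value
}
```

Calling `count_in_parallel(4, 1000)` should return 4000 regardless of how the threads interleave, because every increment happens under the lock.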

Joining Threads

use time::PreciseTime;
use time::Duration;
use std::thread;
use slide_01_setup::the_operation;
use test::Bencher;

#[test]
fn testing_join() {
    let start = PreciseTime::now();

    let thread1 = thread::spawn(move || the_operation("Hello", 3));
    let thread2 = thread::spawn(move || the_operation("World", 2));

    let mut values = Vec::new();

    values.push(thread1.join().unwrap());
    values.push(thread2.join().unwrap());

    let result = format!("{} {}!", values[0], values[1]);

    assert_eq!("Hello World!", result);

    let end = PreciseTime::now();
    assert!(start.to(end) < Duration::seconds(5));
}

#[bench]
fn benchmark_join(b: &mut Bencher) {
    b.iter(|| testing_join());
}

Channels

A unidirectional data flow from sender to receiver. Basically, message passing.

// use time::PreciseTime;
use std::sync::mpsc::channel;
// use std::thread;
// use slide_01_setup::the_operation;

#[test]
fn testing_channels() {
    let start = PreciseTime::now();

    let (tx, rx) = channel();
    let tx2 = tx.clone();

    thread::spawn(move || {
        tx.send(the_operation("Hello", 3)).unwrap();
    });
    thread::spawn(move || {
        tx2.send(the_operation("World", 2)).unwrap();
    });

    let mut values = Vec::new();

    values.push(rx.recv().unwrap());
    values.push(rx.recv().unwrap());

    // Cheating here by knowing the order of the responses ahead of time.
    // The function could instead return a struct with an index value for sorting the results.
    let result = format!("{} {}!", values[1], values[0]);

    assert_eq!("Hello World!", result);

    let end = PreciseTime::now();
    assert!(start.to(end) < Duration::seconds(5));
}

#[bench]
fn benchmark_channels(b: &mut Bencher) {
    b.iter(|| testing_channels());
}
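The comment in the test above suggests tagging each result with an index instead of relying on arrival order. A minimal sketch of that idea follows, assuming a hypothetical `Tagged` struct and a local copy of `the_operation` so the block compiles on its own.

```rust
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;

// Local stand-in for the talk's the_operation.
fn the_operation(handle: &str, duration: u64) -> &str {
    thread::sleep(Duration::from_millis(duration));
    handle
}

// Hypothetical message type: the index records the intended position.
struct Tagged {
    index: usize,
    value: &'static str,
}

pub fn hello_world_over_channel() -> String {
    let (tx, rx) = channel();
    let jobs: Vec<(&'static str, u64)> = vec![("Hello", 3), ("World", 2)];

    for (index, (handle, duration)) in jobs.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            let value = the_operation(handle, duration);
            tx.send(Tagged { index, value }).unwrap();
        });
    }

    // Drop the original sender so rx.iter() ends once every worker is done.
    drop(tx);

    // Messages arrive in completion order; sort them back into job order.
    let mut results: Vec<Tagged> = rx.iter().collect();
    results.sort_by_key(|tagged| tagged.index);

    format!("{} {}!", results[0].value, results[1].value)
}
```

With the index in the message, the result no longer depends on which thread finishes first.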

Utility Libraries - Quick Mentions

Crossbeam

https://github.com/aturon/crossbeam

  • Non-blocking data structures - stacks, queues, deques, bags, sets and maps

  • Scoped threads - Can share stack data among child threads
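To give a feel for what scoped threads allow, here is a small sketch. Note the assumption: it uses `std::thread::scope` from newer versions of the standard library rather than Crossbeam's own API, and `sum_halves` is a name made up for this example.

```rust
use std::thread;

// Sums a slice using two threads that borrow the caller's data directly.
// No Arc is needed because the scope guarantees both threads finish
// before `data` can go away.
pub fn sum_halves(data: &[u64]) -> u64 {
    let (left, right) = data.split_at(data.len() / 2);

    thread::scope(|s| {
        let left_sum = s.spawn(|| left.iter().sum::<u64>());
        let right_sum = s.spawn(|| right.iter().sum::<u64>());
        left_sum.join().unwrap() + right_sum.join().unwrap()
    })
}
```

For instance, `sum_halves(&[1, 2, 3, 4, 5])` borrows an array that lives on the caller's stack, which plain `thread::spawn` would reject because of its `'static` bound.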

Some others

  • Simple_parallel - "this is not recommended for general use" - quoted from the docs

https://github.com/huonw/simple_parallel

  • Syncbox - Queues and Thread Pools (Used to contain Futures but that was abstracted away)

https://github.com/carllerche/syncbox

Coroutines

Libraries

Mioco

Mioco is built on top of Mio, which stands for Metal I/O. Mio just received $30k from the Mozilla Foundation for API enhancements.

http://hermanradtke.com/2015/07/12/my-basic-understanding-of-mio-and-async-io.html

use time::PreciseTime;
use time::Duration;
use mioco;
use mioco::Mioco;
use mioco::sync::mpsc::channel;
use slide_01_setup::the_operation;
use test::Bencher;

#[test]
fn testing_mioco_coroutines() {
    Mioco::new()
        .start(move || {
            let start = PreciseTime::now();

            let (tx, rx) = channel();
            let tx2 = tx.clone();

            mioco::spawn(move || {
                tx.send(the_operation("Hello", 3)).unwrap();
            });

            mioco::spawn(move || {
                tx2.send(the_operation("World", 2)).unwrap();
            });

            let mut values = Vec::new();

            values.push(rx.recv().unwrap());
            values.push(rx.recv().unwrap());

            let result = format!("{} {}!", values[1], values[0]);
            assert_eq!("Hello World!", result);

            let end = PreciseTime::now();
            assert!(start.to(end) < Duration::seconds(5));

        })
        .unwrap();
}

#[bench]
fn benchmark_mioco_coroutines(b: &mut Bencher) {
    b.iter(|| testing_mioco_coroutines());
}

Futures

Formerly in Standard Library

Deprecated since 1.2.0 https://doc.rust-lang.org/1.2.0/std/sync/struct.Future.html

Maybe this helps explain why: https://www.reddit.com/r/rust/comments/2m64o5/stdsyncfuture_is_almost_useless_for_async/

Eventual

use time::PreciseTime;
use time::Duration;
use eventual::*;
use slide_01_setup::the_operation;
use test::Bencher;

#[test]
fn testing_futures() {
    let start = PreciseTime::now();

    let future1 = Future::spawn(|| the_operation("Hello", 3));

    let future2 = Future::spawn(|| the_operation("World", 2));

    let res = join((future1, future2))
        .and_then(|(v1, v2)| Ok(format!("{} {}!", v1, v2)))
        .await()
        .unwrap();

    assert_eq!("Hello World!", res);

    let end = PreciseTime::now();
    assert!(start.to(end) < Duration::seconds(5));
}

#[bench]
fn benchmark_futures(b: &mut Bencher) {
    b.iter(|| testing_futures());
}

Promises

Similar to futures. Key Differences:

  • Single-threaded

It is pretty much like working with Node.js

https://github.com/dwrensha/gj

An Example

Warning: this example is not actually async, judging by its ~5 second run time. I am not sure why, but it is definitely user error.

// use time::PreciseTime;
use gj::{EventLoop, Promise, ClosedEventPort};
// use slide_01_setup::the_operation;

#[test]
fn testing_promises() {
    EventLoop::top_level(|wait_scope| -> Result<(), ()> {
        let start = PreciseTime::now();

        let mut promises = Vec::new();

        let promise1 = Promise::<(), ()>::ok(());
        promises.push(promise1.then(|_| Promise::ok(the_operation("Hello", 3))));

        let promise2 = Promise::<(), ()>::ok(());
        promises.push(promise2.then(|_| Promise::ok(the_operation("World", 2))));

        let values = Promise::all(promises.into_iter())
            .wait(wait_scope, &mut ClosedEventPort(()))
            .unwrap();

        let result = format!("{} {}!", values[0], values[1]);
        assert_eq!("Hello World!", result);

        let end = PreciseTime::now();
        assert!(start.to(end) < Duration::seconds(5));

        Ok(())
    })
    .expect("top level");
}

#[bench]
fn benchmark_promises(b: &mut Bencher) {
    b.iter(|| testing_promises());
}

Async/Await

Two simple macros built on top of Eventual's Futures. Saved for last because the code really does feel like the endgame syntax for async operations.

use time::PreciseTime;
use time::Duration;
use async_await::*;
use slide_01_setup::the_operation;
use test::Bencher;

#[test]
fn testing_async_await() {
    let start = PreciseTime::now();

    let value1 = async!{the_operation("Hello", 3)};
    let value2 = async!{the_operation("World", 2)};

    let result = format!("{} {}!", await!(value1), await!(value2));
    assert_eq!("Hello World!", result);

    let end = PreciseTime::now();
    assert!(start.to(end) < Duration::seconds(5));
}

#[bench]
fn benchmark_async_await(b: &mut Bencher) {
    b.iter(|| testing_async_await());
}

Thanks.

Import slide modules for testing