David Beck

Thoughts

Follow on GitHub

Learning Rust: Sharing My Queue Between Threads

06 May 2016 by David Beck on [LinkedIn] / [Feed]

This is the fourth episode of my Learning Rust posts. In the previous ones I developed a lock-free queue for sharing between a single producer and single consumer. The queue itself looked OK, except that it couldn’t be shared between threads. Rust has strict checks about what and how can be shared and my implementation didn’t comply to these rules.

In this post I focus on this missing part. I have two goals here:

  1. make sure the code compiles when shared between threads
  2. ensure that the queue can only be used by a single producer and single consumer

multi threaded

CircularBuffer

As a reminder I copy here the data structure of the CircularBuffer that I want to share between threads:

struct CircularBuffer<T : Copy> {
  seqno       : AtomicUsize,        // the ID of the last written item
  data        : Vec<T>,             // (2*n)+1 preallocated elements
  size        : usize,              // n

  buffer      : Vec<AtomicUsize>,   // (positions+seqno)[]
  read_priv   : Vec<usize>,         // positions belong to the reader
  write_tmp   : usize,              // temporary position where the writer writes first
  max_read    : usize,              // reader's last read seqno
}

More information about the members, logic, implementation of this data structure can be found in my previous post.

Add Arc and UnsafeCell

I payed great attention to use the members of CircularBuffer in such a way to be thread safe, so no lock is needed if used by a single producer and a single consumer. In Rust this is not enough. I must explain this to the compiler otherwise it refuses to compile.

My first naive attempt, (which doesn’t ensure the SPSC property neither compiles), uses Arc and UnsafeCell. Arc allows me to wrap the object in a thread safe reference counted pointer. With UnsafeCell I say that it is my business and responsibility that the internals of CircularBuffer can be modified through multiple references (so please-please compiler believe me…).

I don’t think this is the right thing though. One of Rust’s greatest strengths is not in use here: being able to reason about the thread safety through language constructs. But anyways, here is the code:

let shared = Arc::new(UnsafeCell::new(CircularBuffer::new(4, 0 as i32)));

let t = thread::spawn(move|| {
  for i in 1..1000000 {
    (*shared.get()).put(|v| *v = i);
  }
});

for _k in 1..1000 {
  let mut prev = 0;
  for i in (*shared.get()).iter() {
    if i < prev { panic!("invalid value read!"); }
    prev = i;
  }
}

t.join().unwrap();

This doesn’t compile by the way, because I should be able to tell the compiler that I can safely share core::cell::UnsafeCell<spsc::CircularBuffer<i32>>:

dbeck$ cargo run
   Compiling rpg v0.1.0 (file:///Users/dbeck/work/rust_playground)
src/spsc/mod.rs:203:13: 203:26 error: the trait `core::marker::Sync` is not implemented for the type `core::cell::UnsafeCell<spsc::CircularBuffer<i32>>` [E0277]
src/spsc/mod.rs:203     let t = thread::spawn(move|| {
                                ^~~~~~~~~~~~~
src/spsc/mod.rs:203:13: 203:26 help: run `rustc --explain E0277` to see a detailed explanation
src/spsc/mod.rs:203:13: 203:26 note: `core::cell::UnsafeCell<spsc::CircularBuffer<i32>>` cannot be shared between threads safely
src/spsc/mod.rs:203:13: 203:26 note: required because it appears within the type `[closure@src/spsc/mod.rs:203:27: 209:6 shared:alloc::arc::Arc<core::cell::UnsafeCell<spsc::CircularBuffer<i32>>>]`
src/spsc/mod.rs:203:13: 203:26 note: required by `std::thread::spawn`
error: aborting due to previous error
Could not compile `rpg`.

To learn more, run the command again with --verbose.

I made several attempts to add the Sync marker to UnsafeCell<CircularBuffer<T>> without any success. I gave up at the end, mainly because even if I could, the result wouldn’t enforce the single producer-single consumer property. Without that my CircularBuffer is not thread safe.

Implementation

I looked at the official mpsc::channel implementation and adapted it to my needs. (Note that, there already exists an SPSC channel in the standard lib, with different design decisions.)

The first thing I needed, is a function that creates the producer-consumer pair:

pub fn channel<T: Copy + Send>(size : usize,
                               default_value : T) -> (Sender<T>, Receiver<T>) {
    let a = Arc::new(UnsafeCell::new(CircularBuffer::new(size, default_value)));
    (Sender::new(a.clone()), Receiver::new(a))
}

Then I needed to wrap Arc and UnsafeCell into the Sender and the Receiver. Plus I had to tell Rust that it is safe to Send them between threads:

pub struct Sender<T: Copy> {
  inner: Arc<UnsafeCell<CircularBuffer<T>>>,
}

pub struct Receiver<T: Copy> {
  inner: Arc<UnsafeCell<CircularBuffer<T>>>,
}

unsafe impl<T: Copy> Send for Sender<T> { }
unsafe impl<T: Copy> Send for Receiver<T> { }

The last step is to wrap the put and iter functions into the Sender and Receiver.

impl<T: Copy + Send> Sender<T> {
  fn new(inner: Arc<UnsafeCell<CircularBuffer<T>>>) -> Sender<T> {
    Sender { inner: inner, }
  }

  pub fn put<F>(&mut self, setter: F) -> usize
    where F : FnMut(&mut T)
  {
    unsafe { (*self.inner.get()).put(setter) }
  }
}

impl<T: Copy + Send> Receiver<T> {
  fn new(inner: Arc<UnsafeCell<CircularBuffer<T>>>) -> Receiver<T> {
    Receiver { inner: inner, }
  }

  pub fn iter(&mut self) -> CircularBufferIterator<T> {
    unsafe { (*self.inner.get()).iter() }
  }
}

SPSC

This code now compiles and works:

use std::thread;
use rpg::*;

let (mut tx, mut rx) = spsc::channel(7, 0 as i32);
let t = thread::spawn(move|| {
  for i in 1..1000000 {
    tx.put(|v| *v = i);
  }
});

for _k in 1..1000 {
  let mut prev = 0;
  for i in rx.iter() {
    if i < prev { panic!("invalid value read!"); }
    prev = i;
  }
}

t.join().unwrap();

So now I can share the queue between threads. The SPSC property is satisfied by the accessibility rules of the Sender, Receiver and CircularBuffer objects.

Rust version

$ rustc --version
rustc 1.8.0 (db2939409 2016-04-11)

Git repo

There is a github repo for this experiment series. The source code of this experiment is here.

Update: The SPSC channel is now released as lossyq crate.

Episodes of this series

  1. Closures
  2. Iterator
  3. Yet Another Lock-Free Queue
  4. Sharing My Queue Between Threads