David Beck

Thoughts

Follow on GitHub

Learning Rust: Yet Another Lock Free Queue

01 May 2016 by David Beck on [LinkedIn] / [Feed]
submit to reddit

The third episode of my Rust experiments further develops the CircularBuffer example. In the previous episodes I played with closures and iterator for the CircularBuffer.

This post changes the iterator to use slices rather then holding a reference to the CircularBuffer. (The slice based idea comes form Shepmaster from this stack overflow discussion. Thank you.) The other change is that I added a level of indirection to support lock free operations.

bite me

Design choices

This is yet another single publisher, single consumer queue. (I know there is one in the core library too.)

My queue implementation is based on a fix sized buffer. When the queue is full, the writer starts overwriting past elements. The writer doesn’t get blocked, and the buffer won’t be extended. This way the writer is not affected by the reader’s speed and the buffer size doesn’t grow without bounds. Memory allocation only happens when the buffer is created.

The reader can get an iterator to the queue and reading is done through this iterator. The read position is stored in the CircularBuffer data structure, so data can only be delivered at most once.

Here the reader cannot wait for being notified when new elements arrived. I believe these are separate concerns:

  • the notification
  • the data storage

I only take care of the latter because I think once the data storage functionality is implemented, it is easy to add condition variables or any other way on top of this to help the reader waiting for new data.

The new CircularBuffer

struct CircularBuffer<T : Copy> {
  seqno       : AtomicUsize,        // the ID of the last written item
  data        : Vec<T>,             // (2*n)+1 preallocated elements
  size        : usize,              // n

  buffer      : Vec<AtomicUsize>,   // (positions+seqno)[]
  read_priv   : Vec<usize>,         // positions belong to the reader
  write_tmp   : usize,              // temporary position where the writer writes first
  max_read    : usize,              // reader's last read seqno
}

The new function creates the CircularBuffer:

impl <T : Copy> CircularBuffer<T> {
  fn new(size : usize, default_value : T) -> CircularBuffer<T> {

    if size == 0 { panic!("size cannot be zero"); }

    let mut ret = CircularBuffer {
      seqno      : AtomicUsize::new(0),
      data       : vec![],
      size       : size,
      buffer     : vec![],
      read_priv  : vec![],
      write_tmp  : 0,
      max_read   : 0,
    };

    // make sure there is enough place and fill it with the
    // default value
    ret.data.resize((size*2)+1, default_value);

    for i in 0..size {
      ret.buffer.push(AtomicUsize::new((1+i) << 16));
      ret.read_priv.push(1+size+i);
    }

    ret
  }
}

Writer

The data vector holds 2n+1 preallocated items. n items belong to the reader and n+1 items belong to the writer. The ownership of who owns which elements are tracked by the buffer, read_priv and write_tmp members. The buffer vector represents the CircularBuffer where each element is composed of 16 bits of the seqno and the rest is a position to the data vector. The write_tmp element is also a position referring to the data vector. When the writer writes a new element:

  • it writes to the data element pointed by write_tmp
  • than it calculates seqno modulo size, which is the position in buffer which is going to be updated (new_pos)
  • than buffer[new_pos] will be updated to hold (write_tmp << 16) + (seqno % 0xffff)
  • finally write_tmp will be updated to the previous value of buffer[old_pos] >> 16
  • (basically the positions of write_tmp and buffer will be swapped)

This design allows the writer to always write to a private area that is not touched by the reader and then it atomically swaps the buffer[new_pos] element over to the freshly written element. This allows writing without interfering with the reader.

impl <T : Copy> CircularBuffer<T> {
  fn put<F>(&mut self, setter: F) -> usize
    where F : FnMut(&mut T)
  {
    let mut setter = setter;

    // get a reference to the data
    let mut opt : Option<&mut T> = self.data.get_mut(self.write_tmp);

    // write the data to the temporary writer buffer
    match opt.as_mut() {
      Some(v) => setter(v),
      None    => { panic!("write tmp pos is out of bounds {}", self.write_tmp); }
    }

    // calculate writer flag position
    let seqno  = self.seqno.load(Ordering::SeqCst);
    let pos    = seqno % self.size;

    // get a reference to the writer flag
    match self.buffer.get_mut(pos) {
      Some(v) => {
        let mut old_flag : usize = (*v).load(Ordering::SeqCst);
        let mut old_pos  : usize = old_flag >> 16;
        let new_flag     : usize = (self.write_tmp << 16) + (seqno & 0xffff);

        loop {
          let result = (*v).compare_and_swap(old_flag,
                                             new_flag,
                                             Ordering::SeqCst);
          if result == old_flag {
            self.write_tmp = old_pos;
            break;
          } else {
            old_flag = result;
            old_pos  = old_flag >> 16;
          };
        };
      },
      None => { panic!("buffer index is out of bounds {}", pos); }
    }

    // increase sequence number
    self.seqno.fetch_add(1, Ordering::SeqCst)
  }
}

Reader

To read data one needs to obtain an iterator through the iter() function. This loops through the buffer in reverse order and atomically swaps the reader’s own positions held by the read_priv vector with the position part of the buffer component. While looping it checks that the sequence number part of the buffer entry is the expected one. If not then it knows that the writer has flipped over, so the given element should be returned during the next iteration.

The result of this operation is that read_priv vector holds the pointers to the previously written elements and the reader gave its own elements to the writer in exchange, so the writer can write those, while the reader works with its own copies. The iter() function is implemented like this:

impl <T : Copy> CircularBuffer<T> {
  fn iter(&mut self) -> CircularBufferIterator<T> {
    let mut seqno : usize = self.seqno.load(Ordering::SeqCst);
    let mut count : usize = 0;
    let max_read : usize = self.max_read;
    self.max_read = seqno;

    loop {
      if count >= self.size || seqno <= max_read || seqno == 0 { break; }
      let pos = (seqno-1) % self.size;

      match self.read_priv.get_mut(count) {
        Some(r) => {
          match self.buffer.get_mut(pos) {
            Some(v) => {
              let old_flag : usize = (*v).load(Ordering::SeqCst);
              let old_pos  : usize = old_flag >> 16;
              let old_seq  : usize = old_flag & 0xffff;
              let new_flag : usize = (*r << 16) + (old_seq & 0xffff);

              if old_flag == (*v).compare_and_swap(old_flag, new_flag, Ordering::SeqCst) {
                *r = old_pos;
                seqno -=1;
                count += 1;
              } else {
                break;
              }
            },
            None => { panic!("buffer index is out of bounds {}", pos); }
          }
        },
        None => { panic!("read_priv index is out of bounds {}", count); }
      }
    }

    CircularBufferIterator {
      data    : self.data.as_slice(),
      revpos  : self.read_priv.as_slice(),
      count   : count,
    }
  }
}

The CircularBufferIterator is:

struct CircularBufferIterator<'a, T: 'a + Copy> {
  data   : &'a [T],
  revpos : &'a [usize],
  count  : usize,
}

The revpos slice is created from the reader’s read_priv vector. It holds the pointers to the data that the reader can safely read. The iterator trait iterates through the revpos slice in reverse order:

impl <'_, T: '_ + Copy> Iterator for CircularBufferIterator<'_, T> {
  type Item = T;

  fn next(&mut self) -> Option<T> {
    if self.count > 0 {
      self.count -= 1;
      let pos : usize = self.revpos[self.count];
      Some(self.data[pos])
    } else {
      None
    }
  }
}

Notes

I call the underlying data structure CircularBuffer because of the way writer wraps over at the end, when reaches the final element in the buffer. This may be confusing that the underlying implementation is not a linked list.

The reader and the writer are both using the buffer vector. The reader tries to convince the writer to use its location while the reader processes the data previously written by the writer. To minimize contention the writer goes through the buffer in forward order and the reader works in the backward order.

To make this usable in real multithreaded programs I will need to dig into the Send+Sync realm. This will be the topic of my next post. May be I create my first crate afterwards?

Rust version

$ rustc --version
rustc 1.8.0 (db2939409 2016-04-11)

Git repo

I opened a github repo for this experiment series. The source code of this experiment is here.

Episodes of this series

  1. Closures
  2. Iterator
  3. Yet Another Lock-Free Queue
  4. Sharing My Queue Between Threads