Concurrency in Elixir and Ruby

by Jeremy D. Frens on May 22, 2016
part of the Ruby, Elixir, Variable Bindings, and Concurrency series


I ended my previous article on why side effects matter with an honest-to-flying-spaghetti-monster race condition in Ruby. That example made the article a little longer than I had expected, so I put off the ending for another article. This article.

Previously on “Programming During Recess”…

We had an Inc class in the previous article. It implemented a simple incrementing counter. As I played around with that example for this article, I realized that Inc was a really dumb name. So here’s the Inc class renamed Counter:

class Counter
  def initialize(value) ; @value = value ; end
  def value             ; @value         ; end
  def inc!              ; @value += 1    ; end
end

counter = Counter.new(0)
t1 = Thread.new { 10.times { counter.inc! } }
t2 = Thread.new { 10.times { counter.inc! } }
t1.join
t2.join
counter.value # => ???

I also took out the sleep 0 that we needed to increase the risk of a race condition. If you want to try running this code for yourself and see the race conditions, replace the definition of #inc! with this:

def inc!
  v = @value
  sleep 0
  @value = v + 1
end

counter.value comes out as 10 and 20 and pretty much any number in between.

Atomic operations

The problem with counter.inc! is that it needs to read, compute, and then write. Worse yet, reading and writing is done on an instance variable which is kept in a hash table, so there’s actually quite a lot being done in #inc!, too much for Ruby to treat naturally as one operation.

We need some way to tell Ruby to treat #inc! as an atomic operation. Don’t think chemistry and physics here, think Greek: atomos means “indivisible”. The Ruby VM needs to treat the read-compute-write as an indivisible operation that can’t be interrupted by another process.

Usually this is done with a lock of some sort. Ruby does not provide any language-level locking mechanisms; we find them in the standard library.

Mutex

We can wrap the increment in a mutex:

mutex = Mutex.new
counter = Counter.new(0)
t1 = Thread.new { 10.times { mutex.synchronize { counter.inc! } } }
t2 = Thread.new { 10.times { mutex.synchronize { counter.inc! } } }
# ...

Mutex implements a simple locking mechanism for concurrency. When one thread calls #synchronize on mutex, it locks the mutex and executes the block. If the mutex is already locked (by another process), it waits until it is unlocked, then it gets the lock and can execute. When the block is done, the mutex is unlocked.1 This means that counter.inc! will run as an atomic operation, and the race condition is gone!

However, this is a terrible design. We’ll have to pass mutex along with counter, and we’ll have to remember to use the mutex whenever we call counter.inc!.

Since the mutex and the counter seem to be an inseparable pair, maybe the mutex should go into the Counter class. You can certainly do that, but it gets a bit messy (because it breaks the Single Responsibility Principle). Also, a Counter might not be shared across threads and so doesn’t need a mutex. A mutex is not that expensive, but it’s also not free. So we’d like to have a separate Counter without a mutex.

The solution, then, is to implement a new class to handle the mutex:

class Counter
  def initialize(value) ; @value = value ; end
  def value ; @value      ; end
  def inc!  ; @value += 1 ; end
end

class CounterWithMutex
  def initialize(value)
    @mutex = Mutex.new
    @counter = Counter.new(value)
  end
  def value ; @counter.value ; end
  def inc!
    @mutex.synchronize { @counter.inc! }
  end
end

counter = CounterWithMutex.new(0)
t1 = Thread.new { 10.times { counter.inc! } }
t2 = Thread.new { 10.times { counter.inc! } }
t1.join
t2.join
counter.value # => ???

A few things to notice:

  • CounterWithMutex could probably be made more general and simpler with some metaprogramming.2
  • The mutex is not used in CounterWithMutex#value. That method only reads the value, so there’s no danger of a race condition.3
  • Each counter has their own mutex. You could have fifty counters, and each one would be locked independently of the others.

Redis, an external solution

What if you need a shared counter across multiple servers? It’s easy to share instances of Ruby objects across threads because they work in shared memory, but each server (or really each Ruby process) has their own memory space.

We can use an external store like Redis.

require 'redis'
REDIS_URL = 'redis://127.0.0.1/' # set this for your Redis server

class Counter
  def initialize(value, value_key:, redis:)
    @value_key = value_key
    @redis = redis
    @redis.setnx(@value_key, value)
  end
  def value
    @redis.get(@value_key)
  end
  def inc!
    @redis.incr(@value_key)
  end
end

redis     = Redis.new(url: REDIS_URL)
value_key = 'counter of love'
counter   = Counter.new(0, value_key: value_key, redis: redis)
t1 = Thread.new { 10.times { counter.inc! } }
t2 = Thread.new { 10.times { counter.inc! } }
t1.join
t2.join
counter.value # => ???

Some things to notice:

  • Change the value of REDIS_URL if you run this for yourself.4
  • We explicitly provide a “value key” for the counter to share it across processes and machines.5
  • The counter is persistent. More on this in a bit.
  • There’s no adapter class to tease out since the counter itself is in Redis.
  • You always have to use Redis.6

The Redis class from the redis gem provides an instance method for each Redis command.7

  • SETNX (used in Counter#initialize) sets the value for the value key if the key does not already exist in Redis. If the key already exists, this instruction does nothing.
  • INCR (used in Counter#inc!) is an increment as an atomic operation.

If we used SET instead of SETNX, then each time the program is run, the counter would get reset back to 0 (the initial value passed in). It seems a bit counterproductive to keep resetting the counter whenever a new process accesses it.

As mentioned above, this implementation makes the counter persistent. Once you’ve run the script, the counter just keeps on collecting increments. It doesn’t go away when the script is done. If you run it ten times in a row, then the counter will be incremented 200 times! Come back two days later (assuming Redis hasn’t crashed disastrously), and the counter will pick up where it left off.

Maybe this persistence works for whatever problem you’re trying to solve. If not, you might need to reset the counter (using SET) or to delete the counter (using DEL).

Redis, the lock

Our contrived example is misleading because the atomic operation is just a simple increment. What if the computation were more involved and complicated?

Regardless, the value still needs to be saved in Redis (or in another database) so that we can shared it among Ruby processes. Redis has a hash datatype with hash operations, and that usually works for most serialization needs.

But there’s no “read this hash from Redis, transform the hash in Ruby, and write the new hash back to Redis” instruction in Redis. You can execute Redis commands in a transaction, but that doesn’t help with any Ruby computations you need to do. You can write Lua code to be executed on the Redis server within a transaction, but that introduces a whole new language to learn.

Or you could use Redis to implement a mutex, and leave the computation in Ruby.

require 'redis'
require 'redis-semaphore'

class Counter
  def initialize(value, value_key:, redis:)
    @value_key = value_key
    @redis     = redis
    @semaphore.lock do
      unless @redis.exists(@value_key)
        @redis.set(@value_key, value)
      end
    end
  end
  def value
    @redis.get(@value_key)
  end
  def inc!
    @semaphore.lock do
      @redis.set(@value_key, @redis.get(@value_key) + 1)
    end
  end
  def semaphore
    @semaphore ||= Redis::Semaphore.new(semaphore_key, redis: @redis)
  end
  def semaphore_key
    'semaphore::' + @value_key
  end
end

The redis-semaphore gem implements a full semaphore solution that can be used as a simple mutex.

  • Redis::Semaphore#lock is the special method for synchronization.
  • semaphore_key is the name of the semaphore in Redis.
  • value_key is still the key for the value itself.

I’m not exactly happy with the OO design of this Counter. Three responsibilities are wrapped up in this class: maintaining and using the semaphore, storing the value, and computing in Ruby.

So Ruby ain’t so bad?

So, obviously, you can do concurrency in Ruby. It might not even be that awful. But all of these solutions are libraries and patterns and external systems that aren’t normally part of Ruby. Sure, Mutex is in the standard library, but how many times have you seen it before today? Have you ever used it? If an interview candidate for a Ruby position came in and said they knew nothing about Mutex, would you hire them? I knew the class existed, but I never wrote any code with it until I wrote this article.

All of these solutions put a heavy burden on the programmer. You have to worry about every variable that captured in a thread: does it or something in it get side effected? if so, is the side effect atomic to avoid race conditions?

Elixir8 doesn’t make all of these problems go away. It just incorporates them into the language itself. While I wouldn’t expect a senior Ruby dev to know about Mutex, I would expect a normal Elixir dev (maybe even an associate or junior dev) to know the difference between Task and Agent.

The Elixir solution

defmodule Counter do
  def start_link(val) do
    Agent.start_link(fn -> val end)
  end
  def value(counter) do
    Agent.get(counter, fn val -> val end)
  end
  def inc(counter) do
    Agent.update(counter, fn val -> val+1 end)
  end
end

{:ok, counter} = Counter.start_link(0)
t1 = Task.async(fn -> Enum.each(1..10, fn _ -> Counter.inc(counter) end) end)
t2 = Task.async(fn -> Enum.each(1..10, fn _ -> Counter.inc(counter) end) end)
Task.await(t1)
Task.await(t2)
Counter.value(counter) # => ???

We’re using two very simple tools in the standard tool belt for concurrency in Elixir. An Agent is a process that keeps track of a state. You can put whatever you want into the state; you just have to maintain it as you use the agent. A Task is a process that completes a computation.

Agent.start_link takes a thunk9; the thunk’s return value is the initial state of the agent. Notice how the thunk captures the val passed in as a parameter to Counter.start_link. Agent.start_link returns a tuple in the form {:ok, agent} (or {:error, reason} if there was a problem). So counter in the main script is a pid (process id) for the process running the agent.

Task.async runs its thunk in a separate process.

Counter.inc! receives the pid of the agent, and executes Agent.update. The update function receives the current state as a parameter, val in this case; the function then returns the new state, val+1.

Task.await waits for the task to finish.

Counter.value uses Agent.get to fetch the value from the state. Since the value is the state, we pass the identity function to Agent.get.

And it works.

This Elixir solution does have the drawback that you’re always using a separate process for the counter, but Elixir processes are so lightweight that I don’t consider this to be a significant overhead.

If you want to share the counter on multiple nodes, it’s not as automatic as with Redis. You have to name the nodes, connect them up, and then share the counter between them. None of this is hard, and it’s less work than setting up Redis. The upside is that we have an all-Elixir solution.

I am a lousy, stinking liar

Confession: there’s a side effect in that Elixir code. Counter.inc changes the state in the agent process. That’s a side effect. Didn’t I start this series of articles by essentially declaring that side effects are evil? If I didn’t say that exactly, I meant to.

The key is to control side effects. Elixir (and Erlang) don’t let you reassign values to existing variables. Allowing a simple reassignment opens up too many problems, and it’s so easy to program without them.

But state can be useful. Very useful. At some point, your processes have to share data somehow. Agent implements a state shared across multiple processes. It maintains a state that can change over time.

The control here is that the change is explicit. You have to send a message (e.g., Agent.update) to that other process in order for it to change its state. And when you receive data from that process (e.g., Agent.get), you know full well that it might be a different value now than it was two minutes ago. You expect it to change.

Elixir processes are awesome

As I just mentioned, communication with a process is explicit. You can’t really hide it. If a library is going to create a process, it should tell you about that process. And it should tell you when and how that process is being used by what functions. Or it should hide the process so well that you don’t need to know. I programmed in Elixir for months before I realized that IO.puts actually sends messages to an output process that it shares across all processes on a single node. IO.puts just worked, so I didn’t think about it.

Elixir processes are also isolated from each other, and it goes beyond keeping them away from each others’ variables. Each process is given its own chunk of memory. Garbage collection for one process does not affect another. If one process crashes, it does not cause others to crash.10

If you raise an error instead of counter.inc! in one of the Ruby threads, the whole program crashes. You lose any (unsaved) work they’ve done up to that point, and you have to start them all over again. Sure, we could rescue errors and stop the crash, but that’s more work we have to do.

Finally, atomicity is free with Elixir processes. A process is single threaded, and it controls when it receives messages from other processes. This is why Agent#get_and_update exists. You provide it a function that will receive the state, and it returns a tuple with two elements: the value to return (the “get”) and the new state (the “update”). You’re automatically guaranteed that the function will run atomically because Agent doesn’t process any message from other processes until the get-and-update function is done.

Mindset matters

I’m moderately sure that someone could come up with a reasonable way to do concurrency in Ruby. Maybe someone has, and it just needs to go mainstream. But I suspect it will always lack something that Elixir provides. (In particular, I’m thinking that Ruby will never get true process isolation. Not without radically changing the language or Ruby runtime.)

I worry also about the timetable. We have multiple cores now; we’ve had them for quiet a while, in fact. We have large problems we want to solve. We have to solve concurrency. It’s the only meaningful way for computing to progress. And Ruby seems to be lagging behind (but so are many other languages).

More importantly, Rubyists aren’t taught to code with concurrency in mind. It’s bad enough that too many Rubyists are actually Railsists. But concurrency solutions tend to be “run another server” or “use Sidekiq”. Now, don’t get me wrong, when it comes to Ruby, I’m stuck in this mindset.

But concurrency is never an afterthought with Elixir. It’s fundamental to the language. I am amused by the number of times I’ve seen the question “What the best Elixir equivalent to Sidekiq?” Um… Elixir itself? That’s not entirely a fair response; Sidekiq handles some bookkeeping that Elixir does not handle automatically (queues, limited number of concurrent processes, etc.). But there are existing (nearly fundamental) solutions for those problems in Elixir. It’s much easier to cook up the solution that you need.

But there’s a more subtle problem with a “replace Sidekiq” mindset. Sidekiq is usually invoked for exceptional circumstances: interact with this third-party API, send this email, do some maintenance, etc. With Elixir, you should think about doing things concurrently all of the time. Always send email in a separate process. If you have two or more independent queries to a database, run each of them in their own process.

Denouement

So I started this series of articles based on the scoping that Elixir uses to not implement reassignments. As I wrote the initial article (yes, it was just one article at first), I realized I wanted to put this quirk into perspective that it makes concurrency better. As I started to make certain claims about code I had written, I realized that I should probably justify those claims. So we ended up with four articles where the scoping quirk is just one of the articles, one article sets up the quirk, and the last two are rants about how Elixir is so much more awesome than Ruby.

I’m more convinced than ever that Elixir can and should play an important role for us in the future. I find it very telling that I would expect a normal software engineer to be able to write the Counter module in Elixir while I would not expect even a senior Ruby engineer to know off the top of their heads how to use a mutex. If concurrency is our future, we need the mentality that Elixir encourages. Ruby has to put together some good solutions and make them part of the Ruby mindset.

Footnotes

  1. Ruby also provides a MonitorMixin for a different kind of lock. To learn more about the different kinds of locks, I recommend these Wikipedia articles: mutual exclusion (especially “Types of mutual exclusion devices”), monitor, reentrant mutex, and semaphore. The basic idea is always the same: lock other processes out of the resource I want to modify so that we avoid race conditions.

  2. CounterWithMutex could be made more general, but don’t. We don’t have enough uses of it yet, and more importantly my example is terribly contrived. Build out a few of these WithMutex adapter classes to find the actual, useful patterns. That said, a judicious use of delegate or Forwardable would be welcome if the example were any more complicated.

  3. If you did lock the mutex in #value, you could argue that you’re getting the freshest value possible. If another process is updating the value, you’ll just wait until its done, and that shouldn’t take too long, right? But is the freshest value really necessary? What if fifty other processes are waiting for the lock to increment the value before your simple read? Do you want your process to wait for all of them to finish just to get the freshest value?

  4. See “Getting started” in the Redis README.

  5. I’ve done a terrible job of namespacing the key. You might not be the only one using the Redis server, so you might consider using multiple databases or a namespace prefix that you add explicitly to the key (see also the redis-namespace gem).

  6. In this case, I would argue that Redis is really fast, so the extra overhead isn’t awful. However, you can’t run this script without also running Redis. That means keeping Redis configured and maintained properly.

  7. I would like to nominate Redis for having the best documentation ever. It’s comprehensive, clear, succinct, and very searchable. We’re using two commands:

  8. I’m going to keep my article focused on Elixir, but when I make sweeping generalizations about Elixir, then it probably applies to Erlang and the BEAM (the Erlang virtual machine) as well.

  9. A thunk is a function with no arguments.

  10. You can link processes and organize them with supervisors, to handle crashed processes. This is all beyond the scope of this article.

elixir ruby concurrency