I ended my previous article on why side effects matter with an honest-to-flying-spaghetti-monster race condition in Ruby. That example made the article a little longer than I had expected, so I put off the ending for another article. This article.
We had an Inc class in the previous article. It implemented a simple incrementing counter. As I played around with that example for this article, I realized that Inc was a really dumb name. So here's the Inc class renamed Counter:
class Counter
  def initialize(value) ; @value = value ; end
  def value ; @value ; end
  def inc! ; @value += 1 ; end
end
counter = Counter.new(0)
t1 = Thread.new { 10.times { counter.inc! } }
t2 = Thread.new { 10.times { counter.inc! } }
t1.join
t2.join
counter.value # => ???
I also took out the sleep 0 that we needed to increase the risk of a race condition. If you want to try running this code for yourself and see the race conditions, replace the definition of #inc! with this:
def inc!
  v = @value
  sleep 0
  @value = v + 1
end
counter.value comes out as 10 and 20 and pretty much any number in between.
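If you want to run it end to end, here's the whole racy script in one piece (just the code from above, assembled):

```ruby
# The counter with the racy #inc!: the sleep 0 widens the window
# between reading @value and writing it back.
class Counter
  def initialize(value) ; @value = value ; end
  def value ; @value ; end

  def inc!
    v = @value
    sleep 0            # yield to the other thread mid-increment
    @value = v + 1
  end
end

counter = Counter.new(0)
t1 = Thread.new { 10.times { counter.inc! } }
t2 = Thread.new { 10.times { counter.inc! } }
t1.join
t2.join
counter.value # anywhere from 2 to 20, varying from run to run
```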
The problem with counter.inc! is that it needs to read, compute, and then write. Worse yet, reading and writing are done on an instance variable, which is kept in a hash table, so there's actually quite a lot being done in #inc!, too much for Ruby to treat naturally as one operation.
We need some way to tell Ruby to treat #inc! as an atomic operation. Don't think chemistry and physics here, think Greek: atomos means "indivisible". The Ruby VM needs to treat the read-compute-write as an indivisible operation that can't be interrupted by another thread.
Usually this is done with a lock of some sort. Ruby does not provide any language-level locking mechanisms; we find them in the standard library.
We can wrap the increment in a mutex:
mutex = Mutex.new
counter = Counter.new(0)
t1 = Thread.new { 10.times { mutex.synchronize { counter.inc! } } }
t2 = Thread.new { 10.times { mutex.synchronize { counter.inc! } } }
# ...
Mutex implements a simple locking mechanism for concurrency. When one thread calls #synchronize on mutex, it locks the mutex and executes the block. If the mutex is already locked (by another thread), it waits until it is unlocked, then it gets the lock and can execute. When the block is done, the mutex is unlocked.1 This means that counter.inc! will run as an atomic operation, and the race condition is gone!
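Assembled into one runnable script (with the Counter class repeated so it stands alone), the mutex version reliably produces 20:

```ruby
class Counter
  def initialize(value) ; @value = value ; end
  def value ; @value ; end
  def inc! ; @value += 1 ; end
end

mutex = Mutex.new
counter = Counter.new(0)

# Every increment happens inside the lock, so each read-compute-write
# runs to completion before the other thread gets its turn.
t1 = Thread.new { 10.times { mutex.synchronize { counter.inc! } } }
t2 = Thread.new { 10.times { mutex.synchronize { counter.inc! } } }
t1.join
t2.join
counter.value # => 20
```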
However, this is a terrible design. We'll have to pass mutex along with counter, and we'll have to remember to use the mutex whenever we call counter.inc!.
Since the mutex and the counter seem to be an inseparable pair, maybe the mutex should go into the Counter class. You can certainly do that, but it gets a bit messy (because it breaks the Single Responsibility Principle). Also, a Counter might not be shared across threads and so doesn't need a mutex. A mutex is not that expensive, but it's also not free. So we'd like to have a separate Counter without a mutex.
The solution, then, is to implement a new class to handle the mutex:
class Counter
  def initialize(value) ; @value = value ; end
  def value ; @value ; end
  def inc! ; @value += 1 ; end
end

class CounterWithMutex
  def initialize(value)
    @mutex = Mutex.new
    @counter = Counter.new(value)
  end

  def value ; @counter.value ; end

  def inc!
    @mutex.synchronize { @counter.inc! }
  end
end
counter = CounterWithMutex.new(0)
t1 = Thread.new { 10.times { counter.inc! } }
t2 = Thread.new { 10.times { counter.inc! } }
t1.join
t2.join
counter.value # => ???
A few things to notice:
- CounterWithMutex could probably be made more general and simpler with some metaprogramming.2
- The mutex is not used in CounterWithMutex#value. That method only reads the value, so there's no danger of a race condition.3

What if you need a shared counter across multiple servers? It's easy to share instances of Ruby objects across threads because they work in shared memory, but each server (or really each Ruby process) has its own memory space.
We can use an external store like Redis.
require 'redis'

REDIS_URL = 'redis://127.0.0.1/' # set this for your Redis server

class Counter
  def initialize(value, value_key:, redis:)
    @value_key = value_key
    @redis = redis
    @redis.setnx(@value_key, value)
  end

  def value
    @redis.get(@value_key)
  end

  def inc!
    @redis.incr(@value_key)
  end
end
redis = Redis.new(url: REDIS_URL)
value_key = 'counter of love'
counter = Counter.new(0, value_key: value_key, redis: redis)
t1 = Thread.new { 10.times { counter.inc! } }
t2 = Thread.new { 10.times { counter.inc! } }
t1.join
t2.join
counter.value # => ???
Some things to notice:
- Set REDIS_URL if you run this for yourself.4
- The Redis class from the redis gem provides an instance method for each Redis command.7
- SETNX (used in Counter#initialize) sets the value for the value key if the key does not already exist in Redis. If the key already exists, this instruction does nothing.
- INCR (used in Counter#inc!) is an increment as an atomic operation.

If we used SET instead of SETNX, then each time the program is run, the counter would get reset back to 0 (the initial value passed in). It seems a bit counterproductive to keep resetting the counter whenever a new process accesses it.
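To see the difference without a live Redis server, here's a toy stand-in using a plain Ruby Hash (the setnx helper below is my own sketch of the command's semantics, not the redis gem's API):

```ruby
# A Hash standing in for Redis, just to illustrate SETNX's semantics.
store = {}

# SETNX: set the key only if it does not already exist.
def setnx(store, key, value)
  return false if store.key?(key)
  store[key] = value
  true
end

setnx(store, 'counter', 0)   # first run: key is missing, so it's set
store['counter'] += 5        # increments accumulate
setnx(store, 'counter', 0)   # later runs: key exists, nothing happens
store['counter'] # => 5 (a plain SET here would have reset it to 0)
```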
As mentioned above, this implementation makes the counter persistent. Once you’ve run the script, the counter just keeps on collecting increments. It doesn’t go away when the script is done. If you run it ten times in a row, then the counter will be incremented 200 times! Come back two days later (assuming Redis hasn’t crashed disastrously), and the counter will pick up where it left off.
Maybe this persistence works for whatever problem you're trying to solve. If not, you might need to reset the counter (using SET) or to delete the counter (using DEL).
Our contrived example is misleading because the atomic operation is just a simple increment. What if the computation were more involved and complicated?
Regardless, the value still needs to be saved in Redis (or in another database) so that we can share it among Ruby processes. Redis has a hash datatype with hash operations, and that usually works for most serialization needs.
But there’s no “read this hash from Redis, transform the hash in Ruby, and write the new hash back to Redis” instruction in Redis. You can execute Redis commands in a transaction, but that doesn’t help with any Ruby computations you need to do. You can write Lua code to be executed on the Redis server within a transaction, but that introduces a whole new language to learn.
Or you could use Redis to implement a mutex, and leave the computation in Ruby.
require 'redis'
require 'redis-semaphore'

class Counter
  def initialize(value, value_key:, redis:)
    @value_key = value_key
    @redis = redis
    # Use the memoizing accessor; @semaphore is not set yet at this point.
    semaphore.lock do
      unless @redis.exists(@value_key)
        @redis.set(@value_key, value)
      end
    end
  end

  def value
    @redis.get(@value_key)
  end

  def inc!
    semaphore.lock do
      # Redis stores strings, so convert before incrementing.
      @redis.set(@value_key, @redis.get(@value_key).to_i + 1)
    end
  end

  def semaphore
    @semaphore ||= Redis::Semaphore.new(semaphore_key, redis: @redis)
  end

  def semaphore_key
    'semaphore::' + @value_key
  end
end
The redis-semaphore gem implements a full semaphore solution that can be used as a simple mutex.
- Redis::Semaphore#lock is the special method for synchronization.
- semaphore_key is the name of the semaphore in Redis.
- value_key is still the key for the value itself.

I'm not exactly happy with the OO design of this Counter. Three responsibilities are wrapped up in this class: maintaining and using the semaphore, storing the value, and computing in Ruby.
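One way to pull those responsibilities apart, sketched with class names of my own invention: inject the lock, so the counter logic and the locking strategy stay separate. (A Redis-backed lock would need a small adapter to present the same #synchronize interface; a Mutex works out of the box.)

```ruby
class Counter
  def initialize(value) ; @value = value ; end
  def value ; @value ; end
  def inc! ; @value += 1 ; end
end

# Wraps any counter with any lock object that responds to #synchronize.
class SynchronizedCounter
  def initialize(counter, lock)
    @counter = counter
    @lock = lock
  end

  def value ; @counter.value ; end

  def inc!
    @lock.synchronize { @counter.inc! }
  end
end

counter = SynchronizedCounter.new(Counter.new(0), Mutex.new)
threads = 2.times.map { Thread.new { 10.times { counter.inc! } } }
threads.each(&:join)
counter.value # => 20
```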
So, obviously, you can do concurrency in Ruby. It might not even be that awful. But all of these solutions are libraries and patterns and external systems that aren't normally part of Ruby. Sure, Mutex is in the standard library, but how many times have you seen it before today? Have you ever used it? If an interview candidate for a Ruby position came in and said they knew nothing about Mutex, would you hire them? I knew the class existed, but I never wrote any code with it until I wrote this article.
All of these solutions put a heavy burden on the programmer. You have to worry about every variable that is captured in a thread: does it (or something in it) get side-effected? If so, is the side effect atomic, so that race conditions are avoided?
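Here's a small illustration of the capture problem (a contrived sketch): a block doesn't copy a local variable, it shares it, so a later reassignment is visible inside every thread.

```ruby
message = "before"
gate = Queue.new

# Each thread captures the same local; none of them reads it until
# the gate opens.
threads = 3.times.map do
  Thread.new do
    gate.pop   # wait for the main thread
    message    # reads the *current* value of the shared local
  end
end

message = "after"          # side effect on the captured variable
3.times { gate.push(:go) } # now let the threads read
threads.map(&:value) # => ["after", "after", "after"]
```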
Elixir8 doesn't make all of these problems go away. It just incorporates them into the language itself. While I wouldn't expect a senior Ruby dev to know about Mutex, I would expect a normal Elixir dev (maybe even an associate or junior dev) to know the difference between Task and Agent.
defmodule Counter do
  def start_link(val) do
    Agent.start_link(fn -> val end)
  end

  def value(counter) do
    Agent.get(counter, fn val -> val end)
  end

  def inc(counter) do
    Agent.update(counter, fn val -> val + 1 end)
  end
end
{:ok, counter} = Counter.start_link(0)
t1 = Task.async(fn -> Enum.each(1..10, fn _ -> Counter.inc(counter) end) end)
t2 = Task.async(fn -> Enum.each(1..10, fn _ -> Counter.inc(counter) end) end)
Task.await(t1)
Task.await(t2)
Counter.value(counter) # => ???
We're using two very simple tools in the standard tool belt for concurrency in Elixir. An Agent is a process that keeps track of a state. You can put whatever you want into the state; you just have to maintain it as you use the agent. A Task is a process that completes a computation.
Agent.start_link takes a thunk9; the thunk's return value is the initial state of the agent. Notice how the thunk captures the val passed in as a parameter to Counter.start_link. Agent.start_link returns a tuple in the form {:ok, agent} (or {:error, reason} if there was a problem). So counter in the main script is a pid (process id) for the process running the agent.
Task.async runs its thunk in a separate process.
Counter.inc receives the pid of the agent and executes Agent.update. The update function receives the current state as a parameter, val in this case; the function then returns the new state, val + 1.
Task.await waits for the task to finish.
Counter.value uses Agent.get to fetch the value from the state. Since the value is the state, we pass the identity function to Agent.get.
And it works.
This Elixir solution does have the drawback that you’re always using a separate process for the counter, but Elixir processes are so lightweight that I don’t consider this to be a significant overhead.
If you want to share the counter on multiple nodes, it’s not as automatic as with Redis. You have to name the nodes, connect them up, and then share the counter between them. None of this is hard, and it’s less work than setting up Redis. The upside is that we have an all-Elixir solution.
Confession: there's a side effect in that Elixir code. Counter.inc changes the state in the agent process. That's a side effect. Didn't I start this series of articles by essentially declaring that side effects are evil? If I didn't say that exactly, I meant to.
The key is to control side effects. Elixir (and Erlang) don’t let you reassign values to existing variables. Allowing a simple reassignment opens up too many problems, and it’s so easy to program without them.
But state can be useful. Very useful. At some point, your processes have to share data somehow. Agent implements a state shared across multiple processes. It maintains a state that can change over time.
The control here is that the change is explicit. You have to send a message (e.g., Agent.update) to that other process in order for it to change its state. And when you receive data from that process (e.g., Agent.get), you know full well that it might be a different value now than it was two minutes ago. You expect it to change.
As I just mentioned, communication with a process is explicit. You can't really hide it. If a library is going to create a process, it should tell you about that process, and it should tell you when and how that process is used by which functions. Or it should hide the process so well that you don't need to know. I programmed in Elixir for months before I realized that IO.puts actually sends messages to an output process shared across all processes on a single node. IO.puts just worked, so I didn't think about it.
Elixir processes are also isolated from each other, and it goes beyond keeping them away from each other's variables. Each process is given its own chunk of memory. Garbage collection for one process does not affect another. If one process crashes, it does not cause others to crash.10
If you raise an error instead of calling counter.inc! in one of the Ruby threads, the whole program crashes. You lose any (unsaved) work the threads have done up to that point, and you have to start them all over again. Sure, we could rescue errors and stop the crash, but that's more work we have to do.
Finally, atomicity is free with Elixir processes. A process is single-threaded, and it controls when it receives messages from other processes. This is why Agent.get_and_update exists. You provide it a function that receives the state and returns a tuple with two elements: the value to return (the "get") and the new state (the "update"). You're automatically guaranteed that the function will run atomically because the Agent doesn't process any messages from other processes until the get-and-update function is done.
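For contrast, the same contract can be sketched in Ruby (the class and method names here are mine, not a real library): the block receives the state and returns a pair of reply and new state, all under one lock.

```ruby
# A Ruby sketch of the Agent.get_and_update contract.
class AgentLike
  def initialize(state)
    @mutex = Mutex.new
    @state = state
  end

  # The block gets the current state and must return [reply, new_state];
  # both happen under the same lock, so the pair is atomic.
  def get_and_update
    @mutex.synchronize do
      reply, @state = yield(@state)
      reply
    end
  end

  def get
    @mutex.synchronize { @state }
  end
end

counter = AgentLike.new(0)
old = counter.get_and_update { |val| [val, val + 1] } # fetch-and-increment
old         # => 0
counter.get # => 1
```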
I’m moderately sure that someone could come up with a reasonable way to do concurrency in Ruby. Maybe someone has, and it just needs to go mainstream. But I suspect it will always lack something that Elixir provides. (In particular, I’m thinking that Ruby will never get true process isolation. Not without radically changing the language or Ruby runtime.)
I worry also about the timetable. We have multiple cores now; we've had them for quite a while, in fact. We have large problems we want to solve. We have to solve concurrency. It's the only meaningful way for computing to progress. And Ruby seems to be lagging behind (but so are many other languages).
More importantly, Rubyists aren’t taught to code with concurrency in mind. It’s bad enough that too many Rubyists are actually Railsists. But concurrency solutions tend to be “run another server” or “use Sidekiq”. Now, don’t get me wrong, when it comes to Ruby, I’m stuck in this mindset.
But concurrency is never an afterthought with Elixir. It's fundamental to the language. I am amused by the number of times I've seen the question "What's the best Elixir equivalent to Sidekiq?" Um… Elixir itself? That's not entirely a fair response; Sidekiq handles some bookkeeping that Elixir does not handle automatically (queues, limited number of concurrent processes, etc.). But there are existing (nearly fundamental) solutions for those problems in Elixir. It's much easier to cook up the solution that you need.
But there's a more subtle problem with a "replace Sidekiq" mindset. Sidekiq is usually invoked for exceptional circumstances: interact with this third-party API, send this email, do some maintenance, etc. With Elixir, you should think about doing things concurrently all of the time. Always send email in a separate process. If you have two or more independent queries to a database, run each of them in its own process.
So I started this series of articles based on the scoping that Elixir uses to not implement reassignments. As I wrote the initial article (yes, it was just one article at first), I realized I wanted to put this quirk into perspective: it makes concurrency better. As I started to make certain claims about code I had written, I realized that I should probably justify those claims. So we ended up with four articles where the scoping quirk is just one of the articles, one article sets up the quirk, and the last two are rants about how Elixir is so much more awesome than Ruby.
I'm more convinced than ever that Elixir can and should play an important role for us in the future. I find it very telling that I would expect a normal software engineer to be able to write the Counter module in Elixir while I would not expect even a senior Ruby engineer to know off the top of their head how to use a mutex. If concurrency is our future, we need the mentality that Elixir encourages. Ruby has to put together some good solutions and make them part of the Ruby mindset.
Ruby also provides a MonitorMixin for a different kind of lock. To learn more about the different kinds of locks, I recommend these Wikipedia articles: mutual exclusion (especially "Types of mutual exclusion devices"), monitor, reentrant mutex, and semaphore. The basic idea is always the same: lock other processes out of the resource I want to modify so that we avoid race conditions. ↩
CounterWithMutex could be made more general, but don't. We don't have enough uses of it yet, and more importantly my example is terribly contrived. Build out a few of these WithMutex adapter classes to find the actual, useful patterns. That said, a judicious use of delegate or Forwardable would be welcome if the example were any more complicated. ↩
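For what it's worth, here's a sketch of how Forwardable could tidy the delegation: only the read-only method is delegated, and the mutating one keeps its explicit lock.

```ruby
require 'forwardable'

class Counter
  def initialize(value) ; @value = value ; end
  def value ; @value ; end
  def inc! ; @value += 1 ; end
end

class CounterWithMutex
  extend Forwardable

  # Delegate the read-only method straight through to the counter.
  def_delegators :@counter, :value

  def initialize(value)
    @mutex = Mutex.new
    @counter = Counter.new(value)
  end

  def inc!
    @mutex.synchronize { @counter.inc! }
  end
end

c = CounterWithMutex.new(0)
3.times { c.inc! }
c.value # => 3
```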
If you did lock the mutex in #value, you could argue that you're getting the freshest value possible. If another process is updating the value, you'll just wait until it's done, and that shouldn't take too long, right? But is the freshest value really necessary? What if fifty other processes are waiting for the lock to increment the value before your simple read? Do you want your process to wait for all of them to finish just to get the freshest value? ↩
See “Getting started” in the Redis README. ↩
I've done a terrible job of namespacing the key. You might not be the only one using the Redis server, so you might consider using multiple databases or a namespace prefix that you add explicitly to the key (see also the redis-namespace gem). ↩
In this case, I would argue that Redis is really fast, so the extra overhead isn’t awful. However, you can’t run this script without also running Redis. That means keeping Redis configured and maintained properly. ↩
I would like to nominate Redis for having the best documentation ever. It's comprehensive, clear, succinct, and very searchable. We're using two commands: SETNX and INCR. ↩
I'm going to keep my article focused on Elixir, but when I make sweeping generalizations about Elixir, they probably apply to Erlang and the BEAM (the Erlang virtual machine) as well. ↩
A thunk is a function with no arguments. ↩
You can link processes and organize them with supervisors, to handle crashed processes. This is all beyond the scope of this article. ↩