In last week’s post, I outlined some things I wanted to do with my rebooted fractals program in Elixir. I’ve successfully rebooted it, and it’s coming along adequately.
This week, I’m going to talk mostly about the output process in my app.
I have one significant update from last week. I said that I was going to send around chunked data in a three-tuple: {chunk_number, options, data}. I dropped the options. The processes are going to have this as part of their state. Look for more on this next week or later.
My mandelbrot repo is on GitHub, and I’ve tagged the code as it existed while I wrote this article:
$ git clone https://github.com/jdfrens/mandelbrot.git
$ git checkout tags/blog_2016_05_04 -b whatever_you_want
$ cd mandelbrot/elixir
You can run it:
$ mix deps.get
$ mix spec
$ mix escript.build && fractals ../json/burningship-line-blue.json blb.ppm
$ convert blb.ppm blb.png # from ImageMagick
$ open blb.png
The output is just a couple of blue and red gradients blended together.
You can also peruse the code through your browser: https://github.com/jdfrens/mandelbrot/tree/blog_2016_05_04/elixir.
The main problem I wanted to solve with my OutputWorker (code) was unordered chunks. I wrote last week about sending chunked data from one process to the next. Each processing stage can have its own pool of workers, and some chunks will take longer to process than others. So there’s no guarantee that the output worker will get the chunks in the right output order.
The most naive solution would be to receive all of the chunks, sort them by chunk number, and then write them. But then the output process really just runs synchronously after all the other processes are done. Boring!
The output process could keep track of what chunk number should be output next. Start with chunk number 1, and write that chunk when you get a message for it. Then look for chunk number 2.
However, if chunk number 2 isn’t available, but chunk number 5 is, what do you do? The naive solution would be to just save chunk number 5 in the worker’s state and process the next message.
But now you need special handling for when the worker gets to chunk number 5. Instead of waiting for a message, the worker has to check the cache in the process state. It’s doable, but the code kind of feels messy to me.
Then I realized that I could use pattern matching on the messages. Colloquially, there are two ways messages are “received”: the process’s mailbox can receive a message, and the process itself can receive a message. Receiving messages into the mailbox is purely a BEAM consideration, and we programmers have no control over that. But we do have control over how the process receives messages.
When a process receives a message, it will find the first message that matches a pattern in your receive block. Not the first message; the first message that matches a pattern. If more than one pattern matches, the first one is triggered.
So receive will go through the chunk messages in the mailbox and find the right chunk number if only I match on it:
def server(chunk_number, options) do
receive do
{:write, {^chunk_number, data}} ->
write_chunk(chunk_number, data, options)
server(chunk_number + 1, options)
end
end
chunk_number is the current chunk number that the process wants. I pattern match on that value (the pinned ^chunk_number). If that chunk number is available in the mailbox, its message is received by this code and processed. If no such message is available, this code waits until the current chunk number comes in, even if other chunks are available. The other chunks will be processed when their chunk numbers come up.
Process mailbox as a priority queue!
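To make the trick concrete, here’s a tiny standalone sketch (the module and names are mine, not from the repo). Even though chunk 1 arrives in the mailbox after chunk 3, the pinned pattern pulls the chunks out in order:

```elixir
# A minimal demo of selective receive: the pin (^expected) skips over
# mailbox messages whose chunk number isn't the one we want next.
defmodule SelectiveDemo do
  def writer(expected, parent, acc) do
    receive do
      {:write, {^expected, data}} ->
        acc = acc ++ [data]

        if expected == 3 do
          send(parent, {:written, acc})
        else
          writer(expected + 1, parent, acc)
        end
    end
  end
end

parent = self()
pid = spawn(fn -> SelectiveDemo.writer(1, parent, []) end)

# Send the chunks out of order.
send(pid, {:write, {3, "c"}})
send(pid, {:write, {1, "a"}})
send(pid, {:write, {2, "b"}})

chunks =
  receive do
    {:written, list} -> list
  end

IO.puts(Enum.join(chunks, ""))  # prints "abc"
```

The `receive` in `writer` never sees chunk 3 until chunks 1 and 2 have been matched out of the mailbox, which is exactly the priority-queue behavior described above.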
Did you cringe a bit when I said that the output worker started with chunk number 1? Did you think it was a typo? Have you been programmed for zero-based indexing? I know I am, and it makes me a bit uncomfortable to start with 1.
I started with zero-based indexing, but then the 0 output became a special case:
def server(0, options) do
write_header(options)
server(1, options)
end
This version of server writes the PPM header (my code, Wikipedia). It needs to be the first thing in the file. I’m going to sit on this smell for a little while.
Okay. Back to the output worker…
GenServer
So I’m not writing a GenServer (docs, getting started). The output worker is just a little old process spawned by myself.
I started to code it up as a GenServer. It was going pretty well until I got to the match-the-chunk-number-from-the-message trick. I wrote a handle_cast function, pattern matching on the chunk number from the message and the process’s state. It failed. The server always processed the messages in the order they were received, and then sent them to my handle_cast callback. By the time the chunk gets passed to handle_cast, the message is already out of the mailbox, and I have to process it. And so an out-of-order message would mismatch and cause an error.
At least that’s what I think happened. I experimented enough that it’s a satisfying hypothesis, but I couldn’t find any documentation that confirmed it for me.
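A quick experiment along those lines supports the hypothesis (this probe module is mine, not code from the repo): a GenServer’s handle_cast sees messages strictly in the order they were sent, so a pin on the expected chunk number has nothing left to reorder.

```elixir
# A GenServer handles casts in mailbox order; handle_cast can't skip ahead.
defmodule OrderProbe do
  use GenServer

  def init(parent), do: {:ok, parent}

  # Every cast is handled as it arrives; a clause that pattern-matched on a
  # particular chunk number would simply crash on an out-of-order message.
  def handle_cast({:chunk, n}, parent) do
    send(parent, {:handled, n})
    {:noreply, parent}
  end
end

{:ok, pid} = GenServer.start_link(OrderProbe, self())

# Cast chunk 2 before chunk 1.
GenServer.cast(pid, {:chunk, 2})
GenServer.cast(pid, {:chunk, 1})

handled =
  for _ <- 1..2 do
    receive do
      {:handled, n} -> n
    end
  end

IO.inspect(handled)  # [2, 1] — exactly the send order, no reordering
```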
I really didn’t want to cache the out-of-order chunks, and re-queuing the chunks seemed… too clever. I might try either one of these in the future, but I decided I wanted to play around with some low-level Elixir code anyway.
spawn, send, and receive solution
You can read the full code for the OutputWorker online.
def start_link(options) do
pid = spawn_link(__MODULE__, :server, [0, options])
Process.register(pid, __MODULE__)
{:ok, pid}
end
I use spawn_link because the worker runs under a supervisor.
The 0 is the chunk number, and as I mentioned above, it’s what triggers writing the PPM header to the output file.
Since other processes need to send chunks to it, I explicitly register the process under its module’s name.
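The mechanism looks roughly like this standalone sketch (the :output_worker name and echoed message are my own, not the repo’s): once registered, any process can message the worker by name, with no pid in hand.

```elixir
# Registering a process under a name lets other processes send to the
# name instead of the pid.
parent = self()

pid =
  spawn(fn ->
    receive do
      {:write, chunk} -> send(parent, {:wrote, chunk})
    end
  end)

Process.register(pid, :output_worker)

# Send by name, not by pid.
send(:output_worker, {:write, 1})

result =
  receive do
    {:wrote, n} -> n
  after
    1_000 -> :timeout
  end

IO.inspect(result)  # 1
```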
I return {:ok, pid} to mimic a GenServer. I thought I was just being clever, but it turns out that this is necessary for the child of a Supervisor.
def write(pid, chunk) do
send pid, {:write, chunk}
end
This is the “client API” (just like a typical GenServer). It sends a message to the spawned process to write the chunk.
Actually, we saw receiving up above. Check it out there.
server has one more version we haven’t seen yet.
def server(number, %Options{chunk_count: count} = options) when number == count+1 do
notify_next_pid(options, {:done, self})
server(number + 1, options)
end
This is the second version of server. It wraps things up when the worker has completed the last chunk. Or, well, it tells another process that it’s done. The server doesn’t actually die; I call server one more time with a chunk number that it will never receive in a message. I found that if I didn’t call server and let the process die, the supervisor would restart it (as it’s supposed to). The supervisor would end up restarting the worker several times because the write_header function would always fail since the output handle was already closed. I’m certain that if I used another supervisor strategy (like :transient) and if my worker returned the right kind of tuple, things would be okay. It wasn’t a problem I wanted to figure out (at least for now), and it seemed wrong to let this process die while the other processes were still alive.
This version also makes use of the “next pid” in the options. This pid is either the testing process or the CLI.
Without the “next pid”, the tests ran too fast for the output, and so all of the actual results were empty (i.e., no output) because the assertions were done before the output was written. If I put a sleep into the test examples, the tests passed, but sleeping is a bit fragile. So the tests wait for the :done message before they assert anything:
defp expect_output(subject, output_pid, output) do
receive do
{:done, ^subject} ->
expect(StringIO.contents(output_pid)) |> to(eq({"", output}))
after
1000 ->
{"", actual} = StringIO.contents(output_pid)
flunk "spec received no output-done message, expected: #{output}, got: #{actual}"
end
end
I flunk the assertion if it takes too long to get the :done message. This may be fragile, too, but I can always set the timeout to a more ridiculous amount, which won’t affect passing specs.
I realized that the “next pid” idea would also be useful for the CLI. Without that :done notification, the CLI would die immediately after starting up the processes. Now it waits:
def waiting_loop(options) do
receive do
{:writing, chunk_number} ->
IO.puts "writing #{chunk_number} of #{options.chunk_count}"
waiting_loop(options)
{:done, _} ->
Options.close_output_file(options)
end
end
I also extended the idea so that the workers could send updates to the CLI through the “next pid”, and the CLI could display feedback. Originally, I just called IO.puts with progress messages in the OutputWorker, but that cluttered up the test output. Now the tests get to ignore the progress updates.
However, I’m not happy with the name “next pid”. It’s not a good name at all. How exactly is it the next pid? It’s actually the pid of an old process given to this new output worker. Maybe you could say that it’s the pid of the next process that should take over when the output is done, but that’s not all it’s being used for now. There’s got to be something better: “notification pid”, “update pid”, “observer pid”, … This feels like a name that I need to sleep on, so I’ll change it later.
Three helper functions close out the module:
def write_header(options) do
PPM.p3_header(options.size.width, options.size.height)
|> Enum.each(&(IO.puts(options.output_pid, &1)))
end
def write_chunk(chunk_number, data, options) do
notify_next_pid(options, {:writing, chunk_number})
Enum.each(data, &(IO.puts(options.output_pid, &1)))
end
def notify_next_pid(options, message) do
send options.next_pid, message
end
I already see some refactorings that I’d like to do with this code, but it’s pretty straightforward in this form. My thinking is that I want to remove the business logic from the server function, and I accomplished that.
When I originally planned this article, I had a mostly working output worker that made me mostly happy. Then I figured I should actually battle test the worker a bit to see if it would, in fact, fit in with the other processes. So I ended up spending a good deal of time struggling with supervisors and supervisor strategies and tasks and spawned processes and fractal options and chunked data and a command-line interface.
So that’s what I’ll talk about next week: the supervisory tree, the CLI, those pesky fractal options, and chunked data (including the “missing” options).
Wait… no footnotes? I must have done something wrong.¹
¹ Phew. I saved myself. ↩