Output Process for Elixir Fractals

by Jeremy D. Frens on June 5, 2016
part of the Fractals in Elixir series


In last week’s post, I outlined some things I wanted to do with my rebooted fractals program in Elixir. I’ve successfully rebooted it, and it’s coming along adequately.

This week, I’m going to talk mostly about the output process in my app.

Last week

I have one significant update from last week. I said that I was going to send around chunked data in a three-tuple: {chunk_number, options, data}. I dropped the options from the tuple. The processes are going to have the options as part of their state instead. Look for more on this next week or later.

Check out my code

My mandelbrot repo is on GitHub, and I’ve tagged the code as it existed while I wrote this article:

$ git clone https://github.com/jdfrens/mandelbrot.git
$ cd mandelbrot
$ git checkout tags/blog_2016_05_04 -b whatever_you_want
$ cd elixir

You can run it:

$ mix deps.get
$ mix spec
$ mix escript.build && ./fractals ../json/burningship-line-blue.json blb.ppm
$ convert blb.ppm blb.png   # from ImageMagick
$ open blb.png

The output is just a couple of blue and red gradients blended together.

You can also peruse the code through your browser: https://github.com/jdfrens/mandelbrot/tree/blog_2016_05_04/elixir.

Unordered chunks and my clever idea

The main problem I wanted to solve with my OutputWorker (code) was unordered chunks. I wrote last week about sending chunked data from one process to the next. Each processing stage can have its own pool of workers, and some chunks will take longer to process than others. So there’s no guarantee that the output worker will get the chunks in the right output order.

The most naive solution would be to receive all of the chunks, sort them by chunk number, and then write them. But then the output process really just runs synchronously after all the other processes are done. Boring!

The output process could keep track of what chunk number should be output next. Start with chunk number 1, and write that chunk when you get a message for it. Then look for chunk number 2.

However, if chunk number 2 isn’t available, but chunk number 5 is, what do you do? The naive solution would be to just save chunk number 5 in the worker’s state and process the next message.

But now you need special handling for when the worker gets to chunk number 5. Instead of waiting for a message, the worker has to check the cache in the process state. It’s doable, but the code kind of feels messy to me.
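To make the "messy" version concrete, here is a minimal sketch of the caching approach. It is not code from the repo: the module name, the pending map, the sink pid, and the :stop message are all made up for illustration. Note the two separate paths, one for the cache and one for the mailbox.

```elixir
defmodule CachingOutputSketch do
  # Hypothetical sketch, not the repo's OutputWorker.
  # `pending` maps chunk numbers to data that arrived early.
  # `sink` is a pid that stands in for the output file.
  def server(next, pending, sink) do
    case Map.pop(pending, next) do
      {nil, _} ->
        # The wanted chunk is not cached yet: take the next message,
        # whatever chunk it carries, and cache it.
        receive do
          {:write, {number, data}} ->
            server(next, Map.put(pending, number, data), sink)

          :stop ->
            :ok
        end

      {data, rest} ->
        # The wanted chunk is in the cache: "write" it and move on.
        send(sink, {:written, next, data})
        server(next + 1, rest, sink)
    end
  end
end

pid = spawn(CachingOutputSketch, :server, [1, %{}, self()])

# Deliver the chunks out of order, then shut the sketch down.
for n <- [3, 1, 2], do: send(pid, {:write, {n, "chunk #{n}"}})
send(pid, :stop)
```

It works, but every chunk takes a detour through the process state, and the worker has to know about two sources of chunks.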

Then I realized that I could use pattern matching on the messages. Colloquially, there are two ways messages are “received”: the process’s mailbox can receive a message; the process itself can receive a message. Receiving messages into the mailbox is purely a BEAM consideration, and we programmers have no control over that. But we do have control over how the process receives messages.

When a process receives a message, it will find the first message that matches a pattern in your receive block. Not the first message in the mailbox; the first message that matches a pattern. If that message matches more than one pattern, the first matching clause is triggered.

So receive will go through the chunk messages in the mailbox and find the right chunk number if only I match on it:

def server(chunk_number, options) do
  receive do
    {:write, {^chunk_number, data}} ->
      write_chunk(chunk_number, data, options)
      server(chunk_number + 1, options)
  end
end

chunk_number is the current chunk number that the process wants. I pattern match on that value (the pinned ^chunk_number). If that chunk number is available in the mailbox, its message is received by this code and processed. If no such message is available, this code waits until the current chunk number comes in, even if other chunks are available. The other chunks will be processed when their chunk number comes up.
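Here is a small, self-contained sketch you can paste into a script to watch this happen. It is a stand-in, not the real worker: the module name, the last argument (so the loop can stop), and the sink pid that takes the place of the output file are assumptions made for the example.

```elixir
defmodule SelectiveReceiveSketch do
  # Hypothetical stand-in for the output worker. Instead of writing to a
  # file, it reports each chunk to `sink` in the order it handled them.
  def server(chunk_number, last, _sink) when chunk_number > last, do: :ok

  def server(chunk_number, last, sink) do
    receive do
      # The pin makes this clause match only the chunk we want next.
      # Other chunks stay in the mailbox until their number comes up.
      {:write, {^chunk_number, data}} ->
        send(sink, {:wrote, chunk_number, data})
        server(chunk_number + 1, last, sink)
    end
  end
end

worker = spawn(SelectiveReceiveSketch, :server, [1, 3, self()])

# Deliver the chunks out of order.
for n <- [3, 1, 2], do: send(worker, {:write, {n, "chunk #{n}"}})
```

The calling process then gets the :wrote messages for chunks 1, 2, and 3 in that order, even though chunk 3 was sent first.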

Process mailbox as a priority queue!

Aside: did I say “start with chunk number 1”?

Did you cringe a bit when I said that the output worker started with chunk number 1? Did you think it was a typo? Have you been programmed for zero-based indexing? I know I have, and it makes me a bit uncomfortable to start with 1.

I started with zero-based indexing, but then the 0 output became a special case:

def server(0, options) do
  write_header(options)
  server(1, options)
end

This version of server writes the PPM header (my code, Wikipedia). It needs to be the first thing in the file. I’m going to sit on this smell for a little while.
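For reference, a plain-text (P3) PPM file opens with the magic number P3, then the width and height, then the maximum color value; the pixel data follows. The sketch below is a hypothetical helper written for this example, not the repo's PPM.p3_header; the module name and the default maximum of 255 are assumptions.

```elixir
defmodule PPMHeaderSketch do
  # Hypothetical helper: returns the header as a list of lines, which is
  # the shape that a write_header-style function could pass to IO.puts.
  def p3_header(width, height, max_value \\ 255) do
    ["P3", "#{width} #{height}", "#{max_value}"]
  end
end

# Print the header lines for a 4x2 image.
PPMHeaderSketch.p3_header(4, 2) |> Enum.each(&IO.puts/1)
```

Since an image reader needs the dimensions before it can interpret any pixel, the header really does have to come before chunk 1.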

Okay. Back to the output worker…

Not a GenServer

So I’m not writing a GenServer (docs, getting started). The output worker is just a little old process spawned by myself.

I started to code it up as a GenServer. It was going pretty well until I got to the match-the-chunk-number-from-the-message trick. I wrote a handle_cast function, pattern matching on the chunk number from the message and the process’s state. It failed. The GenServer always pulled messages out of the mailbox in the order they arrived and then passed them to my handle_cast function. By the time a chunk gets passed to handle_cast, the message is already out of the mailbox, and I have to process it. So an out-of-order message would fail to match any clause and cause an error.

At least that’s what I think happened. I experimented enough that it’s a satisfying hypothesis, but I couldn’t find any documentation that confirmed it for me.
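A small experiment along these lines is sketched below. It is a reconstruction for illustration, not the code that was originally tried: the module name and the state (just the wanted chunk number) are assumptions. It starts a GenServer that wants chunk 1, casts chunk 2 at it, and watches the server go down.

```elixir
defmodule InOrderSketch do
  use GenServer

  # Hypothetical module. The state is the chunk number we want next.
  @impl true
  def init(first), do: {:ok, first}

  # This clause only matches when the chunk number in the message
  # equals the chunk number in the state.
  @impl true
  def handle_cast({:write, {n, _data}}, n), do: {:noreply, n + 1}
end

# Start without a link so the crash does not take this process down.
{:ok, pid} = GenServer.start(InOrderSketch, 1)
ref = Process.monitor(pid)

# Chunk 2 arrives first. The GenServer loop takes it out of the mailbox
# and calls handle_cast/2 with it, and no clause matches.
GenServer.cast(pid, {:write, {2, "chunk 2"}})

reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1000 -> :timeout
  end

IO.inspect(reason, label: "server exited with")
```

The exit reason is a function clause error, which fits the hypothesis: a GenServer runs its own receive loop and hands each message to the callbacks in arrival order, so the callbacks never get to leave a message in the mailbox.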

I really didn’t want to cache the out-of-order chunks, and re-queuing the chunks seemed… too clever. I might try either one of these in the future, but I decided I wanted to play around with some low-level Elixir code anyway.

spawn, send, and receive solution

You can read the full code for the OutputWorker online.

Spawning a worker process

def start_link(options) do
  pid = spawn_link(__MODULE__, :server, [0, options])
  Process.register(pid, __MODULE__)
  {:ok, pid}
end

I use spawn_link because the worker has a supervisor, and a supervisor needs to be linked to the processes it starts.

The 0 is the chunk number, and as I mentioned above, it’s to trigger the PPM header into the output file.

Since other processes need to send chunks to it, I explicitly register the process under its module’s name.

I return {:ok, pid} to mimic a GenServer. I thought I was just being clever, but it turns out that this is necessary for the child of a Supervisor.
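As a rough illustration of why the return value matters, here is a sketch of a bare spawned process running under a supervisor. Everything in it is hypothetical (the module, the ping/pong messages), and it uses the map-style child specification from newer Elixir releases rather than whatever the repo used at the time.

```elixir
defmodule PlainWorkerSketch do
  # Hypothetical worker: a bare process that answers pings forever.
  def start_link(reply_to) do
    # The supervisor calls this function, so spawn_link links the new
    # process to the supervisor.
    pid = spawn_link(__MODULE__, :loop, [reply_to])
    # A supervisor expects {:ok, pid} from the start function.
    {:ok, pid}
  end

  def loop(reply_to) do
    receive do
      :ping ->
        send(reply_to, :pong)
        loop(reply_to)
    end
  end
end

children = [
  %{id: PlainWorkerSketch, start: {PlainWorkerSketch, :start_link, [self()]}}
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

# Look up the worker's pid through the supervisor and poke it.
[{PlainWorkerSketch, worker, :worker, _modules}] = Supervisor.which_children(sup)
send(worker, :ping)
```

The supervisor does not care that the child is not a GenServer. It only needs a start function that links the new process and returns {:ok, pid}.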

Sending a chunk

def write(pid, chunk) do
  send pid, {:write, chunk}
end

This is the “client API” (just like a typical GenServer). It sends a message to the spawned process to write the chunk.

Receiving a chunk

Actually, we saw receiving up above. Check it out there.

server has one more version we didn’t see yet.

Done with chunks and working

def server(number, %Options{chunk_count: count} = options) when number == count+1 do
  notify_next_pid(options, {:done, self()})
  server(number + 1, options)
end

This is the last version of server. It wraps things up when the worker has completed the last chunk. Or, well, it tells another process that it’s done. The server doesn’t actually die: I call server one more time with a chunk number that it will never receive in a message.

I found that if I didn’t call server and let the process die, the supervisor would restart it (as it’s supposed to). The supervisor would end up restarting the worker several times, and each restart failed because write_header tried to write to an output handle that was already closed. I’m certain that if I used a different restart option for the worker (like :transient) and if my worker exited with a normal reason, things would be okay. It wasn’t a problem I wanted to figure out (at least for now), and it seemed wrong to let this process die when the other processes were still alive.
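For what it’s worth, the :transient idea might look something like the sketch below. This is not what the repo does; the worker module is hypothetical, and the child specification uses the map syntax from newer Elixir releases. With restart: :transient, a supervisor restarts a child only when it exits abnormally, so a worker that simply finishes is left alone.

```elixir
defmodule FinishingWorkerSketch do
  # Hypothetical worker: it reports :done and then returns, so the
  # process exits with reason :normal.
  def start_link(reply_to) do
    {:ok, spawn_link(fn -> send(reply_to, :done) end)}
  end
end

children = [
  %{
    id: FinishingWorkerSketch,
    start: {FinishingWorkerSketch, :start_link, [self()]},
    # Restart only after an abnormal exit, not after a normal one.
    restart: :transient
  }
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
```

With the default restart: :permanent, the same worker would be started again as soon as it finished, which matches the restart loop described above.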

This version also makes use of the “next pid” in the options. This pid is either the testing process or the CLI.

Without the “next pid”, the tests ran too fast for the output, and so all of the actual results were empty (i.e., no output) because the assertions were done before the output was written. If I put a sleep into the test examples, the tests passed, but sleeping is a bit fragile. So the tests wait for the :done message before they assert anything:

defp expect_output(subject, output_pid, output) do
  receive do
    {:done, ^subject} ->
      expect(StringIO.contents(output_pid)) |> to(eq({"", output}))
  after
    1000 ->
      {"", actual} = StringIO.contents(output_pid)
      flunk "spec received no output-done message, expected: #{output}, got: #{actual}"
  end
end

I flunk the assertion if it takes too long to get the :done message. This may be fragile, too, but I can always set that to a more ridiculous amount which won’t affect passing specs.

I realized that the “next pid” idea would also be useful for the CLI. Without that :done notification, the CLI would die immediately after starting up the processes. Now it waits:

def waiting_loop(options) do
  receive do
    {:writing, chunk_number} ->
      IO.puts "writing #{chunk_number} of #{options.chunk_count}"
      waiting_loop(options)
    {:done, _} ->
      Options.close_output_file(options)
  end
end

I also extended the idea so that the workers could send updates to the CLI through the “next pid”, and the CLI could display feedback. Originally, I just printed messages with IO.puts in the OutputWorker, but that cluttered up the test output. Now the tests get to ignore the progress updates.

However, I’m not happy with the name “next pid”. It’s not a good name at all. How exactly is it the next pid? It’s actually the pid of an old process given to this new output worker. Maybe you could say that it’s the pid of the next process that should take over when the output is done, but that’s not all it’s being used for now. There’s got to be something better: “notification pid”, “update pid”, “observer pid”, … This feels like a name that I need to sleep on, so I’ll change it later.

Helper functions

Three helper functions close out the module:

def write_header(options) do
  PPM.p3_header(options.size.width, options.size.height)
  |> Enum.each(&(IO.puts(options.output_pid, &1)))
end

def write_chunk(chunk_number, data, options) do
  notify_next_pid(options, {:writing, chunk_number})
  Enum.each(data, &(IO.puts(options.output_pid, &1)))
end

def notify_next_pid(options, message) do
  send options.next_pid, message
end

I already see some refactorings that I’d like to do with this code, but it’s pretty straightforward in this form. My thinking was that I wanted to keep the business logic out of the server function, and I accomplished that.

Next week

When I originally planned this article, I had a mostly working output worker that made me mostly happy. Then I figured I should actually battle test the worker a bit to see if it would, in fact, fit in with the other processes. So I ended up spending a good deal of time struggling with supervisors and supervisor strategies and tasks and spawned processes and fractal options and chunked data and a command-line interface.

So that’s what I’ll talk about next week: the supervisory tree, the CLI, those pesky fractal options, and chunked data (including the “missing” options).

Footnotes

Wait… no footnotes? I must have done something wrong.[1]

  [1] Phew. I saved myself.
