Last year, GenStage was announced. José Valim gave an excellent talk about it at ElixirConf 2016. GenStage gives more flexibility and power in doing work on a set of data, and makes it easier to do that work in a concurrent and distributed manner.

A part of the announcement that caught my eye was the intended replacement of Erlang’s gen_event. GenEvent’s main shortcoming is that the event manager and the handlers run in the same process, which is a significant barrier to concurrency.

Elixir’s wrapper of gen_event in GenEvent is planned to be deprecated and removed by Elixir 2.0. Any necessary interfacing with gen_event will have to be done by calling it directly. Alternatives to gen_event include a simple supervisor (as José discusses here) or using GenStage.

Verk is a job processing system, and it uses GenEvent for notifications of job starts, ends, or failures. An issue was opened recently to replace GenEvent with GenStage. I am interested in learning more about GenStage, so let’s fix both at the same time.

For the initial version, I wanted to make the simplest version that could replace GenEvent’s current functionality. The existing GenEvent behaviour will also be left alone entirely.

To get started, the optional gen_stage has to be added:

  { :gen_stage, "== 0.11.0", optional: true }

For GenStage, the event manager is replaced by a GenStage producer, that is configured to dispatch events using GenStage.BroadcastDispatcher. It will dispatch all events to all consumers, which is exactly what we are seeking to replicate from GenEvent.

defmodule Verk.EventHandler do
  @moduledoc """
  A GenStage producer that broadcasts events to subscribed consumers.
  """
  use GenStage
  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end
  def async_notify(event) do
    GenStage.cast(__MODULE__, {:notify, event})
  end
  def init(:ok) do
    {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
  end
  def handle_cast({:notify, event}, state) do
    {:noreply, [event], state}
  end
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end
end

The GenStage announcement shows an example of a similar module, but with buffering of events in the case that there is no demand. The above module will not. If an event is broadcasted, but there are no consumers, it is lost forever. The EventHandler then needs to be added to Verk’s supervision tree. The supervision tree used to looked like this:

children = [redis, verk_event_manager, queue_stats_watcher, schedule_manager]
           |> Kernel.++(children)

We only want to add the EventHandler in the case where GenStage is available, and Verk is configured to use it, so I added a function to do so:

defp add_gen_stage_event_handler(children) do
  if Application.get_env(:verk, :use_gen_stage, false) && Code.ensure_loaded?(GenStage) do
    [worker(Verk.EventHandler, []) | children]
  else
    children
  end
end

And then we can add it to our chain of functions that is building the supervision tree children:

children = [redis, verk_event_manager, queue_stats_watcher, schedule_manager]
           |> Kernel.++(children)
           |> add_gen_stage_event_handler()

No events are being sent right now, but there’s only one function that handles that functionality. Like before, the GenStage configuration has to be checked:

defp notify!(event) do
  if Application.get_env(:verk, :use_gen_stage, false) && Code.ensure_loaded?(GenStage) do
    Verk.EventHandler.async_notify(event)
  end
  :ok = GenEvent.ack_notify(Verk.EventManager, event)
end

I added some more tests to ensure messages are sent through GenStage as well. Excluding tests, only around 50 lines were changed and Verk is now able to support producing and consuming events via GenStage. To sum up, we added a GenStage handler to Verk using only core GenStage functionality, tests, and opened a pull request here. It’s also possible to add some more functionality around how the events are produced and consumed going forward. Some users may want to be able to have independent producers and consumers for each type of event, or the ability to not produce certain events at all. For now though, this will do.

If you have any feedback, I’d be happy to hear it on Twitter.