JM


Producer and Consumer

It is very common to have two tasks running in parallel: one that produces data and another that consumes it in some way. For example, an expensive simulation might run on a background thread while a consumer thread processes its output and updates a live plot. Splitting the responsibilities like this makes the code more reusable, since the simulation code does not need to be hooked up to the plotting code directly.

Julia provides a data structure called a Channel, which makes implementing this pattern very easy. A Channel is a first-in, first-out queue that can be shared safely between tasks running on different threads; internally it uses a lock and condition variables to synchronise access to the stored data.

When constructing a channel, we can specify a capacity along with the data type of the elements stored in the channel:

julia
capacity = 8;
buffer = Channel{Float64}(capacity)

Here, we can store a maximum of 8 floating point numbers in the channel called buffer.
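As a small illustration (a sketch; the channel name ch and the values are arbitrary), values put into a buffered channel come out in the order they went in, and isready reports whether anything is still waiting to be taken:

```julia
# A channel with room for 3 Float64 values (an arbitrary capacity for illustration).
ch = Channel{Float64}(3)

put!(ch, 1.0)
put!(ch, 2.0)
put!(ch, 3.0)   # the buffer is now full, so a fourth put! would block

first_out = take!(ch)    # channels are first-in, first-out
second_out = take!(ch)

println(first_out, " ", second_out)   # prints "1.0 2.0"
println(isready(ch))                  # prints "true": 3.0 is still buffered
```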

Now let’s write a function that sends data into the channel:

julia
function producer_fn(buffer::AbstractChannel, total_items)
    for i in 1:total_items
        sleep(0.02) # simulate work
        put!(buffer, rand(Float64))
    end
    close(buffer)
    nothing
end

A function to produce data and store it in a buffer.

The put! function is similar to push!: it inserts the value given as its second argument into the channel. However, if the channel is full, put! blocks the calling task until there is free space for the value. The final call to close ensures that the channel accepts no more input. Closing the channel also lets the consumer know that the stream of data has ended once all the remaining elements have been taken.
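The sketch below checks both behaviours on a channel with a single slot (the names are illustrative, and the sleep duration is an arbitrary choice). It uses @async so that the blocked put! runs in its own task, and istaskdone to observe that the task is still waiting:

```julia
ch = Channel{Int}(1)
put!(ch, 1)                       # fills the only slot

# This put! cannot complete until a slot is freed, so the task blocks.
t = @async put!(ch, 2)
sleep(0.1)                        # give the task a chance to run
blocked = !istaskdone(t)          # true: the task is still waiting

v1 = take!(ch)                    # frees the slot (returns 1)
wait(t)                           # the pending put! now completes
close(ch)                         # no further input is accepted

v2 = take!(ch)                    # buffered values can still be taken after close

# put! on a closed channel throws an InvalidStateException.
put_failed = try
    put!(ch, 3)
    false
catch err
    err isa InvalidStateException
end

# Once a closed channel is empty, take! throws as well; this is how a
# for loop over the channel knows that the stream has ended.
take_failed = try
    take!(ch)
    false
catch err
    err isa InvalidStateException
end
```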

We also need to consume the data, which we do in a second function:

julia
function consumer_fn(buffer::AbstractChannel)
    s = zero(eltype(buffer))
    for item in buffer
        s += item
    end
    s
end

A function that consumes the data produced by the producer function, here by simply summing it.

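Here, we iterate over the buffer with a for loop. Each iteration takes the next item out of the channel, waiting while the channel is empty, and the loop ends once the channel has been closed and all remaining items have been taken. One can also take items manually with the take! function, but then the end of the stream has to be detected by hand, so a for loop like this is usually the better option.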
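For comparison, here is a sketch of a hypothetical consumer_manual that calls take! directly. Since take! on a closed, empty channel throws an InvalidStateException, we have to catch that exception to detect the end of the stream; this is roughly what the for loop does for us.

```julia
function consumer_manual(buffer::AbstractChannel)
    s = zero(eltype(buffer))
    while true
        local item
        try
            item = take!(buffer)   # waits while the channel is open but empty
        catch err
            # A closed and drained channel signals the end of the stream.
            err isa InvalidStateException && break
            rethrow()
        end
        s += item
    end
    s
end

# Usage: fill a channel, close it, then consume it manually.
ch = Channel{Float64}(4)
foreach(x -> put!(ch, x), (1.0, 2.0, 3.0))
close(ch)
total = consumer_manual(ch)   # 6.0
```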

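Finally, we can see this in action by using the Threads.@spawn macro to run the producer as a separate task, which Julia may schedule on another thread. For the tasks to actually run in parallel, Julia must be started with more than one thread (for example, julia --threads 4); Threads.nthreads() reports how many threads are available.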

julia
function producer_fn(buffer::AbstractChannel, total_items)
    for i in 1:total_items
        sleep(0.02) # simulate work
        put!(buffer, rand(Float64))
    end
    close(buffer)
    nothing
end

function consumer_fn(buffer::AbstractChannel)
    s = zero(eltype(buffer))
    for item in buffer
        s += item
    end
    s
end

buffer = Channel{Float64}(8)
Threads.@spawn producer_fn(buffer, 50);
@time result = consumer_fn(buffer)
Output
1.606553 seconds (55.63 k allocations: 2.897 MiB, 2.74% compilation time)

If we want to run the consumer on a different thread as well, note that Threads.@spawn returns a Task object, not the result of the function. We have to fetch the result from the task explicitly. For example:

julia
result_task = Threads.@spawn consumer_fn(buffer)
result = fetch(result_task) # blocks until the task has finished, then returns its value
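A self-contained sketch of this (the channel contents are arbitrary): the value returned by Threads.@spawn is a Task, fetch waits for it and returns the result, and if the task threw an error, fetch rethrows it wrapped in a TaskFailedException.

```julia
ch = Channel{Float64}(4)
foreach(x -> put!(ch, x), (1.0, 2.0, 3.0))
close(ch)

# Spawn a consumer that sums the channel's contents.
t = Threads.@spawn begin
    s = 0.0
    for item in ch
        s += item
    end
    s
end

println(typeof(t))    # prints "Task", not the sum
total = fetch(t)      # waits for the task, then returns its value (6.0)

# Errors inside a task surface when we fetch it.
bad = Threads.@spawn error("simulated consumer failure")
failed = try
    fetch(bad)
    false
catch err
    err isa TaskFailedException
end
```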

This pattern is very useful when you want to read data from a file and start processing it immediately, without waiting for the whole file to be read.
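A minimal sketch of this idea, assuming one numeric value per line. It uses the do-block form of the Channel constructor, which runs the function in a new task and closes the channel automatically when the function returns; spawn = true lets that task run on another thread. The helper name sum_lines and the file name in the comment are hypothetical, and an IOBuffer stands in for a real file so the example runs as-is.

```julia
function sum_lines(io::IO)
    # Producer: a task that reads lines and puts them into the channel.
    lines = Channel{String}(8; spawn = true) do ch
        for line in eachline(io)
            put!(ch, line)
        end
    end   # the channel is closed automatically when this function returns

    # Consumer: parse each line as soon as it arrives.
    total = 0.0
    for line in lines
        total += parse(Float64, line)
    end
    total
end

# An in-memory stand-in for a file; with a real file you could call
# open(sum_lines, "measurements.txt") (hypothetical file name).
result = sum_lines(IOBuffer("1.5\n2.5\n3.0\n"))
```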

Additionally, if producing and consuming data take very different amounts of time, one can run more producers than consumers, or vice versa, to scale up the whole process. For example:

julia
buffer = Channel{Float64}(8)
[Threads.@spawn producer_fn(buffer, 10) for _ in 1:5];
@time result = consumer_fn(buffer)
Output
0.296907 seconds (1.25 k allocations: 55.562 KiB, 11 lock conflicts)

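Here, the result is still roughly what we expect, but the total time drops considerably because five producers generate items concurrently. Note that we create a fresh channel before running this, since a closed channel cannot be reopened. There is also a caveat: producer_fn closes the channel itself, so the first producer to finish closes it while the others may still be sending. Their remaining put! calls then throw an error inside their own tasks (which goes unnoticed because those tasks are never waited on), and the sum can be missing a few values. A more robust approach is to let the producers only put values and to close the channel once, after all producer tasks have finished.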
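A sketch of a more robust multi-producer, multi-consumer setup is given below. The names producer_noclose, consumer_sum_count and run_pipeline are hypothetical. Unlike the example above, the producers never call close themselves; a single place closes the channel after waiting on every producer task, and each consumer reports how many items it saw so we can check that none were lost.

```julia
# Hypothetical variant of producer_fn that does NOT close the channel,
# so several producers can safely share it.
function producer_noclose(buffer::AbstractChannel, total_items)
    for _ in 1:total_items
        sleep(0.02)                   # simulate work
        put!(buffer, rand(Float64))
    end
    nothing
end

# Hypothetical consumer that returns its partial sum and how many items it saw.
function consumer_sum_count(buffer::AbstractChannel)
    s, n = 0.0, 0
    for item in buffer
        s += item
        n += 1
    end
    (s, n)
end

function run_pipeline(n_producers, n_consumers, items_per_producer)
    buffer = Channel{Float64}(8)
    producers = map(1:n_producers) do _
        Threads.@spawn producer_noclose(buffer, items_per_producer)
    end
    consumers = map(1:n_consumers) do _
        Threads.@spawn consumer_sum_count(buffer)
    end

    # Close the channel only once every producer has finished, so no put!
    # can hit a closed channel and every consumer's loop still terminates.
    foreach(wait, producers)
    close(buffer)

    # Combine the partial results from all consumers.
    partials = fetch.(consumers)
    (sum(first, partials), sum(last, partials))
end

total, n_items = run_pipeline(5, 2, 10)   # n_items should be 5 * 10 = 50
```

Passing different values for n_producers and n_consumers lets you try both "more producers than consumers" and the reverse with the same code.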

When using channels (and locks in general), take care to avoid deadlocks. A deadlock occurs when a task waits forever for something that will never happen. This is usually a circular wait: one task is waiting for the result of a second task, while the second task is waiting on the first, so neither can make progress. A simpler variant with channels is a consumer looping over a channel that nobody ever closes: once the producers stop, the for loop waits forever for the next item.
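A minimal sketch of such a circular wait, using two unbuffered channels (capacity 0, so every put! waits for a matching take!). The names are illustrative, and timedwait is used only so that the example reports the deadlock instead of hanging forever. Supplying the missing first value from outside breaks the cycle.

```julia
a = Channel{Int}(0)   # unbuffered: put! waits for a matching take!
b = Channel{Int}(0)

# t1 waits for a value on b before it sends on a;
# t2 waits for a value on a before it sends on b.
t1 = @async begin
    x = take!(b)
    put!(a, x + 1)
end
t2 = @async begin
    y = take!(a)
    put!(b, y + 1)
end

# Neither task can make progress, so this gives up after half a second.
status = timedwait(() -> istaskdone(t1) && istaskdone(t2), 0.5)
println(status)       # prints "timed_out"

# Breaking the cycle: send the first value from outside.
put!(b, 0)            # t1 receives 0 and sends 1 on a
result = take!(b)     # t2 receives 1 and replies with 2 on b
```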