Producer and Consumer
It is very common to have two tasks running in parallel: one produces data and the other consumes that data in some way. One example is an expensive simulation running on a background thread, with a consumer thread that processes the simulation output and drives a live plot. Splitting the responsibilities this way makes the code more reusable, since the simulation code does not need to be hooked up to the plotting code directly.
Julia provides a data structure called a Channel, which makes implementing this pattern very easy. A channel is a data structure for storing information which internally uses locks (mutexes and semaphores) to synchronise data access between different threads.
When constructing a channel, we can specify a capacity along with the data type of the elements stored in the channel:
capacity = 8;
buffer = Channel{Float64}(capacity)
Here, we can store a maximum of 8 floating point numbers in the channel called buffer.
Now let’s write a function which will send data into the channel. An example function is given below.
function producer_fn(buffer::AbstractChannel, total_items)
    for i in 1:total_items
        sleep(0.02) # simulate work
        put!(buffer, rand(Float64))
    end
    close(buffer)
    nothing
end
A function to produce data and store it in a buffer.
The function put! is similar to push!, in that it sends its second argument into the channel. If the channel is currently full, however, the call blocks the current task until a slot is free. The final call to close ensures that the channel cannot accept any more inputs; it also lets the consumer know, once the remaining elements are drained, that the stream of data has ended.
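As a minimal sketch of these two behaviours (the channel name ch is arbitrary):

```julia
ch = Channel{Int}(2)  # a channel holding at most two integers
put!(ch, 1)
put!(ch, 2)           # the channel is now full; a third put! would block
close(ch)
# put!(ch, 3)         # would throw: a closed channel accepts no more inputs
take!(ch)             # returns 1; leftover items can still be drained after close
```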
We also need to consume the data, which we do in the function below.
function consumer_fn(buffer::AbstractChannel)
    s = zero(eltype(buffer))
    for item in buffer
        s += item
    end
    s
end
A function to consume the data produced by the producer function. This is done using a very simple summation.
Here, we iterate through the buffer with a for loop, which blocks while the channel is empty and terminates cleanly once the channel has been closed and drained. One can instead take elements manually using take!, but a for loop like this tends to be the better option.
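For completeness, here is a sketch of what manual iteration with take! might look like. take! blocks until an item is available and throws an InvalidStateException once the channel is closed and empty, so the termination condition must be handled by hand (the function name is made up for illustration):

```julia
function consumer_take_fn(buffer::AbstractChannel)
    s = zero(eltype(buffer))
    while true
        local item
        try
            item = take!(buffer)  # blocks until an item is available
        catch e
            # take! throws once the channel is closed and drained
            e isa InvalidStateException || rethrow()
            break
        end
        s += item
    end
    s
end
```

The for loop hides all of this bookkeeping, which is why it is the preferred form.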
We can finally write some code to see this in action, using the Threads.@spawn macro to start work on a different thread.
buffer = Channel{Float64}(8)
Threads.@spawn producer_fn(buffer, 50);
@time result = consumer_fn(buffer)
1.606553 seconds (55.63 k allocations: 2.897 MiB, 2.74% compilation time)
If we want to schedule the consumer on a different thread as well, note that the return value of Threads.@spawn is a Task object, not the result. We have to fetch the result manually in order to consume it. For example:
result_task = Threads.@spawn consumer_fn(buffer)
result = fetch(result_task) # hangs until the task is complete
This pattern is very useful when you want to read data from a file and start processing it immediately, without having to wait for the file to finish reading.
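As a sketch of that file-reading scenario (the temporary file, its contents, and the function name here are made up for illustration):

```julia
# A throwaway temporary file stands in for real data.
path, io = mktemp()
write(io, "alpha\nbeta\ngamma\n")
close(io)

# Stream lines from the file into a channel as they are read.
function line_producer(buffer::AbstractChannel, path::AbstractString)
    open(path) do f
        for line in eachline(f)
            put!(buffer, line)  # consumers can start before the file is done
        end
    end
    close(buffer)
end

lines = Channel{String}(64)
Threads.@spawn line_producer(lines, path)
total_chars = sum(length, lines)  # consumes the lines as they arrive
```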
Additionally, if producing and consuming data take very different amounts of time, one can scale the process by running more producers than consumers, or vice versa. For example:
buffer = Channel{Float64}(8)
[Threads.@spawn producer_fn(buffer, 10) for _ in 1:5];
@time result = consumer_fn(buffer)
0.296907 seconds (1.25 k allocations: 55.562 KiB, 11 lock conflicts)
Here, the result is still roughly what we expect, but the consumer spent less time waiting on an empty buffer since five producers were filling it. Note that we must create a fresh channel before execution, since the previous one was closed.
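One caveat with this multi-producer example: producer_fn itself calls close, so the first producer to finish closes the channel while the others may still be writing, and their remaining put! calls will then throw. A safer pattern closes the channel exactly once, after every producer is done. A sketch of that idea (produce_only and sum_consumer are illustrative names, not part of the code above):

```julia
# A producer variant that does not close the channel itself.
function produce_only(buffer::AbstractChannel, total_items)
    for _ in 1:total_items
        put!(buffer, rand(Float64))
    end
end

# The same summation consumer as before.
function sum_consumer(buffer::AbstractChannel)
    s = zero(eltype(buffer))
    for item in buffer
        s += item
    end
    s
end

buffer = Channel{Float64}(8)
tasks = [Threads.@spawn produce_only(buffer, 10) for _ in 1:5]
Threads.@spawn begin
    foreach(wait, tasks)  # wait until every producer has finished...
    close(buffer)         # ...then close the channel exactly once
end
result = sum_consumer(buffer)  # 50 items, each in [0, 1)
```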
When using channels (and mutexes and semaphores in general), one should take care to avoid deadlocks. A deadlock occurs when a thread waits forever for something that will never happen, typically because two threads are each waiting on the other to make progress.
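The simplest way to deadlock with a channel is to fill it with no consumer on the other end, a deliberately broken sketch (do not uncomment the last line):

```julia
ch = Channel{Int}(1)
put!(ch, 1)    # fills the channel to capacity
# put!(ch, 2)  # deadlock: no task will ever take! from ch, so this blocks forever
```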