Task-based Parallelism
It is important to acknowledge that the Threads.@threads macro is not the only way to achieve parallelism in Julia. Other libraries provide their own macros, such as LoopVectorization.jl, which provides @tturbo for thread-based speed-ups. However, one can also schedule work onto the threads directly, using Threads.@spawn.
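As a brief sketch of the @tturbo style (assuming LoopVectorization.jl is installed; the sum_squares function here is a hypothetical example, not from the text above), the macro is simply placed in front of a loop:

```julia
using LoopVectorization

# Hypothetical example: sum of squares, vectorised and threaded by @tturbo
function sum_squares(x)
    s = zero(eltype(x))
    @tturbo for i in eachindex(x)
        s += x[i] * x[i]
    end
    return s
end

sum_squares(collect(1.0:4.0))  # 1 + 4 + 9 + 16 = 30.0
```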
Threads.@spawn works by creating a task that is scheduled to run on any available thread. It returns a Task object that can be used to gather results or to synchronise between threads. For example, we can rewrite the chunk-based parallelism using the Threads.@spawn convention, with a bit more boilerplate code.
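Before the full example, the basic spawn-and-fetch pattern looks like this (the summed expression here is arbitrary):

```julia
# Schedule a task to run on any available thread; returns a Task immediately
t = Threads.@spawn sum(1:1_000)
# fetch blocks until the task completes, then returns its result
result = fetch(t)  # 500500
```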
function est_pi_mc_spawn(n)
    num_threads = Threads.nthreads()
    block_size = div(n, num_threads, RoundUp)
    task_handles = Vector{Task}(undef, num_threads)
    for i in 1:num_threads
        # The last block picks up the remainder of the darts
        if i == num_threads
            num_darts = n - block_size * (num_threads - 1)
        else
            num_darts = block_size
        end
        task_handles[i] = Threads.@spawn est_pi_mc_serial($num_darts) * $num_darts / 4
    end
    n_c = 0.0
    for task in task_handles
        n_c += fetch(task)
    end
    return 4 * n_c / n
end
One can see that we now have two for loops: the first spawns the tasks to do the work, and the second fetches the result from each task handle. Spawning a task simply schedules it to run, whereas fetching it waits for the task to finish. This is a much lower-level approach to threading, and can be useful when you do not know in advance how many iterations are needed. One will also notice that some values were escaped using the $ character: this makes the macro interpolate the current value of the variable directly into the spawned expression, so each task captures a fixed value rather than the variable itself, which also allows for better type analysis.
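The effect of interpolation can be sketched as follows (x is an arbitrary variable for illustration): the interpolated task always sees the value x had when the task was created, whereas the non-interpolated task reads x whenever it happens to run.

```julia
x = 1
t_captured = Threads.@spawn x + 1   # captures the variable x itself
t_interp   = Threads.@spawn $x + 1  # interpolates the current value of x
x = 10
# t_captured may compute 2 or 11, depending on when it runs;
# t_interp always computes 2
fetch(t_interp)
```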
We can call this method to see that it works, and compare it with the threaded and chunked version we previously coded.
function est_pi_mc_serial(n)
    n_c = zero(typeof(n))
    for _ in 1:n
        # Choose random numbers between -1 and +1 for x and y
        x = rand() * 2 - 1
        y = rand() * 2 - 1
        # Work out the distance from origin using Pythagoras
        r2 = x * x + y * y
        # Count point if it is inside the circle (r^2 = 1)
        if r2 <= 1
            n_c += 1
        end
    end
    return 4 * n_c / n
end
function est_pi_mc_threaded_chunked(n)
    n_threads = Threads.nthreads()
    num_inside = zeros(Float64, n_threads)
    # Calculate maximum chunk size
    chunk_size = div(n, n_threads, RoundUp)
    # Create an iterator and collect to turn into an array
    iter = collect(enumerate(Iterators.partition(1:n, chunk_size)))
    Threads.@threads for info in iter
        i, idx_range = info # Unpack the tuple from enumerate
        n_block = length(idx_range)
        pi_est = est_pi_mc_serial(n_block)
        num_inside[i] = pi_est * n_block / 4
    end
    n_c = sum(num_inside)
    return 4 * n_c / n
end
using BenchmarkTools

n = 1_000_000
@btime est_pi_mc_threaded_chunked($n)
@btime est_pi_mc_spawn($n)
287.500 μs (46 allocations: 5.25 KiB)
516.700 μs (61 allocations: 5.17 KiB)
One can see that in this example the Threads.@spawn version is actually somewhat slower, and also adds complexity. Using Threads.@spawn should be done sparingly, as it can easily introduce errors at runtime. It is strongly recommended that one sticks to the Threads.@threads macro for multithreaded parallelisation, and only reaches for Threads.@spawn if the problem cannot be expressed as a simple loop, or if its performance turns out to be much better. For most use-cases, Threads.@threads is as fast as, or even faster than, an equivalent Threads.@spawn implementation.