JM

Thread Safety and Race Conditions

You may have noticed something peculiar about the previous section, particularly in the parallel reduction algorithm. This algorithm is broken up into different stages. What one may notice, is that this structure is somewhat arbitrary. One can consider alternative ways in which to break down the sum into pairwise operations. Why were the pairs chosen adjacent, not in an arbitrary order? Another way to break down the sum of n n elements using k k workers is to have each worker sum up nk \left \lfloor \frac{n}{k} \right \rfloor elements (with one worker taking any left-over elements). As kn k \ll n (usually), one can then assign one of the workers to sum the remaining k k elements to produce a final output. The reason we chose this process is to make sure that each worker does not access the same memory at the same time.

Why is this so important? Let us inspect the following example:

julia
function naive_parallel_sum(arr)
    sum_arr = zero(eltype(arr))
    @inbounds Threads.@threads for i in eachindex(arr)
        sum_arr += arr[i]
    end
    return sum_arr
end

A naive implementation of a parallel reduction. This code should not be used without modifications.

Let’s run this code with an example:

julia
function naive_parallel_sum(arr)
    sum_arr = zero(eltype(arr))
    @inbounds Threads.@threads for i in eachindex(arr)
        sum_arr += arr[i]
    end
    return sum_arr
end

arr = rand(1024);
real_sum = sum(arr)

Now we know what to expect, let’s see what is returned from our sum.

julia
naive_parallel_sum(arr)

Now this result is different, let’s do a few runs to see if we get the same incorrect result:

julia
naive_parallel_sum(arr)
naive_parallel_sum(arr)
naive_parallel_sum(arr)

Each time the sum is called, we get a different result. This is the quintessential example of a race condition in our code. We only have one piece of memory, the variable represented by sum_arr, which is being read from and written to by each of our workers in parallel.

Each worker first reads the value of sum_arr and stores this value in a register inside the Arithmetic Logic Unit (ALU) of the current worker. It then reads the value from the input array which is read-only and does not change. Once these two values are inside the ALU, their sum is calculated and then written into the memory of the variable sum_arr. This whole process occurs in parallel, which means that another worker may have changed the value in sum_arr while another worker has an outdated value inside their ALU. Consequently, when the worker with the old value has completed the calculation, it will overwrite the intermediate result from the previous worker. For this reason, the final sum usually only accounts for a much smaller number of elements since most contributions are overwritten. We call this a race condition, since the behaviour of the algorithm depends on the order and the speed of execution, each worker “racing” to read and write to the same piece of memory.

This idea of race conditions overlaps with the idea of thread safety. A thread-safe algorithm can be executed in parallel without introducing any race conditions. An example of an algorithm which is often not thread-safe is a random number generator (RNG), as these are usually all implemented as pseudo-random number generators, which rely on an internal state to generate the next number in a deterministic process that has good enough properties to mimic a random process, while still being repeatable. If you parallelise a random process (e.g. a Monte-Carlo simulation), you should make sure that the random number generation algorithm is thread-safe, or the code may run into race conditions when the RNG reads and mutates its internal state.

The golden rule for thread safety is to check whether multiple cores are accessing the same piece of memory at the same time. If this memory is constant throughout the parallel process, this is usually fine (e.g. all cores have access to your constant data). However, if each worker tries to write to the same piece of memory, this causes a race condition.

Mitigating Race Conditions

As race-conditions are very common and can have very bad results, such as memory corruption, one would like a way to have the benefits of parallel processing, without having to alter the algorithm that much. Computer scientists and software engineers have come up with ways to mitigate this happening.

Atomics

One core concept to understand is the idea of atomics. An atomic operation is one which cannot be broken up into smaller parts done by different processors. An atomic operation must be executed serially or will break. The example from the previous section included an example of an atomic operation - incrementing a variable with another value. The one line of code -

julia
sum_arr += arr[i]
  • can be broken down into 4 distinct operations:
  1. Fetching the value arr[i] and loading into register in the ALU (One can argue that the first operation is not part of the atomic since this fetch operation is from read-only memory with respect to the task.)
  2. Fetching the value sum_arr and loading into register in the ALU
  3. Performing the sum of the two fetched values
  4. Writing the result of the sum back into the memory, represented by sum_arr

You know that the end goal of adding a value from an array to a variable is only valid if sum_arr stays constant during the operation. The operation consisting of these 3 tasks (excluding task 1), can be said to be an atomic operation. Therefore, one require that it be performed in serial, rather than in parallel. We need a way of expressing in code the need to perform atomic operations. In many languages there are special functions and libraries available to allow one to write common atomic operations (such as incrementing a variable) in a readable and thread-safe way. In Julia, there exists native support for atomics in the Threads library, so we can start with:

julia
using Base.Threads;

We can alter the previous algorithm to include this:

julia
function naive_parallel_sum_with_atomic(arr)
    sum_arr = Atomic{eltype(arr)}(zero(eltype(arr)))
    @inbounds Threads.@threads for i in eachindex(arr)
        atomic_add!(sum_arr, arr[i])
    end
    return sum_arr[]
end

A naive implementation of a parallel reduction, using atomics.

julia
function naive_parallel_sum_with_atomic(arr)
    sum_arr = Atomic{eltype(arr)}(zero(eltype(arr)))
    @inbounds Threads.@threads for i in eachindex(arr)
        atomic_add!(sum_arr, arr[i])
    end
    return sum_arr[]
end

naive_parallel_sum_with_atomic(arr)
naive_parallel_sum_with_atomic(arr)
naive_parallel_sum_with_atomic(arr)

Now, if we run this algorithm we will get fairly consistent results. Any errors in the output will be because, unlike normal addition, floating point addition is not associative. However, if we were to benchmark this solution, we would find that it is severely lacking:

julia
@btime naive_parallel_sum_with_atomic($arr)
@btime sum($arr)
Output
11.200 μs (43 allocations: 5.02 KiB)
  36.556 ns (0 allocations: 0 bytes)

This implementation is around 300 times slower than the native implementation of sum. Using atomics is incredibly slow as it forces all the operations to happen sequentially. Additionally, it requires that most threads sit around waiting. In later chapters, we will revisit this problem and implement a much faster algorithm.

If atomics are not very performant, when should they be used? The answer is when the operation running in parallel contains only a small section which needs to be performed atomically. An example would be running an expensive simulation in parallel and aggregating statistics during/after the simulation. These simple operations of aggregating the statistics are likely to be far less expensive than the simulation itself, and each thread spends most of the time in the simulation, and not that much time waiting to access the memory to change the statistics.

The Base.Threads module provides the following functions:

  • atomic_or!
  • atomic_xor!
  • atomic_sub!
  • atomic_min!
  • atomic_max!
  • atomic_cas!
  • atomic_and!
  • atomic_add!

These cover a large array of operations you may need, however, if you require more flexibility, you can use a mutex or a semaphore.

Mutexes and Semaphores

Mutexes and Semaphores are primitives for synchronising operations and processes. A mutex provides “mutual exclusion”, which means that only one task can have access to a mutex at a time, while all other tasks are blocked until control of that mutex is released. You can think of a mutex as being a lock on a door, which provides access to a room (which will act as the metaphor for the resources/operations that can only be accessed by one person at a time). Initially, the door is open, so the first person to use the room can walk in and lock the door behind them. Any other people looking to use that room will be blocked and will have to wait until the first person has finished. Once that person is finished, they need to unlock the door to leave, allowing the next person in the queue to go inside.

A mutex is usually implemented by scheduling blocked threads after the current thread with control of the mutex has been completed. This puts the other threads to sleep until they are able to continue. This can cause performance issues as waking a thread from sleep can be expensive. Alternatively, it can be implemented with a “spin lock”, which has each blocked thread keep checking whether the mutex is available over and over in a continuous cycle. This keeps the thread awake, but wastes many CPU cycles and is much less efficient than putting a thread to sleep if the thread has to wait for a longer time.

A semaphore is slightly different in that it is a signalling method which can symbolise the availability of limited resources. A semaphore is essentially an integer variable which has two atomic operations a wait and signal operation that atomically decrements and increments respectively. The semaphore is initially set to the number of resources available and cannot be decremented below 0. A thread that tries to perform the wait operation when the value is 0, has to wait until another thread releases control and increments the value. The key idea is that a resource can be given and taken by different threads. For example a producer thread can put data into a shared resource, which can then be consumed by another process when the producer sends the “signal” command.

While a mutex can provide mutual exclusion to a single resource / group of resources, a semaphore can represent a buffer or a pool of resources.

In Julia, one should look to use a ReentrantLock to act as a Mutex. Generally, semaphores are far less common, but can be found in additional packages, or easily written oneself using a ReentrantLock. The syntax can be gleamed by the following:

julia
mutex = ReentrantLock()
Threads.@threads for i in 1:length(results)
    # Long running processing
    results[i] = some_function(i)

    lock(mutex) do
        # process the results of results[i] serially
        aggregate_results = add_aggregate!(aggregate_results, results[i])
    end
end

It is important to remember that this method of programming is usually discouraged, due to the performance hit, as there are usually better implementations.