Programming in C++

Concurrency

Multithreading

Gerald Senarclens de Grancy

Advantages of Multithreading

  • Perceived parallelism, even on a single CPU core
  • A CPU core can be utilized more fully
  • Multiple CPU cores can be used
  • Shared memory space is easy to access
  • Application can continue while waiting for data
  • User interfaces stay responsive during slow background activities
  • Threads are less heavy than processes

Disadvantages of Multithreading

  • Threads are a low level concept
  • Shared memory space can lead to race conditions
  • Concept is not easy to grasp for beginners
  • A crashing thread kills the whole process
  • Threads are still "heavy"
    • Starting typically consumes 50k - 150k CPU cycles
    • Context switching typically consumes 1k - 10k CPU cycles

Managing Threads since C++11

Threads are created using std::thread

template< class F, class... Args >
explicit thread( F&& f, Args&&... args );

The constructor's first argument may be any of the following

  • Plain function pointer
  • Lambda expression
  • Functor (a function object, i.e. an instance of a class with operator())
  • Member function pointer (followed by the object to call it on)

After f, the constructor takes any number of arguments, which are passed on to f
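
The four kinds of callables listed above can be sketched as follows. This is a minimal illustration, not part of the original slides: the names plain_function, Functor, Worker and start_all_kinds are hypothetical. Each thread writes to its own array element, so no synchronization beyond join() is needed.

```cpp
#include <array>
#include <functional>  // std::ref
#include <thread>

// Hypothetical callables for illustration; each stores `value` in `out`.
void plain_function(int& out, int value) { out = value; }

struct Functor {
  void operator()(int& out, int value) const { out = value; }
};

struct Worker {
  void run(int& out, int value) const { out = value; }
};

std::array<int, 4> start_all_kinds() {
  std::array<int, 4> results{};
  Worker worker;
  // std::ref is needed because std::thread copies its arguments by default
  std::thread t1(plain_function, std::ref(results[0]), 1);  // function pointer
  std::thread t2([](int& out, int value) { out = value; },  // lambda
                 std::ref(results[1]), 2);
  std::thread t3(Functor{}, std::ref(results[2]), 3);       // functor
  std::thread t4(&Worker::run, &worker,                     // member function
                 std::ref(results[3]), 4);
  t1.join();
  t2.join();
  t3.join();
  t4.join();
  return results;  // every thread has finished, so reading is safe
}
```

For a member function, the object (here a pointer to worker) is passed as the first argument after the member function pointer.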

Selected API Calls

std::thread::join()
  • Core purpose is synchronization
  • Acts as a "wait" command
  • Blocks the current thread until the target thread completes

std::this_thread::sleep_for(const std::chrono::duration<Rep, Period>& sleep_duration)
  • Pauses the current thread for at least the specified duration
  • Current thread transitions from running to blocked (or sleeping) state
  • CPU scheduler switches to the next thread in the ready queue
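
The interplay of both calls can be observed by timing join(). The sketch below is an illustration under assumptions: wait_for_worker is a hypothetical helper name and the 50 ms duration is chosen arbitrarily.

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: returns how long the caller waited, in milliseconds.
long long wait_for_worker() {
  using namespace std::chrono_literals;
  const auto start = std::chrono::steady_clock::now();
  std::thread worker([] {
    std::this_thread::sleep_for(50ms);  // worker is blocked, not busy-waiting
  });
  worker.join();  // the calling thread blocks here until `worker` completes
  const auto elapsed = std::chrono::steady_clock::now() - start;
  return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
}
```

Since sleep_for() pauses for at least the requested duration, the measured time should be around 50 ms or more; the exact value depends on the scheduler.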

Example

#include <chrono>
#include <iostream>
#include <string>
#include <thread>
using std::chrono::operator""ms;  // allows duration literals such as `200ms`
void function(std::string name);  // you may use any valid function name

int main() {
  std::thread first(function, "first");  // create and start thread
  std::thread second(function, "second");  // create and start thread
  std::cout << "Message from main()\n";
  first.join();  // wait for `first` to finish (join() or detach() is required)
  second.join();  // wait for `second` to finish
  std::cout << "Main thread: all worker (background) threads completed\n";
  return 0; // destroying a still joinable std::thread calls std::terminate()
}

void function(std::string name) {
  for (int i = 0; i < 10; ++i) {
    std::cout << "Message from " << name <<  " thread\n";
    std::this_thread::sleep_for(200ms);
  }
}

Managing Threads since C++20

C++20 offers std::jthread, which is usually preferable to std::thread

std::jthread has the same general behavior as std::thread

Key advantage: std::jthread automatically joins on destruction

template< class F, class... Args >
explicit jthread( F&& f, Args&&... args );

std::jthread stays safe if an exception is thrown before join() is reached: its destructor joins during stack unwinding

Example

#include <chrono>
#include <iostream>
#include <string>
#include <thread>
using std::chrono::operator""ms;  // allows duration literals such as `200ms`
void function(std::string name, int count);

int main() {
  std::jthread first(function, "first", 5);  // create and start jthread
  std::jthread second(function, "second", 10);  // create and start jthread
  std::cout << "Message from main()\n";
  first.join();  // not mandatory for C++20 jthread, but still allowed
  std::cout << "Main thread: first background thread completed\n";
  return 0; // manual call to `join()` is not required
}

void function(std::string name, int count) {
  for (int i = 0; i < count; ++i) {
    std::cout << "Message from " << name <<  " thread\n";
    std::this_thread::sleep_for(200ms);
  }
}

Example: Race Condition

#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>
using std::chrono::operator""ms;
using std::cout, std::endl, std::jthread, std::vector;

class Account {
public:
  uint64_t balance() const { return balance_; }
  void deposit(uint64_t amount) { balance_ += amount; }
  void withdraw(uint64_t amount) { balance_ -= amount; }
private:
  uint64_t balance_{0};
};

int main() {
  Account account{};
  vector<jthread> threads;
  for (int i = 0; i < 10; ++i) {  // 10 jthreads
    threads.push_back(jthread([&]() {
      for (int j{0}; j < 100; ++j) {  // deposit 100 times 1
        account.deposit(1);
        std::this_thread::sleep_for(1ms);
      }
      for (int j{0}; j < 100; ++j) {  // withdraw 100 times 1
        account.withdraw(1);
      }
    }));
  }
  for (auto& th : threads) {
    th.join();
  }
  cout << "balance: " << account.balance() << endl;  // 0 is not guaranteed: data race
}

Atomic Operations (std::atomic)

On a plain (non-atomic) variable, an operation like var++ typically results in three separate steps for the CPU:

  1. Read the value of var from memory into a register
  2. Increment the value in the register
  3. Write the new value back to memory

If a second thread modifies var between these steps, an update may be lost

Atomic operations combine all three steps into a single, indivisible unit

Based on dedicated CPU instructions where the hardware provides them

Atomics allow thread-safe code without explicit locking

Waiting threads do not need to be put to sleep (avoids the cost of blocking and context switching)

+ Very fast when lock-free: no context switching, no deadlocks
- Requires a trivially copyable type; typically lock-free only for small types (integers, booleans, pointers)
- Cannot guard complex logic involving multiple steps or multiple variables

What can be done atomically?

std::atomic supports the following operations, typically mapped to hardware instructions:

Increment and decrement (integers and pointers)
  • ++ and --
Compound assignment
  • +=, -= (integers and pointers); &=, |=, ^= (integers only)
Compare-and-swap (CAS)
  • std::atomic<T>::compare_exchange_weak(T& expected, T desired)
  • std::atomic<T>::compare_exchange_strong(T& expected, T desired)
Exchange
  • std::atomic<T>::exchange(T desired)
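
Compare-and-swap is commonly used in a retry loop. The following sketch assumes a hypothetical helper try_withdraw that only withdraws if the balance covers the amount, a check-then-modify step that a plain -= cannot express atomically. All names and numbers are illustrative.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical helper: withdraw only if the balance covers the amount.
bool try_withdraw(std::atomic<std::uint64_t>& balance, std::uint64_t amount) {
  std::uint64_t current = balance.load();
  while (current >= amount) {
    // Succeeds only if `balance` still equals `current`. On failure,
    // `current` is refreshed with the stored value and the check repeats.
    if (balance.compare_exchange_weak(current, current - amount)) {
      return true;
    }
  }
  return false;  // insufficient funds
}

// 4 threads attempt 100 withdrawals of 1 each from a balance of 100.
std::uint64_t count_successful_withdrawals() {
  std::atomic<std::uint64_t> balance{100};
  std::atomic<std::uint64_t> successes{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back([&] {
      for (int j = 0; j < 100; ++j) {
        if (try_withdraw(balance, 1)) {
          ++successes;
        }
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  return successes.load();
}
```

Out of 400 attempts exactly 100 succeed, and the unsigned balance never wraps around below zero. compare_exchange_weak may fail spuriously, which is harmless here because the loop simply retries.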

Example

#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>
using std::chrono::operator""ms;
using std::cout, std::endl, std::jthread, std::vector;

class Account {
public:
  uint64_t balance() const { return balance_; }
  void deposit(uint64_t amount) { balance_ += amount; }
  void withdraw(uint64_t amount) { balance_ -= amount; }
private:
  std::atomic<uint64_t> balance_{0};
};

int main() {
  Account account{};
  vector<jthread> threads;
  for (int i = 0; i < 10; ++i) {  // 10 jthreads
    threads.push_back(jthread([&]() {
      for (int j{0}; j < 100; ++j) {  // deposit 100 times 1
        account.deposit(1);
        std::this_thread::sleep_for(1ms);
      }
      for (int j{0}; j < 100; ++j) {  // withdraw 100 times 1
        account.withdraw(1);
      }
    }));
  }
  for (auto& th : threads) {
    th.join();
  }
  cout << "balance: " << account.balance() << endl;
}

Lock - std::mutex

A lock or mutex (from mutual exclusion) is a synchronization primitive

It prevents state from being modified or accessed by multiple threads at once

If one thread holds the lock, all other threads requesting it must wait

If a thread tries to lock a mutex that is already held, the OS typically puts that thread to sleep

This context switch is expensive in terms of CPU cycles

Beware of deadlocks when using multiple mutex objects
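
A deadlock can occur when two threads acquire the same two mutexes in opposite order. One common way to avoid this is std::scoped_lock (C++17), which locks several mutexes with a deadlock-avoidance algorithm. The sketch below is an illustration under assumptions: LockedAccount, transfer and total_after_opposing_transfers are hypothetical names not taken from the slides.

```cpp
#include <cstdint>
#include <mutex>
#include <thread>

// Hypothetical account type with its own mutex.
struct LockedAccount {
  std::mutex m;
  std::int64_t balance{0};
};

// Assumes `from` and `to` are different accounts.
void transfer(LockedAccount& from, LockedAccount& to, std::int64_t amount) {
  // Locks both mutexes without deadlock, regardless of argument order.
  std::scoped_lock lock(from.m, to.m);
  from.balance -= amount;
  to.balance += amount;
}

std::int64_t total_after_opposing_transfers() {
  LockedAccount a;
  LockedAccount b;
  a.balance = 1000;
  b.balance = 1000;
  // The two threads request the mutexes in opposite order.
  std::thread t1([&] { for (int i = 0; i < 1000; ++i) transfer(a, b, 1); });
  std::thread t2([&] { for (int i = 0; i < 1000; ++i) transfer(b, a, 1); });
  t1.join();
  t2.join();
  return a.balance + b.balance;  // money is neither created nor lost
}
```

Locking from.m and to.m one after the other with two separate std::lock_guard objects could block forever in this scenario, because each thread may end up holding one mutex while waiting for the other.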

Common Usage

#include <mutex>  // provides std::mutex and std::lock_guard
std::mutex m;  // construct the mutex in unlocked state
// ...
std::lock_guard<std::mutex> lock(m);  // acquires ownership of m (calls m.lock())
// critical code

lock's destructor releases ownership of m (calls m.unlock())

Example

#include <chrono>
#include <cstdint>  // provides uint64_t
#include <iostream>
#include <mutex>  // contains std::mutex and std::lock_guard
#include <thread>
#include <vector>
using std::chrono::operator""ms;
using std::cout, std::endl, std::jthread, std::vector;

class Account {
public:
  uint64_t balance() const {
    std::lock_guard<std::mutex> lock(m_);
    return balance_;
  }
  void deposit(uint64_t amount) {
    std::lock_guard<std::mutex> lock(m_);
    balance_ += amount;
  }
  void withdraw(uint64_t amount) {
    std::lock_guard<std::mutex> lock(m_);
    balance_ -= amount;
  }
private:
  mutable std::mutex m_; // 'mutable' allows locking in const functions
  uint64_t balance_{0};
};

int main() {
  Account account{};
  vector<jthread> threads;
  for (int i = 0; i < 10; ++i) {  // 10 jthreads
    threads.push_back(jthread([&]() {
      for (int j{0}; j < 100; ++j) {  // deposit 100 times 1
        account.deposit(1);
        std::this_thread::sleep_for(1ms);
      }
      for (int j{0}; j < 100; ++j) {  // withdraw 100 times 1
        account.withdraw(1);
      }
    }));
  }
  for (auto& th : threads) {
    th.join();
  }
  cout << "balance: " << account.balance() << endl;
}

Questions
and feedback...