Sai Charan - C++ Developer

HPX Parallel Scheduler Internals

Deep Dive into Architecture and Implementation @ 2025

Sai Charan Arvapally
University of Alberta, Canada

Architecture Overview

HPX's parallel_scheduler is a thin domain-aware wrapper around the existing thread_pool_policy_scheduler<hpx::launch>. Rather than building an entirely new execution backend, it reuses HPX's mature thread pool infrastructure and plugs into stdexec's domain-based sender transformation pipeline.

Complete Execution Flow

get_parallel_scheduler()
    |
    v
parallel_scheduler  (owns shared_ptr<parallel_scheduler_backend>)
    |
    |-- schedule() --> sender<parallel_scheduler>
    |                    |-- env exposes: get_completion_scheduler<set_value_t>
    |                    |                get_domain -> parallel_scheduler_domain
    |                    |-- connect(receiver) --> operation_state
    |                                        |-- start() --> checks stop_token
    |                                                        delegates to backend->schedule(proxy)
    |
    |-- bulk / bulk_chunked / bulk_unchunked
         |-- parallel_scheduler_domain::transform_sender()
              |-- extracts underlying thread_pool_policy_scheduler (fast path)
              |   OR uses backend virtual dispatch (virtual path)
              |
              |-- Fast Path: creates thread_pool_bulk_sender<..., IsChunked, IsParallel, IsUnsequenced>
              |    |-- uses work-stealing index queues
              |    |-- NUMA-aware thread placement
              |    |-- main-thread participation
              |    |-- adaptive chunking strategies
              |
              |-- Virtual Path: creates virtual_parallel_bulk_op
                   |-- calls backend->schedule_bulk_chunked() or schedule_bulk_unchunked()
                   |-- type-erased through base_parallel_bulk_op

Type Hierarchy and Components

Core Type Hierarchy

parallel_scheduler
  └── shared_ptr<parallel_scheduler_backend>   (type-erased, heap-allocated)

parallel_scheduler_backend (abstract interface)
  ├── schedule(std::span<std::byte> storage, receiver_proxy&)
  ├── schedule_bulk_chunked(std::span<std::byte> storage, n, bulk_proxy&)
  ├── schedule_bulk_unchunked(std::span<std::byte> storage, n, bulk_proxy&)
  ├── equal_to(other&) → bool
  ├── get_underlying_scheduler() → const thread_pool_policy_scheduler*
  └── get_pu_mask() → const mask_type*

hpx_parallel_scheduler_backend (concrete default implementation)
  └── wraps thread_pool_policy_scheduler + mask_type
  └── implements all virtual methods using HPX thread pool

get_parallel_scheduler()
  └── queries query_parallel_scheduler_backend()
  └── wraps the returned shared_ptr in a parallel_scheduler

operation_state<Receiver>
  ├── receiver_                    (user's receiver)
  ├── backend_                     (shared_ptr to backend)
  ├── proxy_                       (concrete_receiver_proxy, member — not local)
  ├── storage_[256]                (pre-allocated buffer for backend use)
  └── start() → backend_->schedule(std::span(storage_), proxy_)

operator== → backend_->equal_to(*other.backend_)

Receiver Proxy Hierarchy

parallel_scheduler_receiver_proxy (abstract)
  ├── set_value() = 0
  ├── set_error(exception_ptr) = 0
  ├── set_stopped() = 0
  └── stop_requested() → bool = 0

parallel_scheduler_bulk_item_receiver_proxy : parallel_scheduler_receiver_proxy
  └── execute(begin, end) = 0     (for bulk operations)

concrete_receiver_proxy<Receiver> : parallel_scheduler_receiver_proxy
  └── implements all methods by forwarding to the actual receiver

concrete_bulk_proxy<F, IsChunked, Values...> : parallel_scheduler_bulk_item_receiver_proxy
  ├── execute(begin, end):
  │    if constexpr (IsChunked)
  │        f_(begin, end, ...values)     // chunked: call once per range
  │    else
  │        for i in [begin, end): f_(i, ...values)  // unchunked: call per index
  └── set_value() → forwards values to receiver

Execution Policies

The parallel scheduler supports four execution policies from the C++ standard library, each controlling different aspects of parallel execution:

1. Sequential Policy (std::execution::seq)

stdexec::schedule(sched)
  | stdexec::bulk(stdexec::seq, 1000, [](int i) { process(i); });

Behavior:

  • All iterations execute on a single worker thread, in index order.
  • No parallelism is introduced; useful as a baseline and for debugging.
  • seq is the only policy that clears the is_parallel flag in the domain transformation.

2. Parallel Policy (std::execution::par)

stdexec::schedule(sched)
  | stdexec::bulk(stdexec::par, 1000, [](int i) { process(i); });

Behavior:

  • Iterations may execute concurrently on multiple worker threads.
  • Within any one thread, individual iterations are not interleaved with each other.
  • Maps to is_parallel = true, is_unsequenced = false.

3. Parallel Unsequenced Policy (std::execution::par_unseq)

stdexec::schedule(sched)
  | stdexec::bulk(stdexec::par_unseq, 1000, [](int i) { process(i); });

Behavior:

  • Iterations may execute concurrently across threads and may be interleaved or vectorized within a thread.
  • Iteration code must not use blocking synchronization (e.g. locks) between iterations.
  • Maps to is_parallel = true, is_unsequenced = true.

4. Unsequenced Policy (std::execution::unseq)

stdexec::schedule(sched)
  | stdexec::bulk(stdexec::unseq, 1000, [](int i) { process(i); });

Behavior:

  • Iterations may be interleaved or vectorized, without requiring multiple threads.
  • Same synchronization restrictions on iteration code as par_unseq.
  • Maps to is_unsequenced = true (and, because unseq is not a sequenced policy, the transformation also sets is_parallel = true).

Policy Implementation Details

// In parallel_scheduler_domain::transform_sender()

// Extract policy from bulk operation
auto&& [tag, data, child] = sndr;
auto&& [pol, shape, f] = data;

// Determine execution mode at compile time
constexpr bool is_parallel = 
    !is_sequenced_policy_v<std::decay_t<decltype(pol.__get())>>;

constexpr bool is_unsequenced = 
    is_unsequenced_bulk_policy_v<std::decay_t<decltype(pol.__get())>>;

// Create thread_pool_bulk_sender with policy flags
using fast_sender_t = thread_pool_bulk_sender<
    hpx::launch, ChildSender, Shape, F,
    is_chunked, is_parallel, is_unsequenced>;

Task Scheduling Flow

Single Task Scheduling

When you call schedule() on the parallel scheduler:

auto sched = hpx::execution::experimental::get_parallel_scheduler();
auto work = stdexec::schedule(sched) 
  | stdexec::then([]() { return 42; });
stdexec::sync_wait(std::move(work));

Step-by-step execution:

  1. schedule() returns a sender<parallel_scheduler>
  2. connect(receiver) creates an operation_state<Receiver>:
    • Stores the receiver
    • Stores shared_ptr<backend>
    • Creates concrete_receiver_proxy wrapping the receiver
    • Allocates 256-byte storage buffer
  3. start() is called on the operation state:
    • Checks stop_token.stop_requested()
    • If stopped: calls set_stopped(receiver) and returns
    • Otherwise: calls backend_->schedule(std::span(storage_), proxy_)
  4. Backend schedules work:
    • Default HPX backend: submits to thread_pool_policy_scheduler
    • Work-stealing thread picks up the task
    • Executes the task
    • Calls proxy_.set_value() on completion
  5. Proxy forwards completion:
    • proxy_.set_value() → stdexec::set_value(receiver_)
    • Receiver processes the value
    • Chain continues with then() operation

Pre-Allocated Storage (Zero-Alloc Contract)

P2079R10 §4.2 requires a zero-allocation contract. The parallel scheduler provides a 256-byte pre-allocated buffer to the backend:

// In operation_state
alignas(parallel_scheduler_storage_alignment)
    std::byte storage_[parallel_scheduler_storage_size];  // 256 bytes

// Passed to backend
backend_->schedule(std::span<std::byte>(storage_), proxy_);

Benefits:

  • Zero heap allocations for backend operation states that fit in the buffer (the common case).
  • Deterministic, bounded memory usage per schedule() call.
  • Improved locality: the backend's state lives inside the operation_state itself.


Bulk Operations Deep Dive

Chunked vs Unchunked Execution

Chunked Mode (bulk_chunked):

// Chunked bulk operation
stdexec::schedule(sched)
  | stdexec::bulk_chunked(stdexec::par, 1000, 
      [](size_t begin, size_t end) {
          // Process range [begin, end)
          for (size_t i = begin; i < end; ++i) {
              process(i);
          }
      });

Unchunked Mode (bulk_unchunked):

// Unchunked bulk operation
stdexec::schedule(sched)
  | stdexec::bulk_unchunked(stdexec::par, 1000, 
      [](size_t i) {
          // Process single index i
          process(i);
      });

Chunk Size Strategies

HPX uses different chunking strategies based on the execution mode:

// Chunked mode: larger chunks for cache locality
constexpr std::size_t get_bulk_scheduler_chunk_size_chunked() {
    return 16;  // Process 16 elements per chunk
}

// Unchunked mode: fine-grained for load balancing
constexpr std::size_t get_bulk_scheduler_chunk_size_unchunked() {
    return 1;   // Process 1 element per chunk
}

Work-Stealing Execution

The fast path uses HPX's work-stealing scheduler with per-thread index queues:

Thread Pool (N worker threads)
  |
  |-- Thread 0: local_queue [0, 16, 32, ...]
  |-- Thread 1: local_queue [1, 17, 33, ...]
  |-- Thread 2: local_queue [2, 18, 34, ...]
  |-- ...
  |-- Thread N-1: local_queue [N-1, N+15, N+31, ...]
  |
  |-- Work-stealing: When a thread finishes its queue,
  |                   it steals from other threads' queues
  |
  |-- Main thread participation: The calling thread also
                                  participates in work execution

Domain Transformation Pipeline

The parallel_scheduler_domain intercepts bulk operations and transforms them to use HPX's optimized execution:

Transformation Steps

  1. Detect bulk operation:
    • Check if sender is bulk_chunked_t or bulk_unchunked_t
    • Verify it completes on parallel_scheduler
  2. Extract parameters:
    • Execution policy (seq, par, par_unseq, unseq)
    • Shape (number of iterations)
    • Function to execute
    • Child sender
  3. Determine execution mode:
    • is_chunked: from sender tag
    • is_parallel: from policy type
    • is_unsequenced: from policy type
  4. Check for fast path:
    • Query get_underlying_scheduler()
    • Query get_pu_mask()
    • If both return non-null: use fast path
    • Otherwise: use virtual dispatch path
  5. Create appropriate sender:
    • Fast path: thread_pool_bulk_sender
    • Virtual path: parallel_bulk_dispatch_sender with virtual data

Domain Transformation Code Flow

// parallel_scheduler_domain::transform_sender()

template <bulk_chunked_or_unchunked_sender Sender, typename Env>
auto transform_sender(set_value_t, Sender&& sndr, Env const& env) const {
    // 1. Extract bulk parameters
    auto&& [tag, data, child] = sndr;
    auto&& [pol, shape, f] = data;
    
    // 2. Get parallel_scheduler from environment
    auto par_sched = get_completion_scheduler<set_value_t>(get_env(child));
    
    // 3. Check for fast path availability
    auto const* underlying_ptr = par_sched.get_underlying_scheduler();
    auto const* pu_mask_ptr = par_sched.get_pu_mask();
    
    // 4. Determine execution modes
    constexpr bool is_chunked = !__sender_for<Sender, bulk_unchunked_t>;
    constexpr bool is_parallel = !is_sequenced_policy_v<...>;
    constexpr bool is_unsequenced = is_unsequenced_bulk_policy_v<...>;
    
    // 5. Fast path: create optimized thread_pool_bulk_sender
    if (underlying_ptr != nullptr && pu_mask_ptr != nullptr) {
        auto fast_sender = thread_pool_bulk_sender<
            hpx::launch, ChildSender, Shape, F,
            is_chunked, is_parallel, is_unsequenced>(
                *underlying_ptr, child, iota_shape, f, *pu_mask_ptr);
        return dispatch_sender{fast_path_data{fast_sender}};
    }
    
    // 6. Virtual path: route through backend virtual methods
    return dispatch_sender{virtual_path_data{
        par_sched.get_backend(), shape, f, child}};
}

Fast Path vs Virtual Path

Fast Path (Default HPX Backend)

When using the default hpx_parallel_scheduler_backend, bulk operations take the fast path through thread_pool_bulk_sender:

Fast Path Execution:
  1. Domain transformation creates thread_pool_bulk_sender
  2. connect() creates operation_state with:
     - Work-stealing index queues (one per thread)
     - NUMA-aware thread placement
     - Main thread participation
  3. start() distributes work across queues:
     - Thread 0 gets indices [0, chunk_size, 2*chunk_size, ...]
     - Thread 1 gets indices [1, chunk_size+1, 2*chunk_size+1, ...]
     - etc.
  4. Each thread processes its queue
  5. Work-stealing: idle threads steal from busy threads
  6. All threads complete → set_value(receiver)

Performance characteristics:
  ✓ Zero virtual dispatch overhead
  ✓ Optimized work distribution
  ✓ NUMA-aware execution
  ✓ Work-stealing load balancing
  ✓ Main thread participation

Virtual Path (Custom Backends)

When using a custom backend that doesn't provide get_underlying_scheduler(), bulk operations take the virtual path:

Virtual Path Execution:
  1. Domain transformation creates parallel_bulk_dispatch_sender
  2. connect() creates virtual_parallel_bulk_op (heap-allocated)
  3. Child sender connects to internal receiver
  4. start() on child sender
  5. Child completes with values
  6. Internal receiver creates concrete_bulk_proxy with:
     - Function f
     - Values from child
     - IsChunked flag
  7. Calls backend->schedule_bulk_chunked() or schedule_bulk_unchunked()
  8. Backend executes: proxy.execute(begin, end) for each chunk
  9. proxy.execute() calls:
     - If chunked: f(begin, end, ...values) once
     - If unchunked: f(i, ...values) for each i in [begin, end)
  10. Backend calls proxy.set_value() when done
  11. Proxy forwards to receiver

Performance characteristics:
  ✓ One heap allocation (virtual_parallel_bulk_op)
  ✓ Virtual dispatch overhead (negligible for bulk work)
  ✓ Custom backend controls execution strategy
  ✓ Flexible for different execution models

Type-Erased Dispatch Sender

To return a single type from transform_sender, we use a variant-based dispatch sender:

template <typename FastSender, typename ChildSender, 
          typename F, bool IsChunked>
struct parallel_bulk_dispatch_sender {
    struct fast_path_data { FastSender sender; };
    struct virtual_path_data {
        shared_ptr<parallel_scheduler_backend> backend;
        size_t count;
        F f;
        ChildSender child;
    };
    
    std::variant<fast_path_data, virtual_path_data> data_;
    
    // connect() checks which path and creates appropriate operation_state
    template <typename Receiver>
    auto connect(Receiver&& rcvr) {
        return std::visit([&](auto& d) {
            if constexpr (is_fast_path_data<decltype(d)>) {
                // Fast path: wrap thread_pool_bulk_sender op
                return dispatch_op{make_unique<fast_parallel_bulk_op>(
                    d.sender, rcvr)};
            } else {
                // Virtual path: create virtual_parallel_bulk_op
                return dispatch_op{make_unique<virtual_parallel_bulk_op>(
                    d.backend, d.count, d.f, d.child, rcvr)};
            }
        }, data_);
    }
};

Performance Comparison

Aspect             | Fast Path            | Virtual Path
-------------------|----------------------|-----------------------------
Heap Allocations   | 0 (stack-based)      | 1 (virtual_parallel_bulk_op)
Virtual Dispatch   | None                 | Per bulk operation
Work Distribution  | Optimized queues     | Backend-defined
Work-Stealing      | Yes                  | Backend-dependent
NUMA Awareness     | Yes                  | Backend-dependent
Use Case           | Default HPX backend  | Custom backends

Key Insight: For bulk operations processing thousands of elements, the overhead of one heap allocation and virtual dispatch is negligible compared to the actual work performed. This design provides flexibility for custom backends while maintaining optimal performance for the default HPX backend.


Replaceability API (P2079R10)

One of the key features of P2079R10 is the replaceability API, which allows applications to provide custom execution backends while maintaining a standard interface. This enables specialized implementations for different hardware, execution models, or performance requirements.

Backend Interface

The parallel_scheduler_backend is an abstract interface that defines the contract for custom backends:

class parallel_scheduler_backend {
public:
    virtual ~parallel_scheduler_backend() = default;

    // Schedule a single task
    virtual void schedule(
        std::span<std::byte> storage,
        parallel_scheduler_receiver_proxy& proxy) = 0;

    // Schedule bulk operations (chunked mode)
    virtual void schedule_bulk_chunked(
        std::span<std::byte> storage,
        std::size_t n,
        parallel_scheduler_bulk_item_receiver_proxy& proxy) = 0;

    // Schedule bulk operations (unchunked mode)
    virtual void schedule_bulk_unchunked(
        std::span<std::byte> storage,
        std::size_t n,
        parallel_scheduler_bulk_item_receiver_proxy& proxy) = 0;

    // Compare backends for equality
    virtual bool equal_to(const parallel_scheduler_backend& other) const = 0;

    // Optional: Fast path optimization
    virtual const thread_pool_policy_scheduler<hpx::launch>*
        get_underlying_scheduler() const { return nullptr; }

    // Optional: NUMA mask for thread affinity
    virtual const mask_type* get_pu_mask() const { return nullptr; }
};

Custom Backend Example

Here's how to implement a custom backend for specialized execution:

class my_custom_backend : public parallel_scheduler_backend {
    my_thread_pool pool_;

public:
    void schedule(
        std::span<std::byte> storage,
        parallel_scheduler_receiver_proxy& proxy) override
    {
        // Check if work was cancelled
        if (proxy.stop_requested()) {
            proxy.set_stopped();
            return;
        }

        // Submit work to custom thread pool
        pool_.submit([&proxy]() {
            try {
                // Do work...
                proxy.set_value();  // Signal completion
            } catch (...) {
                proxy.set_error(std::current_exception());
            }
        });
    }

    void schedule_bulk_chunked(
        std::span<std::byte> storage,
        std::size_t n,
        parallel_scheduler_bulk_item_receiver_proxy& proxy) override
    {
        // Divide work into chunks and distribute
        constexpr std::size_t chunk_size = 16;
        std::size_t num_chunks = (n + chunk_size - 1) / chunk_size;

        for (std::size_t i = 0; i < num_chunks; ++i) {
            std::size_t begin = i * chunk_size;
            std::size_t end = std::min(begin + chunk_size, n);

            pool_.submit([&proxy, begin, end]() {
                proxy.execute(begin, end);  // Process chunk
            });
        }

        // Wait for all chunks to complete
        pool_.wait_all();
        proxy.set_value();
    }

    void schedule_bulk_unchunked(
        std::span<std::byte> storage,
        std::size_t n,
        parallel_scheduler_bulk_item_receiver_proxy& proxy) override
    {
        // Fine-grained: each index is a separate task
        for (std::size_t i = 0; i < n; ++i) {
            pool_.submit([&proxy, i]() {
                proxy.execute(i, i + 1);  // Single index
            });
        }
        pool_.wait_all();
        proxy.set_value();
    }

    bool equal_to(const parallel_scheduler_backend& other) const override {
        return this == &other;  // Pointer equality
    }
};

Backend Factory and Replacement

P2079R10 defines a factory function that applications can replace to provide custom backends:

// Default factory function (returns HPX backend)
std::shared_ptr<parallel_scheduler_backend>
query_parallel_scheduler_backend() {
    static auto backend = 
        std::make_shared<hpx_parallel_scheduler_backend>();
    return backend;
}

// Application replaces the factory at startup
void set_parallel_scheduler_backend_factory(
    std::function<std::shared_ptr<parallel_scheduler_backend>()> factory);

// Usage: Install custom backend
int main() {
    // Replace backend before any parallel work
    set_parallel_scheduler_backend_factory([]() {
        return std::make_shared<my_custom_backend>();
    });

    // Now all parallel schedulers use the custom backend
    auto sched = hpx::execution::experimental::get_parallel_scheduler();
    // ... use sched with custom backend ...
}

Use Cases for Custom Backends

Custom backends are useful whenever work should run somewhere other than HPX's default thread pool, for example:

  • A dedicated or priority thread pool owned by the application.
  • Integration with another runtime's executor (a task-graph system, an accelerator offload layer, etc.).
  • Instrumentation backends that count, trace, or log scheduled tasks for testing and profiling.
  • A deterministic, single-threaded backend for reproducible unit tests.

Backend Sharing and Equality

The parallel_scheduler stores a shared_ptr<parallel_scheduler_backend>, enabling backend sharing across multiple scheduler instances:

// Multiple schedulers can share the same backend
auto sched1 = get_parallel_scheduler();
auto sched2 = get_parallel_scheduler();

// Equality check uses backend->equal_to()
if (sched1 == sched2) {
    // Same backend (default HPX implementation shares a singleton)
}

// Custom backends control equality semantics
class value_based_backend : public parallel_scheduler_backend {
    int thread_count_;
public:
    bool equal_to(const parallel_scheduler_backend& other) const override {
        auto* o = dynamic_cast<const value_based_backend*>(&other);
        return o && o->thread_count_ == thread_count_;
    }
};

Pre-Allocated Storage Contract

All backend methods receive a std::span<std::byte> storage buffer (256 bytes) that backends can use for small operation states without heap allocation:

void schedule(std::span<std::byte> storage,
              parallel_scheduler_receiver_proxy& proxy) override {
    // Option 1: Use storage for small state (zero allocation)
    if (sizeof(my_op_state) <= storage.size()) {
        auto* op = new (storage.data()) my_op_state{...};
        // ... use op ...
    }
    
    // Option 2: Heap allocate if state is too large
    else {
        auto op = std::make_unique<my_op_state>(...);
        // ... use op ...
    }
}

Key Benefits: The replaceability API provides flexibility without sacrificing performance. The default HPX backend delivers optimal performance through the fast path, while custom backends enable specialized execution strategies for different domains and hardware.


Summary

The HPX parallel scheduler implementation demonstrates how to build a high-performance, standards-compliant execution system by:

  • Reusing HPX's mature thread_pool_policy_scheduler rather than building a new execution backend.
  • Plugging into stdexec's domain-based sender transformation to intercept and optimize bulk operations.
  • Offering a zero-virtual-dispatch fast path for the default backend alongside a type-erased virtual path for custom ones.
  • Honoring P2079R10's zero-allocation contract through a pre-allocated 256-byte storage buffer.
  • Exposing a replaceability API so applications can install custom backends without losing the standard interface.

This architecture balances performance, flexibility, and standards compliance, making it suitable for production use while remaining extensible for future enhancements.