Sai Charan Arvapally
University of Alberta, Canada
HPX's parallel_scheduler is a thin domain-aware wrapper around the
existing
thread_pool_policy_scheduler<hpx::launch>. Rather than building an entirely new
execution backend, it reuses HPX's mature thread pool infrastructure and plugs into stdexec's
domain-based sender transformation pipeline.
get_parallel_scheduler()
|
v
parallel_scheduler (owns shared_ptr<parallel_scheduler_backend>)
|
|-- schedule() --> sender<parallel_scheduler>
| |-- env exposes: get_completion_scheduler<set_value_t>
| | get_domain -> parallel_scheduler_domain
| |-- connect(receiver) --> operation_state
| |-- start() --> checks stop_token
| delegates to backend->schedule(proxy)
|
|-- bulk / bulk_chunked / bulk_unchunked
|-- parallel_scheduler_domain::transform_sender()
|-- extracts underlying thread_pool_policy_scheduler (fast path)
| OR uses backend virtual dispatch (virtual path)
|
|-- Fast Path: creates thread_pool_bulk_sender<..., IsChunked, IsParallel, IsUnsequenced>
| |-- uses work-stealing index queues
| |-- NUMA-aware thread placement
| |-- main-thread participation
| |-- adaptive chunking strategies
|
|-- Virtual Path: creates virtual_parallel_bulk_op
|-- calls backend->schedule_bulk_chunked() or schedule_bulk_unchunked()
|-- type-erased through base_parallel_bulk_op
parallel_scheduler
├── shared_ptr<parallel_scheduler_backend> (type-erased, heap-allocated)
└── operator== → backend_->equal_to(*other.backend_)

parallel_scheduler_backend (abstract interface)
├── schedule(std::span<std::byte> storage, receiver_proxy&)
├── schedule_bulk_chunked(std::span<std::byte> storage, n, bulk_proxy&)
├── schedule_bulk_unchunked(std::span<std::byte> storage, n, bulk_proxy&)
├── equal_to(other&) → bool
├── get_underlying_scheduler() → const thread_pool_policy_scheduler*
└── get_pu_mask() → const mask_type*

hpx_parallel_scheduler_backend (concrete default implementation)
└── wraps thread_pool_policy_scheduler + mask_type
    └── implements all virtual methods using HPX thread pool

get_parallel_scheduler()
└── queries query_parallel_scheduler_backend()
    └── wraps the returned shared_ptr in a parallel_scheduler

operation_state<Receiver>
├── receiver_ (user's receiver)
├── backend_ (shared_ptr to backend)
├── proxy_ (concrete_receiver_proxy, member — not local)
├── storage_[256] (pre-allocated buffer for backend use)
└── start() → backend_->schedule(std::span(storage_), proxy_)
parallel_scheduler_receiver_proxy (abstract)
├── set_value() = 0
├── set_error(exception_ptr) = 0
├── set_stopped() = 0
└── stop_requested() → bool = 0

parallel_scheduler_bulk_item_receiver_proxy : parallel_scheduler_receiver_proxy
└── execute(begin, end) = 0   (for bulk operations)

concrete_receiver_proxy<Receiver> : parallel_scheduler_receiver_proxy
└── implements all methods by forwarding to the actual receiver

concrete_bulk_proxy<F, IsChunked, Values...> : parallel_scheduler_bulk_item_receiver_proxy
├── execute(begin, end):
│       if constexpr (IsChunked)
│           f_(begin, end, ...values)                 // chunked: call once per range
│       else
│           for i in [begin, end): f_(i, ...values)   // unchunked: call per index
└── set_value() → forwards values to receiver
The parallel scheduler supports four execution policies from the C++ standard library, each controlling different aspects of parallel execution:
Sequenced (std::execution::seq):

stdexec::schedule(sched)
    | stdexec::bulk(stdexec::seq, 1000, [](int i) { process(i); });

Behavior: is_parallel = false

Parallel (std::execution::par):

stdexec::schedule(sched)
    | stdexec::bulk(stdexec::par, 1000, [](int i) { process(i); });

Behavior: is_parallel = true, is_unsequenced = false

Parallel Unsequenced (std::execution::par_unseq):

stdexec::schedule(sched)
    | stdexec::bulk(stdexec::par_unseq, 1000, [](int i) { process(i); });

Behavior: is_parallel = true, is_unsequenced = true

Unsequenced (std::execution::unseq):

stdexec::schedule(sched)
    | stdexec::bulk(stdexec::unseq, 1000, [](int i) { process(i); });

Behavior: is_parallel = false, is_unsequenced = true

These flags are derived from the policy at compile time during domain transformation:

// In parallel_scheduler_domain::transform_sender()
// Extract policy from bulk operation
auto&& [tag, data, child] = sndr;
auto&& [pol, shape, f] = data;

// Determine execution mode at compile time
constexpr bool is_parallel =
    !is_sequenced_policy_v<std::decay_t<decltype(pol.__get())>>;
constexpr bool is_unsequenced =
    is_unsequenced_bulk_policy_v<std::decay_t<decltype(pol.__get())>>;

// Create thread_pool_bulk_sender with policy flags
using fast_sender_t = thread_pool_bulk_sender<
    hpx::launch, ChildSender, Shape, F,
    is_chunked, is_parallel, is_unsequenced>;
When you call schedule() on the parallel scheduler:
auto sched = hpx::execution::experimental::get_parallel_scheduler();
auto work = stdexec::schedule(sched)
    | stdexec::then([]() { return 42; });
stdexec::sync_wait(std::move(work));
Step-by-step execution:
1. schedule() returns a sender<parallel_scheduler>.
2. connect() creates an operation_state<Receiver> that holds:
   - the shared_ptr<backend>
   - a concrete_receiver_proxy wrapping the receiver
3. start() checks stop_token.stop_requested(); if stop was requested, it calls
   set_stopped(receiver) and returns.
4. Otherwise it calls backend_->schedule(std::span(storage_), proxy_).
5. The backend runs the task on the thread_pool_policy_scheduler and calls
   proxy_.set_value() on completion.
6. proxy_.set_value() forwards to stdexec::set_value(receiver_), which resumes
   the then() operation.

P2079R10 §4.2 requires a zero-allocation contract. The parallel scheduler provides a 256-byte pre-allocated buffer to the backend:
// In operation_state
alignas(parallel_scheduler_storage_alignment)
    std::byte storage_[parallel_scheduler_storage_size];    // 256 bytes

// Passed to backend
backend_->schedule(std::span<std::byte>(storage_), proxy_);
Benefits:
Chunked Mode (bulk_chunked):
The function is called as f(begin, end, ...values), once per chunk:

// Chunked bulk operation
stdexec::schedule(sched)
    | stdexec::bulk_chunked(stdexec::par, 1000,
          [](size_t begin, size_t end) {
              // Process range [begin, end)
              for (size_t i = begin; i < end; ++i) {
                  process(i);
              }
          });
Unchunked Mode (bulk_unchunked):
The function is called as f(index, ...values), once per index:

// Unchunked bulk operation
stdexec::schedule(sched)
    | stdexec::bulk_unchunked(stdexec::par, 1000,
          [](size_t i) {
              // Process single index i
              process(i);
          });
HPX uses different chunking strategies based on the execution mode:
// Chunked mode: larger chunks for cache locality
constexpr std::size_t get_bulk_scheduler_chunk_size_chunked()
{
    return 16;    // Process 16 elements per chunk
}

// Unchunked mode: fine-grained for load balancing
constexpr std::size_t get_bulk_scheduler_chunk_size_unchunked()
{
    return 1;    // Process 1 element per chunk
}
The fast path uses HPX's work-stealing scheduler with per-thread index queues:
Thread Pool (N worker threads)
|
|-- Thread 0: local_queue [0, 16, 32, ...]
|-- Thread 1: local_queue [1, 17, 33, ...]
|-- Thread 2: local_queue [2, 18, 34, ...]
|-- ...
|-- Thread N-1: local_queue [N-1, N+15, N+31, ...]
|
|-- Work-stealing: When a thread finishes its queue,
| it steals from other threads' queues
|
|-- Main thread participation: The calling thread also
participates in work execution
The parallel_scheduler_domain intercepts bulk operations and transforms them
to use HPX's optimized execution:
1. Matches senders tagged bulk_chunked_t or bulk_unchunked_t whose completion
   scheduler is a parallel_scheduler.
2. Extracts the execution policy (seq, par, par_unseq, unseq).
3. Determines three compile-time flags:
   - is_chunked: from the sender tag
   - is_parallel: from the policy type
   - is_unsequenced: from the policy type
4. Queries the backend for get_underlying_scheduler() and get_pu_mask().
5. If both are available, creates the optimized thread_pool_bulk_sender;
   otherwise creates a parallel_bulk_dispatch_sender with virtual data.

// parallel_scheduler_domain::transform_sender()
template <bulk_chunked_or_unchunked_sender Sender, typename Env>
auto transform_sender(set_value_t, Sender&& sndr, Env const& env) const
{
    // 1. Extract bulk parameters
    auto&& [tag, data, child] = sndr;
    auto&& [pol, shape, f] = data;

    // 2. Get parallel_scheduler from environment
    auto par_sched = get_completion_scheduler<set_value_t>(get_env(child));

    // 3. Check for fast path availability
    auto const* underlying_ptr = par_sched.get_underlying_scheduler();
    auto const* pu_mask_ptr = par_sched.get_pu_mask();

    // 4. Determine execution modes
    constexpr bool is_chunked = !__sender_for<Sender, bulk_unchunked_t>;
    constexpr bool is_parallel = !is_sequenced_policy_v<...>;
    constexpr bool is_unsequenced = is_unsequenced_bulk_policy_v<...>;

    // 5. Fast path: create optimized thread_pool_bulk_sender
    if (underlying_ptr != nullptr && pu_mask_ptr != nullptr)
    {
        auto fast_sender = thread_pool_bulk_sender<
            hpx::launch, ChildSender, Shape, F,
            is_chunked, is_parallel, is_unsequenced>(
            *underlying_ptr, child, iota_shape, f, *pu_mask_ptr);
        return dispatch_sender{fast_path_data{fast_sender}};
    }

    // 6. Virtual path: route through backend virtual methods
    return dispatch_sender{virtual_path_data{
        par_sched.get_backend(), shape, f, child}};
}
When using the default hpx_parallel_scheduler_backend, bulk operations take the
fast path through thread_pool_bulk_sender:
Fast Path Execution:
1. Domain transformation creates thread_pool_bulk_sender
2. connect() creates operation_state with:
- Work-stealing index queues (one per thread)
- NUMA-aware thread placement
- Main thread participation
3. start() distributes work across queues:
- Thread 0 gets indices [0, chunk_size, 2*chunk_size, ...]
- Thread 1 gets indices [1, chunk_size+1, 2*chunk_size+1, ...]
- etc.
4. Each thread processes its queue
5. Work-stealing: idle threads steal from busy threads
6. All threads complete → set_value(receiver)
Performance characteristics:
✓ Zero virtual dispatch overhead
✓ Optimized work distribution
✓ NUMA-aware execution
✓ Work-stealing load balancing
✓ Main thread participation
When using a custom backend that doesn't provide get_underlying_scheduler(),
bulk operations take the virtual path:
Virtual Path Execution:
1. Domain transformation creates parallel_bulk_dispatch_sender
2. connect() creates virtual_parallel_bulk_op (heap-allocated)
3. Child sender connects to internal receiver
4. start() on child sender
5. Child completes with values
6. Internal receiver creates concrete_bulk_proxy with:
- Function f
- Values from child
- IsChunked flag
7. Calls backend->schedule_bulk_chunked() or schedule_bulk_unchunked()
8. Backend executes: proxy.execute(begin, end) for each chunk
9. proxy.execute() calls:
- If chunked: f(begin, end, ...values) once
- If unchunked: f(i, ...values) for each i in [begin, end)
10. Backend calls proxy.set_value() when done
11. Proxy forwards to receiver
Performance characteristics:
✓ One heap allocation (virtual_parallel_bulk_op)
✓ Virtual dispatch overhead (negligible for bulk work)
✓ Custom backend controls execution strategy
✓ Flexible for different execution models
To return a single type from transform_sender, we use a variant-based dispatch sender:
template <typename FastSender, typename ChildSender, typename F, bool IsChunked>
struct parallel_bulk_dispatch_sender
{
    struct fast_path_data
    {
        FastSender sender;
    };

    struct virtual_path_data
    {
        shared_ptr<parallel_scheduler_backend> backend;
        size_t count;
        F f;
        ChildSender child;
    };

    std::variant<fast_path_data, virtual_path_data> data_;

    // connect() checks which path and creates appropriate operation_state
    template <typename Receiver>
    auto connect(Receiver&& rcvr)
    {
        return std::visit(
            [&](auto& d) {
                if constexpr (is_fast_path_data<decltype(d)>)
                {
                    // Fast path: wrap thread_pool_bulk_sender op
                    return dispatch_op{make_unique<fast_parallel_bulk_op>(
                        d.sender, rcvr)};
                }
                else
                {
                    // Virtual path: create virtual_parallel_bulk_op
                    return dispatch_op{make_unique<virtual_parallel_bulk_op>(
                        d.backend, d.count, d.f, d.child, rcvr)};
                }
            },
            data_);
    }
};
| Aspect | Fast Path | Virtual Path |
|---|---|---|
| Heap Allocations | 0 (stack-based) | 1 (virtual_parallel_bulk_op) |
| Virtual Dispatch | None | Per bulk operation |
| Work Distribution | Optimized queues | Backend-defined |
| Work-Stealing | Yes | Backend-dependent |
| NUMA Awareness | Yes | Backend-dependent |
| Use Case | Default HPX backend | Custom backends |
Key Insight: For bulk operations processing thousands of elements, the overhead of one heap allocation and virtual dispatch is negligible compared to the actual work performed. This design provides flexibility for custom backends while maintaining optimal performance for the default HPX backend.
One of the key features of P2079R10 is the replaceability API, which allows applications to provide custom execution backends while maintaining a standard interface. This enables specialized implementations for different hardware, execution models, or performance requirements.
The parallel_scheduler_backend is an abstract interface that defines the contract
for custom backends:
class parallel_scheduler_backend
{
public:
    virtual ~parallel_scheduler_backend() = default;

    // Schedule a single task
    virtual void schedule(
        std::span<std::byte> storage,
        parallel_scheduler_receiver_proxy& proxy) = 0;

    // Schedule bulk operations (chunked mode)
    virtual void schedule_bulk_chunked(
        std::span<std::byte> storage, std::size_t n,
        parallel_scheduler_bulk_item_receiver_proxy& proxy) = 0;

    // Schedule bulk operations (unchunked mode)
    virtual void schedule_bulk_unchunked(
        std::span<std::byte> storage, std::size_t n,
        parallel_scheduler_bulk_item_receiver_proxy& proxy) = 0;

    // Compare backends for equality
    virtual bool equal_to(const parallel_scheduler_backend& other) const = 0;

    // Optional: Fast path optimization
    virtual const thread_pool_policy_scheduler<hpx::launch>*
    get_underlying_scheduler() const
    {
        return nullptr;
    }

    // Optional: NUMA mask for thread affinity
    virtual const mask_type* get_pu_mask() const
    {
        return nullptr;
    }
};
Here's how to implement a custom backend for specialized execution:
class my_custom_backend : public parallel_scheduler_backend
{
    my_thread_pool pool_;

public:
    void schedule(
        std::span<std::byte> storage,
        parallel_scheduler_receiver_proxy& proxy) override
    {
        // Check if work was cancelled
        if (proxy.stop_requested())
        {
            proxy.set_stopped();
            return;
        }

        // Submit work to custom thread pool
        pool_.submit([&proxy]() {
            try
            {
                // Do work...
                proxy.set_value();    // Signal completion
            }
            catch (...)
            {
                proxy.set_error(std::current_exception());
            }
        });
    }

    void schedule_bulk_chunked(
        std::span<std::byte> storage, std::size_t n,
        parallel_scheduler_bulk_item_receiver_proxy& proxy) override
    {
        // Divide work into chunks and distribute
        constexpr std::size_t chunk_size = 16;
        std::size_t num_chunks = (n + chunk_size - 1) / chunk_size;

        for (std::size_t i = 0; i < num_chunks; ++i)
        {
            std::size_t begin = i * chunk_size;
            std::size_t end = std::min(begin + chunk_size, n);
            pool_.submit([&proxy, begin, end]() {
                proxy.execute(begin, end);    // Process chunk
            });
        }

        // Wait for all chunks to complete
        pool_.wait_all();
        proxy.set_value();
    }

    void schedule_bulk_unchunked(
        std::span<std::byte> storage, std::size_t n,
        parallel_scheduler_bulk_item_receiver_proxy& proxy) override
    {
        // Fine-grained: each index is a separate task
        for (std::size_t i = 0; i < n; ++i)
        {
            pool_.submit([&proxy, i]() {
                proxy.execute(i, i + 1);    // Single index
            });
        }
        pool_.wait_all();
        proxy.set_value();
    }

    bool equal_to(const parallel_scheduler_backend& other) const override
    {
        return this == &other;    // Pointer equality
    }
};
P2079R10 defines a factory function that applications can replace to provide custom backends:
// Default factory function (returns HPX backend)
std::shared_ptr<parallel_scheduler_backend> query_parallel_scheduler_backend()
{
    static auto backend = std::make_shared<hpx_parallel_scheduler_backend>();
    return backend;
}

// Application replaces the factory at startup
void set_parallel_scheduler_backend_factory(
    std::function<std::shared_ptr<parallel_scheduler_backend>()> factory);

// Usage: Install custom backend
int main()
{
    // Replace backend before any parallel work
    set_parallel_scheduler_backend_factory([]() {
        return std::make_shared<my_custom_backend>();
    });

    // Now all parallel schedulers use the custom backend
    auto sched = hpx::execution::experimental::get_parallel_scheduler();
    // ... use sched with custom backend ...
}
The parallel_scheduler stores a shared_ptr<parallel_scheduler_backend>,
enabling backend sharing across multiple scheduler instances:
// Multiple schedulers can share the same backend
auto sched1 = get_parallel_scheduler();
auto sched2 = get_parallel_scheduler();

// Equality check uses backend->equal_to()
if (sched1 == sched2)
{
    // Same backend (default HPX implementation shares a singleton)
}

// Custom backends control equality semantics
class value_based_backend : public parallel_scheduler_backend
{
    int thread_count_;

public:
    bool equal_to(const parallel_scheduler_backend& other) const override
    {
        auto* o = dynamic_cast<const value_based_backend*>(&other);
        return o && o->thread_count_ == thread_count_;
    }
};
All backend methods receive a std::span<std::byte> storage buffer (256 bytes)
that backends can use for small operation states without heap allocation:
void schedule(std::span<std::byte> storage,
    parallel_scheduler_receiver_proxy& proxy) override
{
    // Option 1: Use storage for small state (zero allocation)
    if (sizeof(my_op_state) <= storage.size())
    {
        auto* op = new (storage.data()) my_op_state{...};
        // ... use op ...
    }
    // Option 2: Heap allocate if state is too large
    else
    {
        auto op = std::make_unique<my_op_state>(...);
        // ... use op ...
    }
}
Key Benefits: The replaceability API provides flexibility without sacrificing performance. The default HPX backend delivers optimal performance through the fast path, while custom backends enable specialized execution strategies for different domains and hardware.
The HPX parallel scheduler implementation demonstrates how to build a high-performance, standards-compliant execution system by:

- reusing HPX's mature thread_pool_policy_scheduler instead of building a new execution backend
- plugging into stdexec's domain-based sender transformation pipeline
- pairing a zero-virtual-dispatch fast path with a flexible type-erased virtual path
- honoring P2079R10's zero-allocation contract and replaceability API
This architecture balances performance, flexibility, and standards compliance, making it suitable for production use while remaining extensible for future enhancements.