Testing Concurrency Invariants in a Parallel Executor

2/24/2026 · 3 min · rust
Rust · Concurrency · Atomics · Testing

TL;DR — In my previous post, I introduced bounded backpressure to my parallel DAG executor.
But concurrency guarantees cannot rely on intuition.

I needed a deterministic way to prove that max_in_flight is never violated — without relying on sleep().

Contents

  1. From Architecture to Invariants
  2. Why sleep() Is Not a Test
  3. Measuring Concurrency with Atomics
  4. Tracking the Maximum with CAS (Optimistic Concurrency)
  5. Forcing Overlap Without sleep()
  6. Implementation vs Measurement
  7. The Final Assertion
  8. The Failure Test
  9. Final Thoughts

1. From Architecture to Invariants

In the previous post on bounded parallelism (Backpressure in a Parallel Executor), I introduced per-worker bounded queues and a global max_in_flight.

That design guarantees bounded dispatch — in theory.

But concurrency guarantees are meaningless unless they are tested.

Invariant:

At no point should more than max_in_flight tasks run concurrently.

We now turn that architectural claim into a measurable property.

2. Why sleep() Is Not a Test

sleep() introduces non-determinism.

  • OS scheduling makes timing unreliable.
  • Overlap becomes probabilistic.
  • Tests become flaky.

We need a way to force concurrency deterministically.

3. Measuring Concurrency with Atomics

To observe concurrency, we need a shared counter.

AtomicUsize is Sync, meaning it is safe to access from multiple threads. To give multiple tasks shared ownership of the same counter, we wrap it in Arc.

let cur = active.fetch_add(1, Ordering::SeqCst) + 1;

fetch_add returns the previous value, so adding 1 gives the number of tasks running once this one has entered.

Later:

active.fetch_sub(1, Ordering::SeqCst);

This decrements it.

We use atomics instead of Mutex<usize> because:

  • The critical section is minimal
  • No blocking is required
  • Atomic instructions map directly to CPU primitives
  • We only need lock-free increments/decrements

4. Tracking the Maximum with CAS (Optimistic Concurrency)

To track the peak concurrency:

loop {
    let prev = max_active.load(Ordering::SeqCst);
    if cur <= prev { break; }

    if max_active
        .compare_exchange(prev, cur, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok()
    {
        break;
    }
}

This is an example of optimistic concurrency:

  • Assume no interference.
  • Attempt the update.
  • Retry only if another thread won the race.

The result is a lock-free max() operation.

The loop is necessary because multiple threads may attempt to update max_active simultaneously. If another thread updates it between our load and compare_exchange, we retry.

5. Forcing Overlap Without sleep()

To guarantee overlap, tasks block on a shared atomic “gate”:

while !release.load(Ordering::SeqCst) {
    std::hint::spin_loop();
}

The test thread flips release only after observing that every spawned task has entered the counted region.

This ensures that multiple tasks are blocked at the same synchronization point before being released together.

In effect, we create a deterministic barrier:

  1. Tasks increment the active counter.
  2. They wait at the gate.
  3. The test thread opens the gate.
  4. All blocked tasks proceed concurrently.

spin_loop() is a CPU hint used during busy-waiting. It does not block the thread; it simply optimizes tight polling loops.

Because this is a test and the wait window is intentionally short, busy waiting is acceptable here.

6. Implementation vs Measurement

The executor itself uses:

  • Message passing
  • Channels
  • Ownership transfer

It avoids shared mutable state.

The test, however, deliberately introduces:

  • Shared atomic counters
  • Direct measurement
  • Explicit synchronization gates

The implementation coordinates work. The test instruments it.

This separation keeps production code simple while allowing strong invariant verification.

7. The Final Assertion

assert!(max_active.load(Ordering::SeqCst) <= cfg.max_in_flight);

A design assumption becomes a measurable invariant.

8. The Failure Test

In addition, a second test (parallel_no_hang_on_task_failure_repeat) ensures:

  • No deadlock when a task fails
  • No executor hang under repeated task failures
  • No stuck workers

Running it repeatedly increases the chance of exposing race windows.

9. Final Thoughts

Parallel correctness is not about “it works”.

It is about:

  • Expressing invariants
  • Forcing adversarial scheduling
  • Measuring properties directly

Concurrency must be observable to be trustworthy.
