Skip to content

Support drain into blocking#61

Open
acking-you wants to merge 3 commits intofereidani:mainfrom
acking-you:support_drain_into_blocking
Open

Support drain into blocking#61
acking-you wants to merge 3 commits intofereidani:mainfrom
acking-you:support_drain_into_blocking

Conversation

@acking-you
Copy link
Copy Markdown

Closes #60

Summary

Add drain_into_blocking method for both sync and async receivers. This method drains all available messages into a vector, waiting until at least one message is received (unless the channel is closed).

API

// Sync
impl<T> Receiver<T> {
    pub fn drain_into_blocking(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError>;
}

// Async
impl<T> AsyncReceiver<T> {
    pub fn drain_into_blocking(&self, vec: &mut Vec<T>) -> DrainIntoBlockingFuture<T>;
}

Note: For the async version, "blocking" refers to the semantic behavior (waiting for data), not thread blocking. The method is fully async and yields to the runtime.

Implementation

Why drain_into uses shared macro but drain_into_blocking doesn't

drain_into is non-blocking - it only retrieves currently available data and returns immediately. No waiting mechanism needed, so sync/async receivers share the same code via shared_recv_impl!() macro.

drain_into_blocking needs to wait when no data is available. Sync and async waiting mechanisms are fundamentally different:

Aspect Sync Async
Signal SyncSignal AsyncSignal
Wait thread::park() Poll::Pending + Waker
Wake thread::unpark() waker.wake()

This is the same pattern as recv() vs ReceiveFuture.

Core Logic

  1. Try drain all buffered messages (same as drain_into)
  2. If got data → return immediately
  3. If no data:
    • Check if channel closed → return error
    • Register signal to wait_list
    • Wait for sender to wake us
    • Return 1 (the received message)

Files Changed

  • src/lib.rs: Add Receiver::drain_into_blocking + AsyncReceiver::drain_into_blocking with doctest examples
  • src/future.rs: Add DrainIntoBlockingFuture with Future and Drop impl
  • tests/sync_test.rs: Add tests (basic, wait behavior, MPSC)
  • tests/async_test.rs: Add tests (basic, wait behavior, MPSC)

@acking-you
Copy link
Copy Markdown
Author

@fereidani Would appreciate a review when you have time. Thanks!

@acking-you
Copy link
Copy Markdown
Author

I've applied the implementation of this API to a locally-used geo-location-based smart proxy channel API, which is utilized by around 100 people (as described in the issue). A background task handles batch requests to the geo API to avoid triggering rate limits on IP-based geo APIs caused by multi-threaded access. This is an MPSC scenario, and the solution has worked very well—running for over a week now without any error!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature Request: recv_many() for Batch Message Receiving

1 participant