fix(connection): Give pong its own channel #412

Open
darinspivey wants to merge 3 commits into streamnative:master from mezmo:issue_408

Conversation

@darinspivey
Contributor

The pong replies to the broker used to share the outbound channel with all other connection traffic. When the channel is full under high load, pongs were being discarded due to try_send on a bounded channel that was full. Not only does this flood the log with errors, but if the broker does not receive the pong in time, it will kill the connection and the cycle will repeat.

This commit gives pong its own dedicated bounded(1) channel so that it cannot be crowded out by other outbound traffic. The sink writer drains the pong channel ahead of the main channel via select_biased!, ensuring pong responses are flushed to the socket as soon as possible.

Fixes: #408
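For illustration, a minimal sketch of the writer loop this describes, assuming async_channel receivers and a futures-compatible sink (the function name and signature here are illustrative, not the crate's actual code):

use async_channel::Receiver;
use futures::{select_biased, FutureExt, Sink, SinkExt};

async fn write_loop<S, M>(mut sink: S, pong_rx: Receiver<M>, rx: Receiver<M>)
where
    S: Sink<M> + Unpin,
{
    loop {
        // select_biased! polls its arms in order, so a pending pong
        // always wins over regular outbound traffic.
        let msg = select_biased! {
            msg = pong_rx.recv().fuse() => match msg {
                Ok(msg) => msg,
                Err(_) => break, // pong channel closed: connection tearing down
            },
            msg = rx.recv().fuse() => match msg {
                Ok(msg) => msg,
                Err(_) => break, // main outbound channel closed
            },
        };
        if sink.send(msg).await.is_err() {
            break; // socket write failed; connection teardown handles the rest
        }
    }
}

The bounded(1) capacity matters: at most one pong can ever be queued, so a second broker ping arriving while one is pending only means the first flush is still in flight.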


Copilot AI left a comment


Pull request overview

This PR fixes connection keepalive instability under high outbound load by ensuring broker ping requests can always be answered with a pong, independent of normal outbound traffic congestion (Fixes #408).

Changes:

  • Introduces a dedicated bounded(1) async_channel for pong responses instead of sharing the main outbound channel.
  • Updates the sink-writer task to prioritize draining the pong channel via select_biased!.
  • Adjusts Receiver and its test wiring to use the new pong sender.


Comment thread src/connection.rs
Comment on lines +1276 to +1280
let msg = futures::select_biased! {
msg = pong_rx.recv().fuse() => match msg {
Ok(msg) => msg,
Err(_) => break,
},
Contributor Author


This doesn't feel like it's worth changing. The Receiver only drops when the inbound stream is done, the shutdown signal fires, or a stream error occurs--in all cases the connection is being torn down and error is set, so cascading the sink shutdown is correct behavior. Keeping a pong_tx clone alive in the sink to "keep draining rx" would just delay the inevitable while pretending the connection is still healthy.

Comment thread src/connection.rs Outdated
Comment on lines +186 to +187
if self.pong_tx.try_send(messages::pong()).is_err() {
error!("failed to send pong: pong already pending, sink may be stalled");
Contributor Author


Sure, will fix.

Comment thread src/connection.rs
Comment on lines 1246 to 1250
let (tx, rx) = async_channel::bounded(outbound_channel_size);
let (pong_tx, pong_rx) = async_channel::bounded(1);
let (registrations_tx, registrations_rx) = mpsc::unbounded();
let error = SharedError::new();
let (receiver_shutdown_tx, receiver_shutdown_rx) = oneshot::channel();
Contributor Author


Thanks for the suggestion. I added a focused Receiver-level test (receiver_routes_ping_to_pong_channel) that asserts pings get routed to the dedicated pong_tx channel rather than the shared outbound. This guards against the most realistic regression — someone reverting the Receiver back to the shared tx, which would re-introduce #408.

I deliberately didn't add a test for the sink-writer's select_biased! priority itself. That code lives inline inside Connection::new() and isn't unit-testable without either (a) extracting the sink loop into a free function purely for testability, or (b) a TCP-loopback integration test that fills the wire and asserts pong ordering. Both felt like a more invasive change than the bot's ask warranted. select_biased! has well-known, library-level semantics; the block is short and unlikely to be silently broken without the channel separation also being undone; and the channel-separation test catches the realistic failure mode.

Happy to do the refactor if you'd prefer the stronger guarantee.
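For reference, here's a minimal self-contained analogue of the routing rule that test asserts. The Frame enum and route closure are illustrative stand-ins, not the crate's real Message type or Receiver wiring:

use async_channel::bounded;

#[derive(Debug, PartialEq)]
enum Frame { Ping, Pong, Data(u32) }

#[tokio::test]
async fn receiver_routes_ping_to_pong_channel() {
    let (tx, rx) = bounded::<Frame>(100);         // shared outbound channel
    let (pong_tx, pong_rx) = bounded::<Frame>(1); // dedicated pong channel

    // Stand-in for the Receiver's routing decision on one inbound frame.
    let route = |frame: Frame| match frame {
        Frame::Ping => pong_tx.try_send(Frame::Pong).unwrap(),
        other => tx.try_send(other).unwrap(),
    };

    route(Frame::Ping);

    // The pong must land on the dedicated channel...
    assert_eq!(pong_rx.try_recv().unwrap(), Frame::Pong);
    // ...and must not have gone through the shared outbound channel.
    assert!(rx.try_recv().is_err());
}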

When the pong tries to send, the channel may be closed. Make sure to use
an error message that makes sense for all conditions.
Adds a focused regression test for the Receiver: when a ping arrives on
the inbound stream, the pong response must land on `pong_tx` (the
dedicated bounded(1) channel) rather than the shared outbound channel.

This is the structural half of the streamnative#408 fix. If anyone reverts the
Receiver back to using the shared outbound `tx`, this test goes red.
Contributor

@BewareMyPower left a comment


When the channel is full under high load, pongs were being discarded due to try_send on a bounded channel that was full.

This behavior should be expected. Other clients also send Ping/Pong on the same channel as other requests. https://github.com/apache/pulsar/blob/cd0ab9d6ad33f9cde9bcf56177a3f6f9deb9f510/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java#L93

IMO, the ping/pong RPC is not only used for simple connectivity detection; it actually detects whether the connection can process requests in time.

If you have encountered this issue in production, maybe you'd better investigate which other commands have blocked the I/O thread for too long.

@darinspivey
Contributor Author

Thanks for pushing back — I want to make sure I'm not papering over a real "your client is unhealthy" signal.

The thing that nags me is that pong is the only outbound message that silently discards under backpressure. After #319, every other control path either blocks (tx.send(msg).await at lines 363, 703, 1311) or surfaces SlowDown to the caller (line 711). Pong's try_send + drop pattern looks like an oversight from the bounded migration in #312 rather than an intentional health signal: if it were intentional, the client-side ping at line 363 would behave the same way, and it doesn't.

The Netty comparison is also a bit different in shape: PulsarHandler writes ping/pong onto the same pipeline as application traffic, with no bounded application queue, so "can't write" really does mean "I/O is stuck." With tokio's bounded(100), a full channel just means producer fan-in is outpacing broker acks--TCP backpressure is regulating upstream, and the I/O task itself is fine. We saw that play out in production: sustained publish load → channel full → pong dropped → broker disconnects healthy clients → reconnect, repeat. Meanwhile the brokers were stable but busy--certainly not underprovisioned.

I considered just switching pong to tx.send().await to match the others, but pong_tx.try_send is called from inside Receiver::poll_next (line 185), which is the only task draining the inbound stream. Blocking it on a full outbound channel stalls SendReceipt and consumer-message routing, which feeds back into more outbound pressure. The dedicated bounded(1) keeps pong guaranteed-immediate without coupling it to the producer queue.

If the worry is losing the diagnostic signal, I'm happy to keep an error! on Closed (already added) and surface Full as a metric so a falling-behind caller is still visible--just without the disconnect cascade. Open to other shapes if you'd prefer.
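A sketch of that error-handling shape, assuming async_channel's TrySendError variants (the send_pong helper is illustrative, not the crate's actual code):

use async_channel::{Sender, TrySendError};

// Non-blocking pong send: never stalls the task draining the inbound stream.
fn send_pong<T>(pong_tx: &Sender<T>, pong: T) {
    match pong_tx.try_send(pong) {
        Ok(()) => {}
        Err(TrySendError::Full(_)) => {
            // bounded(1) already holds an unflushed pong; a second one would
            // be redundant, so just surface the stall (log and/or a metric).
            log::warn!("pong already pending, sink may be stalled");
        }
        Err(TrySendError::Closed(_)) => {
            // The channel is closed, so the connection is being torn down.
            log::error!("failed to send pong: channel closed");
        }
    }
}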

@BewareMyPower
Contributor

Oh, it makes sense. I just asked an LLM to write a mermaid graph:

flowchart TD
    subgraph App["Application tasks"]
        P1["Producer::send_non_blocking / send"]
        K1["Keepalive task: send_ping()"]
    end

    subgraph CS["ConnectionSender"]
        S1["send_message_non_blocking()"]
        S2["send_ping()"]
    end

    subgraph Reg["registrations_tx (unbounded mpsc)"]
        R1["Register::Request { key, resolver }"]
        R2["Register::Ping { resolver }"]
    end

    subgraph Out["outbound tx (bounded async_channel)"]
        O1["Produce/BaseCommand::Send"]
        O2["Ping"]
        O3["Pong"]
        O4["Other outbound RPCs"]
    end

    subgraph Conn["Connection tasks"]
        RX["Receiver future\nreads broker frames\nand manages pending_requests"]
        TX["Socket writer loop\nwhile let Ok(msg)=rx.recv()\n  sink.send(msg).await"]
    end

    subgraph Broker["Broker socket"]
        B1["TCP / framed sink+stream"]
    end

    P1 --> S1
    K1 --> S2

    S1 --> R1
    S1 -->|tx.try_send or tx.send| O1

    S2 --> R2
    S2 -->|tx.send| O2

    R1 --> RX
    R2 --> RX

    O1 --> TX
    O2 --> TX
    O3 --> TX
    O4 --> TX

    TX --> B1
    B1 --> RX

    RX -->|inbound Ping from broker| O3

Actually, I think #312 mixed up the backpressure on send requests with the backpressure on socket writes.

We should only use the bounded channel for Send commands:

let (tx, rx) = async_channel::bounded(outbound_channel_size);

We should use a dedicated unbounded channel for other commands, including Pong.
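The setup would look something like this (data_tx is illustrative; the control_rx/data_rx names match the loop below):

// Send commands keep the bounded channel: this is the producer
// backpressure knob.
let (data_tx, data_rx) = async_channel::bounded(outbound_channel_size);

// Everything else (Pong, Ping, auth_challenge, acks, ...) goes on an
// unbounded control channel that data traffic can never crowd out.
let (control_tx, control_rx) = async_channel::unbounded();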

loop {
    let msg = futures::select_biased! {
        msg = control_rx.recv().fuse() => msg?, // other commands are sent via control_tx
        msg = data_rx.recv().fuse() => msg?,    // Send commands are sent via the bounded data_tx
    };

    sink.send(msg).await?;
}

WDYT?

@darinspivey
Contributor Author

Yeah, this is the right shape--splitting the data plane from the control plane handles pong as a special case of the general rule and gets rid of the latent stall risk on auth_challenge and friends. Happy to take this on.

Quick sanity-check on classification before I start cutting:

  • Pong, client-side ping (line 363), auth_challenge response (line 1311), Ack / Flow / ... / their Close variants -> all control_tx. Anything I'm missing or that you'd put on the data side?
  • block_if_queue_full / SlowDown--I'd keep that for the Send path (it's the actual producer backpressure knob now) and drop it everywhere else since control is unbounded. Sound right?

Also, given the scope is bigger than the original PR, do you want me to push it onto this branch, or close #412 and open a fresh one against master with a more accurate title?

@BewareMyPower
Contributor

Yes, both are right.

You can open a new PR or just edit this PR's title and description. Both are okay since the main reviewer (me) has all context.



Development

Successfully merging this pull request may close these issues.

Pong responses discarded under high load, causing broker to kill connections
