Skip to content

Commit 039c28a

Browse files
committed
fix(connection): Give pong its own channel
Pong replies to the broker used to share the outbound channel with all other connection traffic. When that channel was full under high load, pongs were discarded because try_send fails on a full bounded channel. Not only did this flood the log with errors, but if the broker did not receive the pong in time, it would kill the connection and the cycle would repeat. This commit gives pong its own dedicated bounded(1) channel so that it cannot be crowded out by other outbound traffic. The sink writer polls the pong channel ahead of the main channel via select_biased!, so a pending pong is always the next message written to the socket. Fixes: #408
1 parent cf67345 commit 039c28a

1 file changed

Lines changed: 22 additions & 10 deletions

File tree

src/connection.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl crate::authentication::Authentication for Authentication {
9393

9494
pub(crate) struct Receiver<S: Stream<Item = Result<Message, ConnectionError>>> {
9595
inbound: Pin<Box<S>>,
96-
outbound: async_channel::Sender<Message>,
96+
pong_tx: async_channel::Sender<Message>,
9797
error: SharedError,
9898
pending_requests: BTreeMap<RequestKey, oneshot::Sender<Message>>,
9999
consumers: BTreeMap<u64, mpsc::UnboundedSender<Message>>,
@@ -108,15 +108,15 @@ impl<S: Stream<Item = Result<Message, ConnectionError>>> Receiver<S> {
108108
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
109109
pub fn new(
110110
inbound: S,
111-
outbound: async_channel::Sender<Message>,
111+
pong_tx: async_channel::Sender<Message>,
112112
error: SharedError,
113113
registrations: mpsc::UnboundedReceiver<Register>,
114114
shutdown: oneshot::Receiver<()>,
115115
auth_challenge: mpsc::UnboundedSender<()>,
116116
) -> Receiver<S> {
117117
Receiver {
118118
inbound: Box::pin(inbound),
119-
outbound,
119+
pong_tx,
120120
error,
121121
pending_requests: BTreeMap::new(),
122122
received_messages: BTreeMap::new(),
@@ -182,8 +182,8 @@ impl<S: Stream<Item = Result<Message, ConnectionError>>> Future for Receiver<S>
182182
command: BaseCommand { ping: Some(_), .. },
183183
..
184184
} => {
185-
if let Err(e) = self.outbound.try_send(messages::pong()) {
186-
error!("failed to send pong: {}", e);
185+
if self.pong_tx.try_send(messages::pong()).is_err() {
186+
error!("failed to send pong: pong already pending, sink may be stalled");
187187
}
188188
}
189189
Message {
@@ -1242,6 +1242,7 @@ impl<Exe: Executor> Connection<Exe> {
12421242

12431243
let (mut sink, stream) = stream.split();
12441244
let (tx, rx) = async_channel::bounded(outbound_channel_size);
1245+
let (pong_tx, pong_rx) = async_channel::bounded(1);
12451246
let (registrations_tx, registrations_rx) = mpsc::unbounded();
12461247
let error = SharedError::new();
12471248
let (receiver_shutdown_tx, receiver_shutdown_rx) = oneshot::channel();
@@ -1251,7 +1252,7 @@ impl<Exe: Executor> Connection<Exe> {
12511252
.spawn(Box::pin(
12521253
Receiver::new(
12531254
stream,
1254-
tx.clone(),
1255+
pong_tx,
12551256
error.clone(),
12561257
registrations_rx,
12571258
receiver_shutdown_rx,
@@ -1267,8 +1268,19 @@ impl<Exe: Executor> Connection<Exe> {
12671268

12681269
let err = error.clone();
12691270
let res = executor.spawn(Box::pin(async move {
1270-
while let Ok(msg) = rx.recv().await {
1271-
// println!("real sent msg: {:?}", msg);
1271+
loop {
1272+
// Drain pong responses ahead of regular outbound messages so that
1273+
// broker keepalive pings are answered promptly even under high load.
1274+
let msg = futures::select_biased! {
1275+
msg = pong_rx.recv().fuse() => match msg {
1276+
Ok(msg) => msg,
1277+
Err(_) => break,
1278+
},
1279+
msg = rx.recv().fuse() => match msg {
1280+
Ok(msg) => msg,
1281+
Err(_) => break,
1282+
},
1283+
};
12721284
if let Err(e) = sink.send(msg).await {
12731285
err.set(e);
12741286
break;
@@ -1852,7 +1864,7 @@ mod tests {
18521864
))]
18531865
async fn receiver_auth_challenge_test() {
18541866
let (message_tx, message_rx) = mpsc::unbounded();
1855-
let (tx, _) = async_channel::bounded(10);
1867+
let (pong_tx, _pong_rx) = async_channel::bounded(1);
18561868
let (_registrations_tx, registrations_rx) = mpsc::unbounded();
18571869
let error = SharedError::new();
18581870
let (_receiver_shutdown_tx, receiver_shutdown_rx) = oneshot::channel();
@@ -1871,7 +1883,7 @@ mod tests {
18711883

18721884
tokio::spawn(Box::pin(Receiver::new(
18731885
message_rx,
1874-
tx,
1886+
pong_tx,
18751887
error.clone(),
18761888
registrations_rx,
18771889
receiver_shutdown_rx,

0 commit comments

Comments
 (0)