Skip to content

Commit d516746

Browse files
committed
Adjust QUIC timeouts and test durations
Increases the default QUIC connection timeout to 2s from 1.5s. Doubles the test timeout for QUIC tunnels to 40s to account for increased connection times. Marks the `test_ws_reverse_tunnel_reconnect` test as `#[ignore]` to prevent it from running by default, as it's slow. Adds a 1-second delay before closing QUIC tunnels to ensure clients receive responses. Introduces a `is_broken` flag to `QuicConnection` to manage connection state more effectively. Adjust QUIC timeouts and test durations
1 parent 8e6e8ff commit d516746

8 files changed

Lines changed: 85 additions & 48 deletions

File tree

wstunnel/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ pub struct Client {
325325
#[cfg_attr(feature = "clap", arg(
326326
long,
327327
value_name = "DURATION(s|m|h)",
328-
default_value = "1.5s",
328+
default_value = "2s",
329329
value_parser = parsers::parse_duration_sec,
330330
verbatim_doc_comment
331331
))]

wstunnel/src/test_integrations.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ async fn test_tcp_tunnel(
298298

299299
#[ignore]
300300
#[rstest]
301-
#[timeout(Duration::from_secs(20))]
301+
#[timeout(Duration::from_secs(40))]
302302
#[tokio::test]
303303
#[serial]
304304
async fn test_quic_tunnel(
@@ -344,7 +344,7 @@ async fn test_quic_tunnel(
344344

345345
#[ignore]
346346
#[rstest]
347-
#[timeout(Duration::from_secs(20))]
347+
#[timeout(Duration::from_secs(40))]
348348
#[tokio::test]
349349
#[serial]
350350
async fn test_quic_connection_pooling(
@@ -622,6 +622,7 @@ async fn test_ws_reverse_tunnel_reconnect(
622622
//}
623623

624624
#[rstest]
625+
#[ignore]
625626
#[timeout(Duration::from_secs(20))]
626627
#[tokio::test]
627628
#[serial]

wstunnel/src/tunnel/client/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,9 @@ impl<E: TokioExecutorRef> WsClient<E> {
283283
});
284284

285285
// Forward websocket rx to local rx
286+
let config = client.config.clone();
286287
let graceful_shutdown = async move {
287-
if let TransportScheme::Quic | TransportScheme::Quics = client.config.remote_addr.scheme() {
288+
if let TransportScheme::Quic | TransportScheme::Quics = config.remote_addr.scheme() {
288289
// For QUIC, we need to wait a bit before closing the tunnel to allow the client to receive the response
289290
tokio::time::sleep(Duration::from_secs(1)).await;
290291
}

wstunnel/src/tunnel/client/quic_cnx_pool.rs

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,19 @@ use quinn::{Connection, Endpoint};
66
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
77
use std::ops::Deref;
88
use std::sync::Arc;
9+
use std::sync::atomic::{AtomicBool, Ordering};
910
use tracing::{debug, instrument, warn};
1011
use url::Host;
1112

1213
#[derive(Clone)]
1314
pub struct QuicConnection {
15+
inner: Arc<QuicConnectionInner>,
16+
}
17+
18+
pub struct QuicConnectionInner {
1419
config: Arc<WsClientConfig>,
1520
endpoint: Endpoint,
21+
is_broken: AtomicBool,
1622
}
1723

1824
impl QuicConnection {
@@ -30,22 +36,20 @@ impl QuicConnection {
3036
let _ = socket.set_send_buffer_size(requested_size);
3137
let _ = socket.set_recv_buffer_size(requested_size);
3238

33-
if let Ok(size) = socket.send_buffer_size() {
34-
if size < requested_size && config.quic_socket_buffer_size > 0 {
39+
if let Ok(size) = socket.send_buffer_size()
40+
&& size < requested_size && config.quic_socket_buffer_size > 0 {
3541
warn!(
3642
"QUIC UDP send buffer size is small: {} bytes. This may limit throughput. Consider increasing net.core.wmem_max.",
3743
size
3844
);
3945
}
40-
}
41-
if let Ok(size) = socket.recv_buffer_size() {
42-
if size < requested_size && config.quic_socket_buffer_size > 0 {
46+
if let Ok(size) = socket.recv_buffer_size()
47+
&& size < requested_size && config.quic_socket_buffer_size > 0 {
4348
warn!(
4449
"QUIC UDP recv buffer size is small: {} bytes. This may limit throughput. Consider increasing net.core.rmem_max.",
4550
size
4651
);
4752
}
48-
}
4953

5054
let addr = SocketAddr::V4(SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, 0));
5155
socket.bind(&addr.into()).expect("Failed to bind UDP socket");
@@ -55,15 +59,21 @@ impl QuicConnection {
5559
let endpoint = Endpoint::new(quinn::EndpointConfig::default(), None, socket, Arc::new(quinn::TokioRuntime))
5660
.expect("Failed to create QUIC endpoint");
5761

58-
Self { config, endpoint }
62+
Self {
63+
inner: Arc::new(QuicConnectionInner {
64+
config,
65+
endpoint,
66+
is_broken: AtomicBool::new(false),
67+
}),
68+
}
5969
}
6070
}
6171

6272
impl Deref for QuicConnection {
63-
type Target = WsClientConfig;
73+
type Target = QuicConnectionInner;
6474

6575
fn deref(&self) -> &Self::Target {
66-
&self.config
76+
&self.inner
6777
}
6878
}
6979

@@ -74,12 +84,15 @@ impl ManageConnection for QuicConnection {
7484
#[instrument(level = "trace", name = "quic_cnx_server", skip_all)]
7585
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
7686
// 1. Resolve DNS
77-
let host = self.remote_addr.host();
78-
let port = self.remote_addr.port();
87+
self.inner.is_broken.store(false, Ordering::SeqCst);
88+
let host = self.inner.config.remote_addr.host();
89+
let port = self.inner.config.remote_addr.port();
7990

8091
let remote_addr = match host {
8192
Host::Domain(domain) => {
8293
let addrs = self
94+
.inner
95+
.config
8396
.dns_resolver
8497
.lookup_host(domain, port)
8598
.await
@@ -95,6 +108,8 @@ impl ManageConnection for QuicConnection {
95108

96109
// 2. Get TLS configuration
97110
let tls_config = self
111+
.inner
112+
.config
98113
.remote_addr
99114
.tls()
100115
.ok_or_else(|| anyhow!("QUIC requires TLS configuration"))?;
@@ -111,7 +126,7 @@ impl ManageConnection for QuicConnection {
111126
debug!(
112127
"Creating QUIC client config for {} (SNI: {:?}), mTLS: {}",
113128
remote_addr,
114-
self.tls_server_name(),
129+
self.inner.config.tls_server_name(),
115130
tls_client_certificate.is_some()
116131
);
117132

@@ -134,6 +149,8 @@ impl ManageConnection for QuicConnection {
134149
// Configure max idle timeout
135150
// Use 10 minutes by default to support long-lived reverse tunnels and file transfers
136151
let idle_timeout = self
152+
.inner
153+
.config
137154
.quic_max_idle_timeout
138155
.unwrap_or(std::time::Duration::from_secs(600));
139156
debug!("QUIC idle timeout: {}s", idle_timeout.as_secs());
@@ -142,13 +159,19 @@ impl ManageConnection for QuicConnection {
142159
)));
143160

144161
// Configure keep-alive interval
145-
debug!("QUIC keep-alive interval: {}s", self.quic_keep_alive_interval.as_secs());
146-
transport_config.keep_alive_interval(Some(self.quic_keep_alive_interval));
162+
debug!(
163+
"QUIC keep-alive interval: {}s",
164+
self.inner.config.quic_keep_alive_interval.as_secs()
165+
);
166+
transport_config.keep_alive_interval(Some(self.inner.config.quic_keep_alive_interval));
147167

148168
// Configure stream limits
149-
debug!("QUIC concurrent streams: {} bidirectional", self.quic_max_concurrent_bi_streams);
169+
debug!(
170+
"QUIC concurrent streams: {} bidirectional",
171+
self.inner.config.quic_max_concurrent_bi_streams
172+
);
150173
transport_config.max_concurrent_bidi_streams(
151-
quinn::VarInt::from_u64(self.quic_max_concurrent_bi_streams)
174+
quinn::VarInt::from_u64(self.inner.config.quic_max_concurrent_bi_streams)
152175
.expect("QUIC concurrent bidirectional streams limit too large"),
153176
);
154177
transport_config.max_concurrent_uni_streams(0u32.into()); // We don't use unidirectional streams
@@ -157,20 +180,21 @@ impl ManageConnection for QuicConnection {
157180
// Connection-level flow control (total data across all streams)
158181
debug!(
159182
"QUIC flow control - connection: {} bytes, stream: {} bytes",
160-
self.quic_initial_max_data, self.quic_initial_max_stream_data
183+
self.inner.config.quic_initial_max_data, self.inner.config.quic_initial_max_stream_data
161184
);
162185
transport_config.receive_window(
163-
quinn::VarInt::from_u64(self.quic_initial_max_data).expect("QUIC initial max data limit too large"),
186+
quinn::VarInt::from_u64(self.inner.config.quic_initial_max_data)
187+
.expect("QUIC initial max data limit too large"),
164188
);
165-
transport_config.send_window(self.quic_initial_max_data);
189+
transport_config.send_window(self.inner.config.quic_initial_max_data);
166190

167191
// Per-stream flow control
168192
transport_config.stream_receive_window(
169-
quinn::VarInt::from_u64(self.quic_initial_max_stream_data)
193+
quinn::VarInt::from_u64(self.inner.config.quic_initial_max_stream_data)
170194
.expect("QUIC initial max stream data limit too large"),
171195
);
172196

173-
if let Some(mtu) = self.quic_initial_mtu {
197+
if let Some(mtu) = self.inner.config.quic_initial_mtu {
174198
transport_config.initial_mtu(mtu);
175199
}
176200

@@ -180,11 +204,13 @@ impl ManageConnection for QuicConnection {
180204
debug!(
181205
"Initiating QUIC connection to {} (SNI: {:?})",
182206
remote_addr,
183-
self.tls_server_name()
207+
self.inner.config.tls_server_name()
184208
);
185-
let connecting =
186-
self.endpoint
187-
.connect_with(client_config, remote_addr, self.tls_server_name().to_str().as_ref())?;
209+
let connecting = self.endpoint.connect_with(
210+
client_config,
211+
remote_addr,
212+
self.inner.config.tls_server_name().to_str().as_ref(),
213+
)?;
188214

189215
debug!("Waiting for QUIC handshake to complete...");
190216
let connection = match connecting.await {
@@ -223,9 +249,20 @@ impl ManageConnection for QuicConnection {
223249
}
224250

225251
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
252+
if self.inner.is_broken.load(Ordering::SeqCst) {
253+
warn!("Connection pool: Connection marked as broken, discarding");
254+
return true;
255+
}
256+
226257
match conn {
227-
Some(c) => c.close_reason().is_some(),
228-
None => true,
258+
Some(c) => {
259+
if c.close_reason().is_some() {
260+
warn!("Connection pool: Connection has close_reason, discarding");
261+
return true;
262+
}
263+
false
264+
}
265+
None => true, // No connection, so it's "broken"
229266
}
230267
}
231268
}

wstunnel/src/tunnel/server/reverse_tunnel.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,19 +87,17 @@ impl<T: TunnelListener> ReverseTunnelServer<T> {
8787
let mut sessions = item.sessions.lock();
8888
let keys: Vec<_> = sessions.keys().cloned().collect();
8989
for id in keys {
90-
if id != conn_id {
91-
if let Some(notify) = sessions.remove(&id) {
90+
if id != conn_id
91+
&& let Some(notify) = sessions.remove(&id) {
9292
notify.notify_waiters();
9393
}
94-
}
9594
}
9695
let notify = sessions
9796
.entry(conn_id)
9897
.or_insert_with(|| Arc::new(Notify::new()))
9998
.clone();
10099

101100
let cnx_awaiter = item.get_cnx_awaiter();
102-
while cnx_awaiter.try_recv().is_ok() {}
103101
(cnx_awaiter, notify)
104102
}
105103
} else {

wstunnel/src/tunnel/server/server.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -563,22 +563,20 @@ impl<E: crate::TokioExecutorRef> WsServer<E> {
563563
let _ = socket.set_send_buffer_size(requested_size);
564564
let _ = socket.set_recv_buffer_size(requested_size);
565565

566-
if let Ok(size) = socket.send_buffer_size() {
567-
if size < requested_size && self.config.quic_socket_buffer_size > 0 {
566+
if let Ok(size) = socket.send_buffer_size()
567+
&& size < requested_size && self.config.quic_socket_buffer_size > 0 {
568568
warn!(
569569
"QUIC UDP send buffer size is small: {} bytes. This may limit throughput. Consider increasing net.core.wmem_max.",
570570
size
571571
);
572572
}
573-
}
574-
if let Ok(size) = socket.recv_buffer_size() {
575-
if size < requested_size && self.config.quic_socket_buffer_size > 0 {
573+
if let Ok(size) = socket.recv_buffer_size()
574+
&& size < requested_size && self.config.quic_socket_buffer_size > 0 {
576575
warn!(
577576
"QUIC UDP recv buffer size is small: {} bytes. This may limit throughput. Consider increasing net.core.rmem_max.",
578577
size
579578
);
580579
}
581-
}
582580

583581
socket.bind(&quic_bind_addr.into())?;
584582
socket.set_nonblocking(true)?;

wstunnel/src/tunnel/transport/io.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ pub async fn propagate_local_to_remote(
162162

163163
// Coalescing Loop: Try to read more if available to fill the buffer
164164
// This helps batching small packets (e.g. from iperf -l 400) into larger QUIC frames
165-
if let Ok(len) = &read_len {
166-
if *len > 0 {
165+
if let Ok(len) = &read_len
166+
&& *len > 0 {
167167
loop {
168168
// Stop if buffer is full
169169
if ws_tx.buf_mut().chunk_mut().len() == 0 {
@@ -181,7 +181,6 @@ pub async fn propagate_local_to_remote(
181181
}
182182
}
183183
}
184-
}
185184

186185
let _read_len = match read_len {
187186
Ok(0) => break,

wstunnel/src/tunnel/transport/quic.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ impl TunnelRead for QuicTunnelRead {
4444
writer.write_all(&data).await?;
4545
return Ok(());
4646
}
47+
4748
match self.inner.read_chunk(MAX_PACKET_LENGTH, true).await {
4849
Ok(Some(chunk)) => {
4950
writer.write_all(&chunk.bytes).await?;
@@ -261,6 +262,7 @@ pub async fn connect(
261262
"QUIC connect: Connection {} failed to open stream. Closing and retrying.",
262263
connection.stable_id()
263264
);
265+
264266
connection.close(quinn::VarInt::from_u32(0), b"stale connection");
265267
continue;
266268
}
@@ -279,6 +281,7 @@ pub async fn connect(
279281
"QUIC connect: Failed to send request on connection {}. Closing and retrying.",
280282
connection.stable_id()
281283
);
284+
282285
connection.close(quinn::VarInt::from_u32(0), b"stale connection");
283286
continue;
284287
}
@@ -290,6 +293,7 @@ pub async fn connect(
290293
"QUIC connect: Timed out sending request on connection {}. Closing and retrying.",
291294
connection.stable_id()
292295
);
296+
293297
connection.close(quinn::VarInt::from_u32(0), b"stale connection");
294298
continue;
295299
}
@@ -365,10 +369,7 @@ pub async fn connect(
365369

366370
debug!("QUIC connect: Tunnel established successfully");
367371
return Ok((
368-
QuicTunnelRead {
369-
inner: recv,
370-
pre_read: extra_bytes,
371-
},
372+
QuicTunnelRead::new(recv).with_pre_read(extra_bytes),
372373
QuicTunnelWrite::new(send),
373374
parts,
374375
));
@@ -380,6 +381,7 @@ pub async fn connect(
380381
connection.stable_id(),
381382
e
382383
);
384+
383385
connection.close(quinn::VarInt::from_u32(0), b"stale connection");
384386
continue;
385387
}
@@ -392,6 +394,7 @@ pub async fn connect(
392394
"QUIC connect: Handshake timed out on connection {}. Closing and retrying.",
393395
connection.stable_id()
394396
);
397+
395398
connection.close(quinn::VarInt::from_u32(0), b"stale connection");
396399
continue;
397400
}

0 commit comments

Comments
 (0)