Skip to content

Commit 8da6ca1

Browse files
committed
New data delivery mechanism for HTTP/2+ Websocket
A new data_delivery mechanism called 'relay' has been added. It bypasses stream handlers (and the buffering in cowboy_stream_h) and sends the data directly to the process implementing Websocket (and should work for other similar protocols like HTTP/2 WebTransport). Flow control in HTTP/2 is maintained in a simpler way, via a configured flow value that is used to maintain the window to a reasonable value when data is received. The 'relay' data_delivery has been implemented for both HTTP/2 and HTTP/3. It has not been implemented for HTTP/1.1 since switching protocol there overrides the connection process. HTTP/2 Websocket is now better tested. A bug was fixed with the 'stream_handlers' data_delivery where active mode would not be reenabled if it was disabled at some point. The Websocket performance suite has been updated to include tests that do not use Gun. Websocket modules used by the performance suite use the 'relay' data_delivery now. Performance is improved significantly with 'relay', between 10% and 20% faster. HTTP/2 Websocket performance is not on par with HTTP/1.1 still, but the remaining difference is thought to be from the HTTP/2 overhead and flow control.
1 parent a8c7177 commit 8da6ca1

21 files changed

Lines changed: 494 additions & 79 deletions

doc/src/manual/cowboy_websocket.asciidoc

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,16 @@ Cowboy does it automatically for you.
200200
[source,erlang]
201201
----
202202
opts() :: #{
203-
active_n => pos_integer(),
204-
compress => boolean(),
205-
deflate_opts => cow_ws:deflate_opts()
206-
dynamic_buffer => false | {pos_integer(), pos_integer()},
207-
idle_timeout => timeout(),
208-
max_frame_size => non_neg_integer() | infinity,
209-
req_filter => fun((cowboy_req:req()) -> map()),
210-
validate_utf8 => boolean()
203+
active_n => pos_integer(),
204+
compress => boolean(),
205+
data_delivery => stream_handlers | relay,
206+
data_delivery_flow => pos_integer(),
207+
deflate_opts => cow_ws:deflate_opts(),
208+
dynamic_buffer => false | {pos_integer(), pos_integer()},
209+
idle_timeout => timeout(),
210+
max_frame_size => non_neg_integer() | infinity,
211+
req_filter => fun((cowboy_req:req()) -> map()),
212+
validate_utf8 => boolean()
211213
}
212214
----
213215

@@ -241,6 +243,22 @@ Whether to enable the Websocket frame compression
241243
extension. Frames will only be compressed for the
242244
clients that support this extension.
243245

246+
data_delivery (stream_handlers)::
247+
248+
HTTP/2+ only. Determines how data will be delivered
249+
to the Websocket session process. `stream_handlers`
250+
is the default and makes data go through stream
251+
handlers. `relay` is a faster method introduced in
252+
Cowboy 2.14 and sends data directly. `relay` is
253+
intended to become the default in Cowboy 3.0.
254+
255+
data_delivery_flow (pos_integer())::
256+
257+
When the `relay` data delivery method is used,
258+
this value may be used to decide how much the
259+
flow control window should be for the Websocket
260+
stream. Currently only applies to HTTP/2.
261+
244262
deflate_opts (#{})::
245263

246264
Configuration for the permessage-deflate Websocket
@@ -299,6 +317,10 @@ normal circumstances if necessary.
299317

300318
== Changelog
301319

320+
* *2.14*: The `data_delivery` and `data_delivery_flow` options
321+
were added. The `relay` data delivery mechanism
322+
provides a better way of forwarding data to
323+
HTTP/2+ Websocket session processes.
302324
* *2.13*: The `active_n` default value was changed to `1`.
303325
* *2.13*: The `dynamic_buffer` option was added.
304326
* *2.13*: The `max_frame_size` option can now be set dynamically.

src/cowboy_http2.erl

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,13 @@
8282
-export_type([opts/0]).
8383

8484
-record(stream, {
85-
%% Whether the stream is currently stopping.
86-
status = running :: running | stopping,
85+
%% Whether the stream is currently in a special state.
86+
%%
87+
%% - The running state is the normal state of a stream.
88+
%% - The relaying state is used by extended CONNECT protocols to
89+
%% use a 'relay' data_delivery method.
90+
%% - The stopping state indicates the stream used the 'stop' command.
91+
status = running :: running | {relaying, non_neg_integer(), pid()} | stopping,
8792

8893
%% Flow requested for this stream.
8994
flow = 0 :: non_neg_integer(),
@@ -327,6 +332,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
327332
%% Messages pertaining to a stream.
328333
{{Pid, StreamID}, Msg} when Pid =:= self() ->
329334
before_loop(info(State, StreamID, Msg), Buffer);
335+
{'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() ->
336+
before_loop(relay_command(State, StreamID, RelayCommand), Buffer);
330337
%% Exit signal from children.
331338
Msg = {'EXIT', Pid, _} ->
332339
before_loop(down(State, Pid, Msg), Buffer);
@@ -520,6 +527,14 @@ data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFi
520527
reset_stream(State0, StreamID, {internal_error, {Class, Exception},
521528
'Unhandled exception in cowboy_stream:data/4.'})
522529
end;
530+
%% Stream handlers are not used for the data when relaying.
531+
#{StreamID := #stream{status={relaying, _, RelayPid}}} ->
532+
RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data},
533+
%% We keep a steady flow using the configured flow value.
534+
%% Because we do not change the 'flow' value the update_window/2
535+
%% function will always maintain this value (of course with
536+
%% thresholds applying).
537+
update_window(State0, StreamID);
523538
%% We ignore DATA frames for streams that are stopping.
524539
#{} ->
525540
State0
@@ -866,6 +881,26 @@ commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
866881
commands(State, StreamID, Tail);
867882
%% Use a different protocol within the stream (CONNECT :protocol).
868883
%% @todo Make sure we error out when the feature is disabled.
884+
%% There are two data_delivery: stream_handlers and relay.
885+
%% The former just has the data go through stream handlers
886+
%% like normal requests. The latter relays data directly.
887+
%%
888+
%% @todo When relaying there might be some data that is
889+
%% in stream handlers and that need to be received,
890+
%% depending on whether the protocol sends data
891+
%% before processing the response.
892+
commands(State0=#state{flow=Flow, streams=Streams}, StreamID,
893+
[{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) ->
894+
State1 = info(State0, StreamID, {headers, 200, Headers}),
895+
#{StreamID := Stream} = Streams,
896+
#{data_delivery_pid := RelayPid} = ModState,
897+
%% WINDOW_UPDATE frames updating the window will be sent after
898+
%% the first DATA frame has been received.
899+
RelayFlow = maps:get(data_delivery_flow, ModState, 131072),
900+
State = State1#state{flow=Flow + RelayFlow, streams=Streams#{StreamID => Stream#stream{
901+
status={relaying, RelayFlow, RelayPid},
902+
flow=RelayFlow}}},
903+
commands(State, StreamID, Tail);
869904
commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
870905
State = info(State0, StreamID, {headers, 200, Headers}),
871906
commands(State, StreamID, Tail);
@@ -881,6 +916,26 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
881916
cowboy:log(Log, Opts),
882917
commands(State, StreamID, Tail).
883918

919+
%% Relay data delivery commands.
920+
921+
relay_command(State, StreamID, DataCmd = {data, _, _}) ->
922+
commands(State, StreamID, [DataCmd]);
923+
%% When going active mode again we set the RelayFlow again
924+
%% and update the window if necessary.
925+
relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, active) ->
926+
#{StreamID := Stream} = Streams,
927+
#stream{status={relaying, RelayFlow, _}} = Stream,
928+
update_window(State#state{flow=Flow + RelayFlow,
929+
streams=Streams#{StreamID => Stream#stream{flow=RelayFlow}}},
930+
StreamID);
931+
%% When going passive mode we don't update the window
932+
%% since we have not incremented it.
933+
relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, passive) ->
934+
#{StreamID := Stream} = Streams,
935+
#stream{flow=StreamFlow} = Stream,
936+
State#state{flow=Flow - StreamFlow,
937+
streams=Streams#{StreamID => Stream#stream{flow=0}}}.
938+
884939
%% Tentatively update the window after the flow was updated.
885940

886941
update_window(State0=#state{socket=Socket, transport=Transport,

src/cowboy_http3.erl

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
%% Whether the stream is currently in a special state.
6868
status :: header | {unidi, control | encoder | decoder}
6969
| normal | {data | ignore, non_neg_integer()} | stopping
70+
| {relaying, normal | {data, non_neg_integer()}, pid()}
7071
| {webtransport_session, normal | {ignore, non_neg_integer()}}
7172
| {webtransport_stream, cow_http3:stream_id()},
7273

@@ -164,6 +165,8 @@ loop(State0=#state{opts=Opts, children=Children}) ->
164165
%% Messages pertaining to a stream.
165166
{{Pid, StreamID}, Msg} when Pid =:= self() ->
166167
loop(info(State0, StreamID, Msg));
168+
{'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() ->
169+
loop(relay_command(State0, StreamID, RelayCommand));
167170
%% WebTransport commands.
168171
{'$webtransport_commands', SessionID, Commands} ->
169172
loop(webtransport_commands(State0, SessionID, Commands));
@@ -299,6 +302,22 @@ parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
299302
parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin),
300303
StreamID, Rest, IsFin)
301304
end;
305+
%% This clause mirrors the {data, Len} clause.
306+
parse1(State, Stream=#stream{status={relaying, {data, Len}, RelayPid}, id=StreamID},
307+
Data, IsFin) ->
308+
DataLen = byte_size(Data),
309+
if
310+
DataLen < Len ->
311+
%% We don't have the full frame but this is the end of the
312+
%% data we have. So FrameIsFin is equivalent to IsFin here.
313+
loop(frame(State, Stream#stream{status={relaying, {data, Len - DataLen}, RelayPid}},
314+
{data, Data}, IsFin));
315+
true ->
316+
<<Data1:Len/binary, Rest/bits>> = Data,
317+
FrameIsFin = is_fin(IsFin, Rest),
318+
parse(frame(State, Stream#stream{status={relaying, normal, RelayPid}},
319+
{data, Data1}, FrameIsFin), StreamID, Rest, IsFin)
320+
end;
302321
parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
303322
DataLen = byte_size(Data),
304323
if
@@ -311,7 +330,7 @@ parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
311330
end;
312331
%% @todo Clause that discards receiving data for stopping streams.
313332
%% We may receive a few more frames after we abort receiving.
314-
parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
333+
parse1(State=#state{opts=Opts}, Stream=#stream{status=Status0, id=StreamID}, Data, IsFin) ->
315334
case cow_http3:parse(Data) of
316335
{ok, Frame, Rest} ->
317336
FrameIsFin = is_fin(IsFin, Rest),
@@ -322,6 +341,10 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
322341
{more, Frame = {data, _}, Len} ->
323342
%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
324343
case IsFin of
344+
nofin when element(1, Status0) =:= relaying ->
345+
%% The stream will be stored at the end of processing commands.
346+
Status = setelement(2, Status0, {data, Len}),
347+
loop(frame(State, Stream#stream{status=Status}, Frame, nofin));
325348
nofin ->
326349
%% The stream will be stored at the end of processing commands.
327350
loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin));
@@ -432,6 +455,9 @@ frame(State=#state{http3_machine=HTTP3Machine0},
432455
terminate(State#state{http3_machine=HTTP3Machine}, Error)
433456
end.
434457

458+
data_frame(State, Stream=#stream{status={relaying, _, RelayPid}, id=StreamID}, IsFin, Data) ->
459+
RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data},
460+
stream_store(State, Stream);
435461
data_frame(State=#state{opts=Opts},
436462
Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) ->
437463
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
@@ -767,6 +793,18 @@ commands(State0, Stream0=#stream{id=StreamID},
767793
},
768794
%% @todo We must propagate the buffer to capsule handling if any.
769795
commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
796+
%% There are two data_delivery: stream_handlers and relay.
797+
%% The former just has the data go through stream handlers
798+
%% like normal requests. The latter relays data directly.
799+
commands(State0, Stream0=#stream{id=StreamID},
800+
[{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) ->
801+
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
802+
Stream1 = #stream{status=normal} = stream_get(State, StreamID),
803+
#{data_delivery_pid := RelayPid} = ModState,
804+
%% We do not set data_delivery_flow because it is managed by quicer
805+
%% and we do not have an easy way to modify it.
806+
Stream = Stream1#stream{status={relaying, normal, RelayPid}},
807+
commands(State, Stream, Tail);
770808
commands(State0, Stream0=#stream{id=StreamID},
771809
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
772810
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
@@ -872,6 +910,20 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
872910
cowboy_quicer:send(Conn, EncoderID, EncData)),
873911
State.
874912

913+
%% Relay data delivery commands.
914+
915+
relay_command(State, StreamID, DataCmd = {data, _, _}) ->
916+
Stream = stream_get(State, StreamID),
917+
commands(State, Stream, [DataCmd]);
918+
relay_command(State=#state{conn=Conn}, StreamID, active) ->
919+
ok = maybe_socket_error(State,
920+
cowboy_quicer:setopt(Conn, StreamID, active, true)),
921+
State;
922+
relay_command(State=#state{conn=Conn}, StreamID, passive) ->
923+
ok = maybe_socket_error(State,
924+
cowboy_quicer:setopt(Conn, StreamID, active, false)),
925+
State.
926+
875927
%% We mark the stream as being a WebTransport stream
876928
%% and then continue parsing the data as a WebTransport
877929
%% stream. This function is common for incoming unidi

src/cowboy_quicer.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
%% Streams.
2626
-export([start_bidi_stream/2]).
2727
-export([start_unidi_stream/2]).
28+
-export([setopt/4]).
2829
-export([send/3]).
2930
-export([send/4]).
3031
-export([send_datagram/2]).
@@ -54,6 +55,9 @@ start_bidi_stream(_, _) -> no_quicer().
5455
-spec start_unidi_stream(_, _) -> no_return().
5556
start_unidi_stream(_, _) -> no_quicer().
5657

58+
-spec setopt(_, _, _, _) -> no_return().
59+
setopt(_, _, _, _) -> no_quicer().
60+
5761
-spec send(_, _, _) -> no_return().
5862
send(_, _, _) -> no_quicer().
5963

@@ -109,7 +113,7 @@ sockname(Conn) ->
109113
| {error, any()}.
110114

111115
peercert(Conn) ->
112-
quicer_nif:peercert(Conn).
116+
quicer_nif:peercert(Conn).
113117

114118
-spec shutdown(quicer_connection_handle(), quicer_app_errno())
115119
-> ok | {error, any()}.
@@ -154,6 +158,13 @@ start_stream(Conn, InitialData, OpenFlag) ->
154158
Error
155159
end.
156160

161+
-spec setopt(quicer_connection_handle(), cow_http3:stream_id(), active, boolean())
162+
-> ok | {error, any()}.
163+
164+
setopt(_Conn, StreamID, active, Value) ->
165+
StreamRef = get({quicer_stream, StreamID}),
166+
quicer:setopt(StreamRef, active, Value).
167+
157168
-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata())
158169
-> ok | {error, any()}.
159170

0 commit comments

Comments
 (0)