Skip to content

Commit dae9c32

Browse files
authored
feat: support Next/Complete payloads (#148)
1 parent 63b1066 commit dae9c32

File tree

2 files changed

+41
-6
lines changed

2 files changed

+41
-6
lines changed

internal/socket/subscriber_request_channel.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,26 @@ import (
1212
"go.uber.org/atomic"
1313
)
1414

15+
// FinalPayload is a marker interface for payloads that should be sent with FlagNext|FlagComplete.
16+
type FinalPayload interface {
17+
payload.Payload
18+
IsFinal() bool
19+
}
20+
21+
// finalPayloadWrapper wraps a payload and marks it as final.
22+
type finalPayloadWrapper struct {
23+
payload.Payload
24+
}
25+
26+
func (f finalPayloadWrapper) IsFinal() bool {
27+
return true
28+
}
29+
30+
// NewFinalPayload creates a final payload that will be sent with FlagNext|FlagComplete.
31+
func NewFinalPayload(p payload.Payload) payload.Payload {
32+
return finalPayloadWrapper{Payload: p}
33+
}
34+
1535
type requestChannelSubscriber struct {
1636
sid uint32
1737
dc *DuplexConnection
@@ -52,15 +72,22 @@ func (r *requestChannelSubscriber) OnSubscribe(ctx context.Context, s rx.Subscri
5272
}
5373

5474
type respondChannelSubscriber struct {
55-
sid uint32
56-
n uint32
57-
dc *DuplexConnection
58-
rcv flux.Processor
59-
subscribed chan<- struct{}
60-
calls *atomic.Int32
75+
sid uint32
76+
n uint32
77+
dc *DuplexConnection
78+
rcv flux.Processor
79+
subscribed chan<- struct{}
80+
calls *atomic.Int32
81+
sentFinalNext atomic.Bool
6182
}
6283

6384
func (r *respondChannelSubscriber) OnNext(next payload.Payload) {
85+
if _, ok := next.(FinalPayload); ok {
86+
r.sentFinalNext.Store(true)
87+
r.OnComplete()
88+
r.dc.sendPayload(r.sid, next, core.FlagNext|core.FlagComplete)
89+
return
90+
}
6491
r.dc.sendPayload(r.sid, next, core.FlagNext)
6592
}
6693

@@ -75,6 +102,9 @@ func (r *respondChannelSubscriber) OnComplete() {
75102
if r.calls.Inc() == 2 {
76103
r.dc.unregister(r.sid)
77104
}
105+
if r.sentFinalNext.Load() {
106+
return
107+
}
78108
complete := framing.NewWriteablePayloadFrame(r.sid, nil, nil, core.FlagComplete)
79109
done := make(chan struct{})
80110
complete.HandleDone(func() {

rsocket.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ func NewAbstractSocket(opts ...OptAbstractSocket) RSocket {
8686
return sk
8787
}
8888

89+
// NewFinalPayload is a wrapper/marker that allows a payload to be sent with both Next and Complete flags.
90+
func NewFinalPayload(p payload.Payload) payload.Payload {
91+
return socket.NewFinalPayload(p)
92+
}
93+
8994
// MetadataPush register request handler for MetadataPush.
9095
func MetadataPush(fn func(request payload.Payload)) OptAbstractSocket {
9196
return func(socket *socket.AbstractRSocket) {

0 commit comments

Comments
 (0)