forked from rsocket/rsocket-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathabstract_socket.go
More file actions
68 lines (60 loc) · 2.01 KB
/
abstract_socket.go
File metadata and controls
68 lines (60 loc) · 2.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package socket
import (
"github.com/pkg/errors"
"github.com/rsocket/rsocket-go/logger"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx/flux"
"github.com/rsocket/rsocket-go/rx/mono"
)
var (
errUnimplementedMetadataPush = errors.New("METADATA_PUSH is unimplemented")
errUnimplementedFireAndForget = errors.New("FIRE_AND_FORGET is unimplemented")
errUnimplementedRequestResponse = errors.New("REQUEST_RESPONSE is unimplemented")
errUnimplementedRequestStream = errors.New("REQUEST_STREAM is unimplemented")
errUnimplementedRequestChannel = errors.New("REQUEST_CHANNEL is unimplemented")
)
// AbstractRSocket represents an abstract RSocket.
type AbstractRSocket struct {
FF func(payload.Payload)
MP func(payload.Payload)
RR func(payload.Payload) mono.Mono
RS func(payload.Payload) flux.Flux
RC func(payload.Payload, flux.Flux) flux.Flux
}
// MetadataPush starts a request of MetadataPush.
func (a AbstractRSocket) MetadataPush(message payload.Payload) {
if a.MP == nil {
logger.Errorf("%s\n", errUnimplementedMetadataPush)
return
}
a.MP(message)
}
// FireAndForget starts a request of FireAndForget.
func (a AbstractRSocket) FireAndForget(message payload.Payload) {
if a.FF == nil {
logger.Errorf("%s\n", errUnimplementedFireAndForget)
return
}
a.FF(message)
}
// RequestResponse starts a request of RequestResponse.
func (a AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono {
if a.RR == nil {
return mono.Error(errUnimplementedRequestResponse)
}
return a.RR(message)
}
// RequestStream starts a request of RequestStream.
func (a AbstractRSocket) RequestStream(message payload.Payload) flux.Flux {
if a.RS == nil {
return flux.Error(errUnimplementedRequestStream)
}
return a.RS(message)
}
// RequestChannel starts a request of RequestChannel.
func (a AbstractRSocket) RequestChannel(initialRequest payload.Payload, messages flux.Flux) flux.Flux {
if a.RC == nil {
return flux.Error(errUnimplementedRequestChannel)
}
return a.RC(initialRequest, messages)
}