forked from rsocket/rsocket-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase_socket.go
More file actions
126 lines (110 loc) · 2.85 KB
/
base_socket.go
File metadata and controls
126 lines (110 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package socket
import (
"sync"
"time"
"github.com/pkg/errors"
"github.com/rsocket/rsocket-go/logger"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx/flux"
"github.com/rsocket/rsocket-go/rx/mono"
)
// BaseSocket is the basic socket. It wraps a DuplexConnection, delegates
// requests to it, and keeps the close handlers, the request lease and a
// fallback address alongside.
type BaseSocket struct {
// socket is the underlying connection every request is delegated to.
socket *DuplexConnection
// closers holds the handlers registered through OnClose.
closers []func(error)
// once guards close so the shutdown sequence runs a single time.
once sync.Once
// reqLease is consulted before FireAndForget, RequestResponse,
// RequestStream and RequestChannel. refreshLease creates or refreshes it;
// NewBaseSocket leaves it nil.
reqLease *leaser
// addr is the fallback address returned by Addr when the connection has
// no current transport.
addr string
}
// SetAddr records addr as the fallback address of this socket. Addr reports
// it whenever the underlying connection has no current transport.
func (p *BaseSocket) SetAddr(addr string) {
	p.addr = addr
}
// Addr returns the address of this socket.
//
// When the underlying connection has a current transport, the transport's own
// Addr result is returned unchanged. Otherwise the address stored via SetAddr
// is returned, and the boolean is true only if that address is non-empty.
func (p *BaseSocket) Addr() (string, bool) {
	tp := p.socket.currentTransport()
	if tp == nil {
		// No live transport: fall back to the manually configured address.
		return p.addr, p.addr != ""
	}
	return tp.Addr()
}
// FireAndForget sends a FireAndForget request.
//
// The request lease is checked first. Because this method has no return value
// to carry an error, a failed lease check is reported as a warning in the log
// and the message is dropped instead of being sent. This matches
// RequestResponse, RequestStream and RequestChannel, which also refuse to
// send when the lease check fails.
func (p *BaseSocket) FireAndForget(message payload.Payload) {
	if err := p.reqLease.allow(); err != nil {
		logger.Warnf("request FireAndForget failed: %v\n", err)
		// Do not fall through: sending here would bypass the lease.
		return
	}
	p.socket.FireAndForget(message)
}
// MetadataPush sends a MetadataPush request by handing message straight to
// the underlying connection. No lease check is performed here.
func (p *BaseSocket) MetadataPush(message payload.Payload) {
	p.socket.MetadataPush(message)
}
// RequestResponse sends a RequestResponse request.
//
// The request lease is checked first; if that check fails, the request is not
// sent and mono.Error wrapping the failure is returned instead.
func (p *BaseSocket) RequestResponse(message payload.Payload) mono.Mono {
	leaseErr := p.reqLease.allow()
	if leaseErr != nil {
		return mono.Error(leaseErr)
	}
	return p.socket.RequestResponse(message)
}
// RequestStream sends a RequestStream request.
//
// The request lease is checked first; if that check fails, the request is not
// sent and flux.Error wrapping the failure is returned instead.
func (p *BaseSocket) RequestStream(message payload.Payload) flux.Flux {
	leaseErr := p.reqLease.allow()
	if leaseErr != nil {
		return flux.Error(leaseErr)
	}
	return p.socket.RequestStream(message)
}
// RequestChannel sends a RequestChannel request with the given initial
// payload and outgoing message stream.
//
// The request lease is checked first; if that check fails, the request is not
// sent and flux.Error wrapping the failure is returned instead.
func (p *BaseSocket) RequestChannel(initialRequest payload.Payload, messages flux.Flux) flux.Flux {
	leaseErr := p.reqLease.allow()
	if leaseErr != nil {
		return flux.Error(leaseErr)
	}
	return p.socket.RequestChannel(initialRequest, messages)
}
// OnClose registers fn to be called when the socket is closed. A nil fn is
// ignored. Handlers are appended in registration order.
func (p *BaseSocket) OnClose(fn func(error)) {
	if fn == nil {
		return
	}
	p.closers = append(p.closers, fn)
}
// Close closes the socket and triggers the handlers registered with OnClose.
// It returns whatever the internal close routine reports.
func (p *BaseSocket) Close() error {
	return p.close(true)
}
// close shuts down the underlying connection, at most once per socket.
//
// On the first call it closes the connection and, when triggerCloser is true,
// invokes the registered close handlers from the most recently registered to
// the earliest, passing each one the error returned by closing the
// connection. A panic raised by a handler is recovered and logged, so the
// remaining handlers still run.
//
// The returned error is the connection's Close result on the first call and
// nil on every later call. Because the once-guard is consumed by the first
// call, handlers never run if that call had triggerCloser set to false.
func (p *BaseSocket) close(triggerCloser bool) (err error) {
	p.once.Do(func() {
		err = p.socket.Close()
		if !triggerCloser {
			return
		}
		for idx := len(p.closers) - 1; idx >= 0; idx-- {
			// Each handler runs in its own function so that its deferred
			// recover fires before the next handler is called.
			func(handler func(error)) {
				defer func() {
					rec := recover()
					if rec == nil {
						return
					}
					var cause error
					switch v := rec.(type) {
					case error:
						cause = errors.WithStack(v)
					default:
						cause = errors.Errorf("%v", v)
					}
					logger.Errorf("handle socket closer failed: %+v\n", cause)
				}()
				handler(err)
			}(p.closers[idx])
		}
	})
	return err
}
// refreshLease installs or updates the request lease. The lease deadline is
// the current time plus ttl, and n is passed through as the lease's count
// argument. If no lease exists yet a new one is created; otherwise the
// existing lease is refreshed in place.
func (p *BaseSocket) refreshLease(ttl time.Duration, n int64) {
	expiresAt := time.Now().Add(ttl)
	if p.reqLease != nil {
		p.reqLease.refresh(expiresAt, n)
		return
	}
	p.reqLease = newLeaser(expiresAt, n)
}
// NewBaseSocket creates a new BaseSocket on top of rawSocket. All other
// fields start at their zero values, so there are no close handlers, no
// request lease and no fallback address until they are set.
func NewBaseSocket(rawSocket *DuplexConnection) *BaseSocket {
	s := new(BaseSocket)
	s.socket = rawSocket
	return s
}