-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathclient.v
More file actions
167 lines (138 loc) · 3.62 KB
/
client.v
File metadata and controls
167 lines (138 loc) · 3.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
module redict
import net
// BaseClient holds the state shared by the types that embed it (Client and Connection in this
// file): the parsed options and the pool that connections are obtained from.
struct BaseClient {
// options is declared before `mut:` and is therefore immutable after construction.
options ParsedOptions
mut:
// connection_pool is the Pooler used to create, fetch, return, remove and close connections.
connection_pool &Pooler
// on_close is an optional, fallible callback. It is not referenced by the code visible in this
// section of the file.
on_close ?fn () !
}
// new_connection obtains a connection from the pool's `new_connection` and runs
// `init_connection` on it before handing it to the caller.
// When initialization fails, the connection is closed through the pool and the initialization
// error is returned; if closing fails as well, the close error is propagated instead.
fn (mut c BaseClient) new_connection() !&PoolConnection {
	mut fresh := c.connection_pool.new_connection()!
	c.init_connection(mut fresh) or {
		init_err := err
		c.connection_pool.close_connection(mut fresh)!
		return init_err
	}
	return fresh
}
// get_connection returns a pooled connection by forwarding to `retrieve_connection`.
// Both the connection and any error are passed through unchanged.
fn (mut c BaseClient) get_connection() !&PoolConnection {
	return c.retrieve_connection()
}
// retrieve_connection takes a connection from the pool and makes sure it has been initialized.
// A connection whose `initialized` flag is already set is returned as is. Otherwise
// `init_connection` is run on it; if that fails, the connection is removed from the pool
// (with the error message as the reason) and the error is returned.
fn (mut c BaseClient) retrieve_connection() !&PoolConnection {
	mut pooled := c.connection_pool.get()!
	if !pooled.initialized {
		c.init_connection(mut pooled) or {
			reason := err.msg()
			c.connection_pool.remove(mut pooled, reason)
			return err
		}
	}
	return pooled
}
// init_connection prepares `cn` for use; it does nothing when `cn.initialized` is already set.
// The flag is set first, then a temporary Connection bound to `cn` (through a single-connection
// pool) is used to send `hello` with protocol version 3 and the configured credentials, followed
// by `select_db` when `options.db` is greater than zero.
// NOTE(review): the values returned by `hello` and `select_db` are discarded here, so a failure
// of either command does not appear to reach the caller. Confirm this is intended.
fn (mut c BaseClient) init_connection(mut cn PoolConnection) ! {
	if !cn.initialized {
		// Mark the connection before issuing the handshake commands below.
		cn.initialized = true
		mut single_pool := new_single_connection_pool(mut c.connection_pool, mut cn)
		mut setup := new_connection(c.options, mut single_pool)
		setup.hello(3, c.options.username, c.options.password, 'einar_hjortdal.redict')
		if c.options.db > 0 {
			setup.select_db(c.options.db)
		}
	}
}
// release_connection hands `cn` back to the pool via `put`.
// An error reported by the pool is returned to the caller.
fn (mut c BaseClient) release_connection(mut cn PoolConnection) ! {
	c.connection_pool.put(mut cn) or { return err }
}
// with_connection borrows a connection, runs `f` with it and always releases the connection
// afterwards, whether or not `f` succeeded.
// An error from releasing the connection takes precedence; otherwise the error returned by `f`
// (if any) is returned.
fn (mut c BaseClient) with_connection(f fn (mut PoolConnection) !) ! {
	mut borrowed := c.get_connection()!
	// Remember the callback's failure so the connection can be released before reporting it.
	mut failure := ?IError(none)
	f(mut borrowed) or { failure = err }
	c.release_connection(mut borrowed)!
	if callback_err := failure {
		return callback_err
	}
}
// dial calls the `dialer` function stored in the options with `address` and returns its result.
fn (mut c BaseClient) dial(address string) !&net.TcpConn {
return c.options.dialer(address)
}
// attempt_process performs one write/read round for `cmd` on a single connection obtained
// through `with_connection`: the command is written with `write_cmd`, then the reply is read
// with `cmd.read_reply`. An error from either step, or from `with_connection` itself, is
// propagated to the caller. No retrying happens here.
fn (mut c BaseClient) attempt_process(mut cmd Cmder) ! {
// Take a reference to cmd so that the closures below can capture it.
mut cmd_ref := unsafe { &cmd } // TODO can unsafe be removed?
c.with_connection(fn [mut cmd_ref] (mut pc PoolConnection) ! {
// Step 1: write the command using the connection's writer.
pc.with_writer(fn [cmd_ref] (mut wr ProtoWriter) ! {
write_cmd(mut wr, cmd_ref)!
})!
// Step 2: read the reply into the command using the connection's reader.
pc.with_reader(fn [mut cmd_ref] (mut rd ProtoReader) ! { // forced to use a closure here, passing cmd_ref.read_reply doesn't mutate cmd_ref
cmd_ref.read_reply(mut rd)!
})!
})!
}
// process executes `cmd`, making up to `options.max_retries + 1` attempts.
// It returns as soon as one attempt succeeds. If every attempt fails, the error of the final
// attempt is returned.
//
// Earlier versions remembered the error of a failed attempt and returned it after the loop even
// when a later retry had succeeded; success now returns immediately so a recovered command is
// not reported as failed.
//
// NOTE(review): when `options.max_retries` is negative the loop body never runs and the function
// returns without error and without sending the command (unchanged from the previous behavior).
fn (mut c BaseClient) process(mut cmd Cmder) ! {
	for attempt := 0; attempt <= c.options.max_retries; attempt++ {
		if attempt > 0 {
			// TODO sleep before retry
		}
		c.attempt_process(mut cmd) or {
			if attempt == c.options.max_retries {
				// Out of retries: report the most recent failure.
				return err
			}
			continue
		}
		// The attempt succeeded; errors from earlier attempts are irrelevant.
		return
	}
}
// close shuts the client down by closing its connection pool; an error from the pool is returned.
// Closing a client is rarely needed, since a client is meant to be long-lived and shared between
// many coroutines.
fn (mut c BaseClient) close() ! {
	c.connection_pool.close() or { return err }
}
// Client represents a pool of zero or more underlying connections. A client creates and frees
// connections automatically.
// It embeds BaseClient and CmdableFn; `new_client` assigns `Client.process` to the embedded
// CmdableFn.
@[heap]
pub struct Client {
BaseClient
CmdableFn
}
// new_client builds a Client from the given Options.
// The options are first passed through `options.init()`, whose error (if any) is returned.
// The result is stored on the client and used to create its connection pool, and the client's
// `process` method is installed as its CmdableFn.
pub fn new_client(options Options) !&Client {
	parsed := options.init()!
	mut client := &Client{
		options:         parsed
		connection_pool: private_new_connection_pool(parsed)
	}
	client.CmdableFn = client.process
	return client
}
// process runs `cmd` through `BaseClient.process`.
// On failure the error is recorded on the command with `set_error` and then returned.
fn (mut c Client) process(mut cmd Cmder) ! {
	c.BaseClient.process(mut cmd) or {
		failure := err
		cmd.set_error(failure)
		return failure
	}
}
// Connection represents a single connection rather than a pool of connections. A Connection is
// used to start a new client and should not be used unless strictly necessary.
// It embeds BaseClient, CmdableFn and StatefulCmdableFn; `new_connection` assigns
// `Connection.process` to both embedded function fields.
@[heap]
pub struct Connection {
BaseClient
CmdableFn
StatefulCmdableFn
}
// new_connection builds a Connection that uses the parsed options `po` and the pool `cp`.
// The connection's `process` method is installed as both its CmdableFn and its
// StatefulCmdableFn.
fn new_connection(po ParsedOptions, mut cp Pooler) &Connection {
	mut connection := &Connection{
		options:         po
		connection_pool: cp
	}
	connection.CmdableFn = connection.process
	connection.StatefulCmdableFn = connection.process
	return connection
}
// process runs `cmd` through `BaseClient.process`.
// On failure the error is recorded on the command with `set_error` and then returned.
fn (mut c Connection) process(mut cmd Cmder) ! {
	c.BaseClient.process(mut cmd) or {
		failure := err
		cmd.set_error(failure)
		return failure
	}
}