-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathclient.py
More file actions
283 lines (227 loc) · 9.4 KB
/
client.py
File metadata and controls
283 lines (227 loc) · 9.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
"""Tinode gRPC client operations and message handling."""
from __future__ import print_function
import grpc
import json
import sys
import time
from tinode_grpc import pb
from tinode_grpc import pbx
import tn_globals
from tn_globals import printerr, stdoutln, to_json
from utils import dotdict
# 5 seconds timeout for .await/.must commands.
AWAIT_TIMEOUT = 5
# Handle {ctrl} server response
def handle_ctrl(ctrl):
    """Process a {ctrl} message received from the server.

    Runs the completion callback registered under `ctrl.id` (only for
    codes in the 200..399 range), resolves the command currently being
    awaited if its id matches, and prints the response code and text.

    Raises:
        Exception: when the awaited command has `failOnError` set and the
            response code is 400 or higher.
    """
    # One-shot completion callback: always unregistered, invoked on success only.
    callback = tn_globals.OnCompletion.get(ctrl.id)
    if callback:
        del tn_globals.OnCompletion[ctrl.id]
        if 200 <= ctrl.code < 400:
            callback(ctrl.params)

    pending = tn_globals.WaitingFor
    if pending and pending.await_id == ctrl.id:
        if 'varname' in pending:
            # Keep the response available under the requested variable name.
            tn_globals.Variables[pending.varname] = ctrl
        if pending.failOnError and ctrl.code >= 400:
            raise Exception(str(ctrl.code) + " " + ctrl.text)
        tn_globals.WaitingFor = None

    suffix = " (" + str(ctrl.topic) + ")" if ctrl.topic else ""
    stdoutln("\r<= " + str(ctrl.code) + " " + ctrl.text + suffix)
# Handle the parameters of a successful login response.
def handle_login(params):
    """Decode login response parameters and remember the auth token.

    Args:
        params: protobuf map of parameter name to JSON-encoded value,
            or None.

    Returns:
        A plain dict with every value JSON-decoded, or None when `params`
        is None.
    """
    if params is None:
        return None

    # Protobuf map 'params' is a map which is not a python object or a dictionary. Convert it.
    nice = {key: json.loads(params[key]) for key in params}

    stdoutln("Authenticated as", nice.get('user'))

    tn_globals.AuthToken = nice.get('token')

    return nice
# Save cookie to file after successful login.
def save_cookie(params):
    """Process login parameters and persist them to '.tn-cli-cookie'.

    Failures are reported through `stdoutln` and never propagate to the
    caller (best effort).

    Args:
        params: login response parameters as accepted by `handle_login`,
            or None, in which case nothing is done.
    """
    if params is None:
        return

    try:
        # Decode the parameters before touching the file so that a decoding
        # failure does not truncate an existing cookie, and an unwritable
        # file does not prevent the login from being processed.
        nice = handle_login(params)
        # 'with' guarantees the file is closed even if json.dump raises.
        with open('.tn-cli-cookie', 'w') as cookie:
            json.dump(nice, cookie)
    except Exception as err:
        stdoutln("Failed to save authentication cookie", err)
# Read cookie file for logging in with the cookie.
def read_cookie():
    """Return the value stored under "token" in '.tn-cli-cookie'.

    Returns:
        The stored token, or None when the file is missing, unreadable,
        not valid JSON, or has no "token" entry. Read/parse failures are
        reported through `printerr`.
    """
    try:
        # 'with' guarantees the file is closed even if json.load raises.
        with open('.tn-cli-cookie', 'r') as cookie:
            params = json.load(cookie)
        return params.get("token")

    except Exception as err:
        printerr("Missing or invalid cookie file '.tn-cli-cookie'", err)
        return None
def pop_from_output_queue():
    """Print one pending item from the output queue, if there is one.

    Returns:
        True when an item was taken from the queue and written to stdout,
        False when the queue was empty.
    """
    pending = tn_globals.OutputQueue
    if not pending.empty():
        sys.stdout.write("\r<= " + pending.get())
        sys.stdout.flush()
        return True
    return False
# Generator of protobuf messages.
def gen_message(scheme, secret, args):
    """Client message generator: reads user input as string,
    converts to pb.ClientMsg, and yields.

    Starts a daemon thread running `input_handler.stdin` on
    `tn_globals.InputQueue`, then yields, in order: the {hi} handshake,
    a {login} message when `scheme` is given, and one message per user
    command taken from the input queue.

    While a synchronous command is recorded in `tn_globals.WaitingFor`,
    no further input is consumed; the wait is abandoned after
    AWAIT_TIMEOUT seconds. The generator finishes when the user enters
    'exit', 'quit', '.exit' or '.quit'. Exceptions raised while handling
    a command are printed and the loop continues.

    Args:
        scheme: authentication scheme for the automatic login, or None to
            skip the login message.
        secret: authentication secret passed along with `scheme`.
        args: parsed command-line arguments; `args.background` is read
            here and the object is forwarded to the command serializers.
    """
    import random
    import threading
    from input_handler import stdin
    from commands import hiMsg, loginMsg, serialize_cmd

    random.seed()
    # Starting point for message ids; incremented for every command read.
    msg_id = random.randint(10000,60000)

    # Asynchronous input-output
    tn_globals.InputThread = threading.Thread(target=stdin, args=(tn_globals.InputQueue,))
    tn_globals.InputThread.daemon = True
    tn_globals.InputThread.start()

    try:
        from importlib.metadata import version
    except ImportError:
        # Fall back to the backport package when the stdlib module is absent.
        from importlib_metadata import version
    import platform

    APP_NAME = "tn-cli"
    APP_VERSION = "3.0.1"
    LIB_VERSION = version("tinode_grpc")
    GRPC_VERSION = version("grpcio")

    user_agent = APP_NAME + "/" + APP_VERSION + " (" + \
        platform.system() + "/" + platform.release() + "); gRPC-python/" + LIB_VERSION + "+" + GRPC_VERSION

    msg = hiMsg(msg_id, args.background, user_agent, LIB_VERSION)
    if tn_globals.Verbose:
        stdoutln("\r=> " + to_json(msg))
    yield msg

    if scheme is not None:
        msg_id += 1
        # A bare function object is used as a simple attribute container
        # standing in for a parsed login command.
        login = lambda:None
        setattr(login, 'scheme', scheme)
        setattr(login, 'secret', secret)
        setattr(login, 'cred', None)
        msg = loginMsg(msg_id, login, args)
        if tn_globals.Verbose:
            stdoutln("\r=> " + to_json(msg))
        yield msg

    print_prompt = True

    while True:
        try:
            if not tn_globals.WaitingFor and tn_globals.InputQueue:
                msg_id += 1
                inp = tn_globals.InputQueue.popleft()

                if inp in ('exit', 'quit', '.exit', '.quit'):
                    # Drain the output queue.
                    while pop_from_output_queue():
                        pass
                    return

                pbMsg, cmd = serialize_cmd(inp, msg_id, args)
                print_prompt = tn_globals.IsInteractive
                if isinstance(cmd, list):
                    # Push the expanded macro back on the command queue.
                    tn_globals.InputQueue.extendleft(reversed(cmd))
                    continue
                if pbMsg is not None:
                    if not tn_globals.IsInteractive:
                        # Echo the command so non-interactive output shows what was sent.
                        sys.stdout.write("=> " + inp + "\n")
                        sys.stdout.flush()

                    if cmd.synchronous:
                        # Block further input until the response with this id
                        # arrives or the wait times out.
                        cmd.await_ts = time.time()
                        cmd.await_id = str(msg_id)
                        tn_globals.WaitingFor = cmd

                    if not hasattr(cmd, 'no_yield'):
                        if tn_globals.Verbose:
                            stdoutln("\r=> " + to_json(pbMsg))
                        yield pbMsg

            elif not tn_globals.OutputQueue.empty():
                pop_from_output_queue()
                print_prompt = tn_globals.IsInteractive

            else:
                # Idle: nothing to send and nothing to print.
                if print_prompt:
                    sys.stdout.write("tn> ")
                    sys.stdout.flush()
                    print_prompt = False
                if tn_globals.WaitingFor:
                    if time.time() - tn_globals.WaitingFor.await_ts > AWAIT_TIMEOUT:
                        stdoutln("Timeout while waiting for '{0}' response".format(tn_globals.WaitingFor.cmd))
                        tn_globals.WaitingFor = None

                # Poll more often when not interactive.
                if tn_globals.IsInteractive:
                    time.sleep(0.1)
                else:
                    time.sleep(0.01)

        except Exception as err:
            stdoutln("Exception in generator: {0}".format(err))
# The main processing loop: send messages to server, receive responses.
def run(args, schema, secret):
    """Run one client session and return a process exit code.

    Opens a gRPC channel to `args.host` (secure when `args.ssl` is set),
    starts the bidirectional message loop fed by `gen_message`, and prints
    every message received from the server until the stream ends.

    Args:
        args: parsed command-line arguments; `ssl`, `ssl_host` and `host`
            are read here and the object is forwarded to `gen_message`.
        schema: authentication scheme forwarded to `gen_message`.
        secret: authentication secret forwarded to `gen_message`.

    Returns:
        1 if the session ended because of an error, 0 otherwise.
    """
    failed = False
    # Reset before entering 'try' so that the 'finally' clause can tell
    # whether a channel was actually created.
    tn_globals.Connection = None
    try:
        from prompt_toolkit import PromptSession
        if tn_globals.IsInteractive:
            tn_globals.Prompt = PromptSession()

        # Create channel with default credentials.
        if args.ssl:
            opts = (('grpc.ssl_target_name_override', args.ssl_host),) if args.ssl_host else None
            tn_globals.Connection = grpc.secure_channel(args.host, grpc.ssl_channel_credentials(), opts)
        else:
            tn_globals.Connection = grpc.insecure_channel(args.host)

        # Call the server
        stream = pbx.NodeStub(tn_globals.Connection).MessageLoop(gen_message(schema, secret, args))

        # Read server responses
        for msg in stream:
            if tn_globals.Verbose:
                stdoutln("\r<= " + to_json(msg))

            if msg.HasField("ctrl"):
                handle_ctrl(msg.ctrl)

            elif msg.HasField("meta"):
                # List which sections are present in the {meta} message.
                what = []
                if len(msg.meta.sub) > 0:
                    what.append("sub")
                if msg.meta.HasField("desc"):
                    what.append("desc")
                if msg.meta.HasField("del"):
                    what.append("del")
                if len(msg.meta.tags) > 0:
                    what.append("tags")
                stdoutln("\r<= meta " + ",".join(what) + " " + msg.meta.topic)

                # A {meta} with a matching id also resolves the awaited command.
                if tn_globals.WaitingFor and tn_globals.WaitingFor.await_id == msg.meta.id:
                    if 'varname' in tn_globals.WaitingFor:
                        tn_globals.Variables[tn_globals.WaitingFor.varname] = msg.meta
                    tn_globals.WaitingFor = None

            elif msg.HasField("data"):
                stdoutln("\n\rFrom: " + msg.data.from_user_id)
                stdoutln("Topic: " + msg.data.topic)
                stdoutln("Seq: " + str(msg.data.seq_id))
                if msg.data.head:
                    stdoutln("Headers:")
                    for key in msg.data.head:
                        stdoutln("\t" + key + ": "+str(msg.data.head[key]))
                stdoutln(json.loads(msg.data.content))

            elif msg.HasField("pres"):
                # 'ON', 'OFF', 'UA', 'UPD', 'GONE', 'ACS', 'TERM', 'MSG', 'READ', 'RECV', 'DEL', 'TAGS', 'AUX'
                what = pb.ServerPres.What.Name(msg.pres.what)
                stdoutln("\r<= pres " + what + " " + msg.pres.topic)

            elif msg.HasField("info"):
                switcher = {
                    pb.READ: 'READ',
                    pb.RECV: 'RECV',
                    pb.KP: 'KP',
                    pb.CALL: 'CALL'
                }
                stdoutln("\rMessage #" + str(msg.info.seq_id) + " " + switcher.get(msg.info.what, "unknown") +
                    " by " + msg.info.from_user_id + "; topic=" + msg.info.topic + " (" + msg.topic + ")")

            else:
                stdoutln("\rMessage type not handled" + str(msg))

    except grpc.RpcError as err:
        # print(err)
        printerr("gRPC failed with {0}: {1}".format(err.code(), err.details()))
        failed = True
    except Exception as ex:
        printerr("Request failed: {0}".format(ex))
        failed = True
    finally:
        from tn_globals import printout
        printout('Shutting down...')
        # The channel is absent when the failure happened before it was
        # created; closing unconditionally would raise from 'finally' and
        # hide the original error and the exit code.
        if tn_globals.Connection is not None:
            tn_globals.Connection.close()
        if tn_globals.InputThread is not None:
            tn_globals.InputThread.join(0.3)

    return 1 if failed else 0