-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathserver.py
More file actions
188 lines (154 loc) · 6.55 KB
/
server.py
File metadata and controls
188 lines (154 loc) · 6.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: MIT
import threading
import time
import traceback
import zmq
class ZMQServer:
    """
    Server for handling ZMQ communication.

    This class implements a singleton pattern and provides methods for creating
    ZMQ sockets, sending and receiving data in separate threads, and cleaning up
    resources when they are no longer needed.

    Attributes are documented in ``__init__``.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared ZMQServer instance, creating it on first use."""
        if cls._instance is None:
            # object.__new__() accepts no extra arguments, so none are forwarded.
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize the ZMQServer with empty collections for sockets and threads."""
        # __init__ runs on every ZMQServer() call; keep the state created by
        # the first call (singleton pattern).
        if hasattr(self, "push_sockets"):
            return

        # Sockets created by this server, keyed by port number.
        self.push_sockets = {}
        self.pull_sockets = {}
        # Worker bookkeeping: name -> (thread, stop_event).
        # NOTE: "reciveing" is misspelled, but the attribute name is kept
        # unchanged because code outside this class may reference it.
        self.reciveing_threads = {}
        self.sending_threads = {}
        # ZMQ context, created lazily by context().
        self._context = None

    def context(self) -> zmq.Context:
        """
        Returns the ZMQ context instance.

        If the context has not been initialized, it creates a new ZMQ context
        and assigns it to the `_context` attribute.

        Returns:
            zmq.Context: The ZMQ context instance.
        """
        if self._context is None:
            self._context = zmq.Context()
        return self._context

    def get_pull_socket(self, port: int) -> zmq.Socket:
        """
        Creates and returns a new pull socket that is bound to the specified port.

        The socket is recorded in `pull_sockets` under `port`.

        Args:
            port (int): The port number to bind the socket to.

        Returns:
            zmq.Socket: The newly created pull socket.

        Raises:
            Exception: Whatever `bind` raises (e.g. the port is already in
                use); the socket is closed before the error propagates.
        """
        addr = f"tcp://*:{port}"
        sock = self.context().socket(zmq.PULL)
        sock.set_hwm(1)  # High water mark: only buffer 1 message
        sock.setsockopt(zmq.RCVTIMEO, 1000)  # 1 second timeout for receiving
        try:
            sock.bind(addr)
        except Exception:
            # Do not leak the socket when the bind fails.
            sock.close()
            raise
        self.pull_sockets[port] = sock
        return sock

    def get_push_socket(self, port: int) -> zmq.Socket:
        """
        Creates and returns a ZeroMQ PUSH socket bound to the specified port.

        The socket is recorded in `push_sockets` under `port`.

        Args:
            port (int): The port number to bind the socket to.

        Returns:
            zmq.Socket: The created PUSH socket.

        Raises:
            Exception: Whatever `bind` raises (e.g. the port is already in
                use); the socket is closed before the error propagates.
        """
        addr = f"tcp://*:{port}"
        sock = self.context().socket(zmq.PUSH)
        sock.setsockopt(zmq.SNDTIMEO, 1000)  # 1 second timeout for sending
        try:
            sock.bind(addr)
        except Exception:
            # Do not leak the socket when the bind fails.
            sock.close()
            raise
        self.push_sockets[port] = sock
        return sock

    def subscribe_to_socket_in_loop(self, name: str, port: int, fn: callable) -> None:
        """
        Receives messages from a socket in a loop and calls a given function for each message.

        This method creates a new thread that continuously receives messages from the specified
        port and passes them to the provided callback function. The thread runs until its stop
        event is set (see `cleanup`), then closes the socket and removes it from `pull_sockets`.

        Registering a second thread under an existing `name` replaces the bookkeeping entry;
        the earlier thread keeps running and is no longer reachable through `cleanup`.

        Args:
            name (str): The name of the receiving thread.
            port (int): The port number to receive messages from.
            fn (callable): A callable function that takes a message as input.
        """
        # Create socket for receiving
        sock = self.get_pull_socket(port)
        stop_event = threading.Event()

        def loop():
            """Thread function that continuously receives messages."""
            try:
                while not stop_event.is_set():
                    try:
                        msg = sock.recv()
                        fn(msg)
                    except zmq.Again:
                        # Receive timed out; loop around to re-check the stop event.
                        continue
                    except Exception:
                        # Keep the receiver alive on callback/receive errors, but let
                        # SystemExit/KeyboardInterrupt propagate.
                        print("[isaac-zmq-server] Unable to unpack from socket...")
                        print(traceback.format_exc())
                        continue
            finally:
                # Clean up when the thread is finished, however the loop ended.
                sock.close()
                self.pull_sockets.pop(port, None)

        # Start the thread
        worker = threading.Thread(target=loop)
        self.reciveing_threads[name] = (worker, stop_event)
        worker.start()

    def publish_protobuf_in_loop(self, name: str, port: int, rate_hz: float, fn: callable) -> None:
        """
        Sends protobuf messages from a socket in a loop at a specified rate.

        This method creates a new thread that continuously sends protobuf messages at the specified
        rate to the specified port. The protobuf message to send is obtained by calling the provided
        callback function. The thread runs until its stop event is set (see `cleanup`), then closes
        the socket and removes it from `push_sockets`.

        Registering a second thread under an existing `name` replaces the bookkeeping entry;
        the earlier thread keeps running and is no longer reachable through `cleanup`.

        Args:
            name (str): The name of the sending thread.
            port (int): The port number to send data to.
            rate_hz (float): The rate at which data is sent in Hz. Must be positive.
            fn (callable): A callable function that returns a protobuf message.

        Raises:
            ValueError: If `rate_hz` is not positive.
        """
        # Validate before binding anything: a non-positive rate would otherwise
        # crash the worker thread after the port is already bound.
        if rate_hz <= 0:
            raise ValueError(f"rate_hz must be positive, got {rate_hz}")
        period = 1 / rate_hz

        # Create socket for sending
        sock = self.get_push_socket(port)
        stop_event = threading.Event()

        def loop():
            """Thread function that continuously sends protobuf messages at the specified rate."""
            try:
                while not stop_event.is_set():
                    try:
                        # Get the protobuf message from the callback function
                        proto_msg = fn()
                        # Serialize the protobuf message and send it
                        sock.send(proto_msg.SerializeToString())
                    except zmq.Again:
                        # The send already blocked for the send timeout; retry right away.
                        continue
                    except Exception as e:
                        # Fall through to the wait below so a persistently failing
                        # callback is retried at rate_hz instead of in a tight loop.
                        print(f"[isaac-zmq-server] Unable to send protobuf to socket: {e}")
                    # Wait to maintain the desired rate; returns early once the
                    # stop event is set so shutdown is not delayed by a full period.
                    stop_event.wait(period)
            finally:
                # Clean up when the thread is finished, however the loop ended.
                sock.close()
                self.push_sockets.pop(port, None)

        # Start the sending thread
        worker = threading.Thread(target=loop)
        self.sending_threads[name] = (worker, stop_event)
        worker.start()

    def cleanup(self) -> None:
        """
        Stops and joins all receiving and sending threads.

        This function is used to clean up the threads when they are no longer needed.
        It sets the stop event for every thread first, so they all wind down concurrently,
        then joins each of them and empties the thread registries so that the server can
        be reused afterwards. The ZMQ context itself is left open.
        """
        workers = list(self.reciveing_threads.values()) + list(self.sending_threads.values())

        # Signal every worker before joining any of them; joining one at a time
        # would serialize the per-socket timeouts.
        for _, stop_event in workers:
            stop_event.set()
        for worker, _ in workers:
            worker.join()

        # All registered threads have finished; drop the stale entries.
        self.reciveing_threads.clear()
        self.sending_threads.clear()