-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpolystar.py
More file actions
executable file
·171 lines (135 loc) · 4.78 KB
/
polystar.py
File metadata and controls
executable file
·171 lines (135 loc) · 4.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python
"""
@file polystar.py
@brief General control script
@details The polystar.py script supports the following commands :
- polystar enable / disable : control systemd daemon (launch on startup)
- polystar start / kill / restart : manually start / kill / restart the daemon
- polystar run / stop : control pipeline
- polystar set-enemy (blue|red) : sets enemy color
- polystar shoot (on|off) : allows sending shooting orders
"""
import argparse
import sys
import subprocess
import yaml
# Parser
# Command-line interface: exactly one sub-command, stored in `args.command`.
parser = argparse.ArgumentParser()
subparser = parser.add_subparsers(dest='command', help='Command to run : ')

# Sub-commands without arguments, registered in the order they appear in --help.
for _name, _help in (
    ('enable', 'Enable daemon'),
    ('disable', 'Disable daemon'),
    ('start', 'Manually start daemon'),
    ('kill', 'Manually kill daemon'),
    ('restart', 'Manually restart daemon'),
    ('run', 'Run pipeline'),
    ('stop', 'Stop pipeline'),
    ('status', 'Get pipeline status'),
    ('checklist', 'Pre-match checkup'),
    ('record', 'Start recording'),
):
    subparser.add_parser(_name, help=_help)
del _name, _help  # keep the loop variables out of the module namespace

# Sub-commands taking one positional argument restricted to a fixed set of choices.
set_enemy_parser = subparser.add_parser('set-enemy', help='Set enemy color')
set_enemy_parser.add_argument('color', choices=['blue', 'red'], help='Enemy color')

shoot_parser = subparser.add_parser('shoot')
shoot_parser.add_argument('val', choices=['on', 'off'])

# Commands
# Base systemctl invocation shared by every systemd call below.
sysctl = '/bin/systemctl --user'
def is_daemon_running():
    """Tell whether the polystar service is currently running.

    Runs the ``status polystar`` systemctl command (built on the module-level
    ``sysctl`` prefix) and searches its standard output for the
    ``active (running)`` marker.

    Returns:
        True when the marker is found in the command output, False otherwise.
    """
    status_cmd = f'{sysctl} status polystar'
    completed = subprocess.run(status_cmd, shell=True, stdout=subprocess.PIPE)
    report = completed.stdout.decode()
    return 'active (running)' in report
def get_params(node: str):
    """Fetch the dynamic_reconfigure parameters of a node.

    Runs ``rosrun dynamic_reconfigure dynparam get <node>`` and parses what
    the command writes on standard output as YAML.

    Args:
        node: Name of the node to query. It is interpolated verbatim into a
            shell command line, so it must come from a trusted source.

    Returns:
        The parsed command output (``None`` if the output is empty), or
        ``None`` when the command does not answer within 10 seconds.
    """
    try:
        proc = subprocess.run(f'rosrun dynamic_reconfigure dynparam get {node}', shell=True, stdout=subprocess.PIPE, timeout=10)
    except subprocess.TimeoutExpired as te:
        # subprocess.run signals an expired timeout with TimeoutExpired, which
        # is not a subclass of the builtin TimeoutError.
        print(f'Error : Unresponsive after {te.timeout} seconds')
        return None
    # safe_load only builds plain data types; yaml.load without an explicit
    # Loader can construct arbitrary objects and is refused by recent PyYAML.
    return yaml.safe_load(proc.stdout.decode())
def call_systemd(command: str, service='polystar'):
    """Run a systemctl sub-command on a service and report the outcome.

    Prints ``Success`` when systemctl exits with status 0; otherwise prints
    the captured standard error.

    Args:
        command: systemctl verb to run (e.g. ``start``, ``stop``, ``status``).
        service: Name of the unit the verb applies to.

    Returns:
        The raw bytes systemctl wrote on standard output.
    """
    cmdline = f'{sysctl} {command} {service} --no-pager'
    result = subprocess.run(
        cmdline,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        print(f'Error :\n{result.stderr.decode()}')
    else:
        print('Success')
    return result.stdout
def enable():
    """Enable the polystar daemon via systemctl and return the command's stdout."""
    print('Enabling daemon')
    output = call_systemd('enable')
    return output
def disable():
    """Disable the polystar daemon via systemctl and return the command's stdout."""
    print('Disabling daemon')
    output = call_systemd('disable')
    return output
def start():
    """Start the polystar daemon via systemctl and return the command's stdout."""
    print('Starting daemon')
    output = call_systemd('start')
    return output
def kill():
    """Stop the polystar daemon (systemctl ``stop``) and return the command's stdout."""
    print('Killing daemon')
    output = call_systemd('stop')
    return output
def restart():
    """Restart the daemon by killing it and then starting it again."""
    for step in (kill, start):
        step()
def status():
    """Print the decoded output of the systemctl ``status`` call for the daemon."""
    raw_output = call_systemd('status')
    print(raw_output.decode())
def run():
    """Announce the pipeline start request; the action itself is not implemented yet."""
    print('Running the pipeline', 'Unimplemented', sep='\n')
def stop():
    """
    Stops the pipeline, leaving it running but idle.

    Only announces the request for now; the action itself is not implemented yet.
    """
    for message in ('Stopping the pipeline', 'Unimplemented'):
        print(message)
def dynreconf(node: str, param: str, value: object):
    """Set one dynamic_reconfigure parameter on a node.

    Runs ``rosrun dynamic_reconfigure dynparam set <node> <param> <value>``
    and prints ``Success`` or the captured standard error depending on the
    exit status.

    Args:
        node: Name of the node to reconfigure.
        param: Name of the parameter to set.
        value: New value; it is formatted into the command line as text.
            All three arguments are interpolated verbatim into a shell
            command, so they must come from a trusted source.

    Returns:
        The ``subprocess.CompletedProcess`` of the command, or ``None`` when
        the command does not answer within 10 seconds.
    """
    try:
        proc = subprocess.run(f'rosrun dynamic_reconfigure dynparam set {node} {param} {value}', shell=True, stderr=subprocess.PIPE, timeout=10)
    except subprocess.TimeoutExpired as te:
        # subprocess.run signals an expired timeout with TimeoutExpired, which
        # is not a subclass of the builtin TimeoutError. Return early: there
        # is no completed process to inspect.
        print(f'Error : Unresponsive after {te.timeout} seconds')
        return None
    if proc.returncode == 0:
        print('Success')
    else:
        print(f'Error :\n{proc.stderr.decode()}')
    return proc
def set_enemy(color):
    """Set the enemy color in the parameter file and on the decision node.

    The color is encoded as an integer (0 for ``'red'``, 1 for anything
    else). The ``enemy_color`` line of ``ros_ws/data/param-decision.yaml``
    (path relative to the current working directory) is rewritten with sed,
    then the same value is pushed to the ``decision`` node through
    ``dynreconf``. The node update is attempted even when the file update
    fails or times out.

    Args:
        color: ``'red'`` or ``'blue'``.
    """
    print(f'Setting color to {color}')
    color_int = 0 if color == 'red' else 1
    try:
        proc = subprocess.run(f'sed -i "/enemy_color/c\\enemy_color: {color_int}" ros_ws/data/param-decision.yaml', shell=True, stderr=subprocess.PIPE, timeout=10)
    except subprocess.TimeoutExpired as te:
        # subprocess.run raises TimeoutExpired (not the builtin TimeoutError);
        # report it instead of reading an unbound result afterwards.
        print(f'Error : Unresponsive after {te.timeout} seconds')
    else:
        if proc.returncode == 0:
            print('Success')
        else:
            print(f'Error :\n{proc.stderr.decode()}')
    dynreconf('decision', 'enemy_color', color_int)
def shoot(val):
    """Announce the requested shooting mode; the action itself is not implemented yet.

    Args:
        val: Requested mode, ``'on'`` or ``'off'`` on the command line.
    """
    announcement = f'Shooting to {val}'
    print(announcement, 'Unimplemented', sep='\n')
def checklist():
    """Print the pre-match checklist: daemon state and configured enemy color.

    The enemy color is read from the ``decision`` node parameters, where 0
    means red and any other value means blue. When the parameters cannot be
    retrieved, or carry no ``enemy_color`` entry, the color is reported as
    ``unknown`` rather than aborting the whole checklist.
    """
    running = is_daemon_running()
    params = get_params('decision')
    # Empty command output parses to None, so guard before subscripting.
    if isinstance(params, dict) and 'enemy_color' in params:
        enemy_color = 'red' if params['enemy_color'] == 0 else 'blue'
    else:
        enemy_color = 'unknown'
    print(f'Is daemon running : {running}')
    print(f'Enemy color : {enemy_color}')
def main():
    """Parse the command line and run the handler of the requested sub-command.

    When no handler matches (no sub-command given, or a sub-command such as
    ``record`` that the parser accepts but that has no handler here), an
    error is printed on stderr and the process exits with status -1.
    """
    args = parser.parse_args()
    command = args.command

    # Handlers that take no argument, keyed by sub-command name.
    simple_handlers = {
        'enable': enable,
        'disable': disable,
        'start': start,
        'kill': kill,
        'restart': restart,
        'status': status,
        'run': run,
        'stop': stop,
        'checklist': checklist,
    }

    if command in simple_handlers:
        simple_handlers[command]()
    elif command == 'set-enemy':
        set_enemy(args.color)
    elif command == 'shoot':
        shoot(args.val)
    else:
        print(f'Unsupported command "{args.command}"', file=sys.stderr)
        sys.exit(-1)


if __name__ == "__main__":
    main()