-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoverlay_controller.py
More file actions
55 lines (48 loc) · 1.61 KB
/
overlay_controller.py
File metadata and controls
55 lines (48 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python3
import json
import os
import subprocess
class RecordingOverlay:
    """Launch ``overlay.py`` as a child process and stream UI updates to it.

    Each update is written to the child's stdin as one JSON object per line,
    of the form ``{"phase": ..., "text": ...}``. All communication is
    best-effort: a dead or unresponsive overlay never raises into the caller.
    """

    def __init__(self, base_dir: str):
        """Record the overlay script location; nothing is launched yet.

        Args:
            base_dir: Directory that contains ``overlay.py``.
        """
        self.script = os.path.join(base_dir, "overlay.py")
        # Popen handle of the overlay child, or None when no child is tracked.
        self._proc = None

    def _is_running(self) -> bool:
        """Return True when a child is tracked and has not exited yet."""
        return self._proc is not None and self._proc.poll() is None

    def _close_stdin(self) -> None:
        """Close the tracked child's stdin pipe, ignoring pipe errors."""
        proc = self._proc
        if proc is None or proc.stdin is None:
            return
        try:
            proc.stdin.close()
        except (OSError, ValueError):
            # Closing flushes buffered text; a child that is already gone
            # makes that flush fail, which is harmless here.
            pass

    def show(self) -> None:
        """Start the overlay process unless one is already running."""
        if self._is_running():
            return
        # A previous child may have exited on its own; release its pipe
        # before the handle is replaced so the descriptor is not leaked.
        self._close_stdin()
        # NOTE(review): the interpreter path is hard-coded; presumably this is
        # deliberate (a specific system Python) -- confirm before changing.
        self._proc = subprocess.Popen(
            ["/usr/bin/python3", self.script],
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            text=True,
            bufsize=1,  # line-buffered, so each JSON line is sent promptly
        )
        print("[overlay] shown", flush=True)

    def update(self, phase, text="") -> None:
        """Send one ``{"phase", "text"}`` update to the overlay, best-effort.

        Does nothing when no overlay is running. Serialization and pipe
        errors are swallowed so that a broken overlay cannot disturb the
        caller.

        Args:
            phase: Value stored under the ``"phase"`` key; must be JSON
                serializable.
            text: Value stored under the ``"text"`` key; any falsy value
                (including ``None``) is sent as an empty string.
        """
        if not self._is_running() or self._proc.stdin is None:
            return
        try:
            payload = json.dumps({"phase": phase, "text": text or ""})
        except (TypeError, ValueError):
            # Unserializable payload: drop this update rather than crash.
            return
        try:
            self._proc.stdin.write(payload + "\n")
            self._proc.stdin.flush()
        except (OSError, ValueError):
            # Broken pipe (child exited mid-write) or already-closed stream.
            pass

    def hide(self) -> None:
        """Stop the overlay process, if any, and forget its handle.

        The stdin pipe is always closed, even when the child has already
        exited by itself. A running child is sent ``terminate()`` and given
        2 seconds to exit, then ``kill()`` and 1 more second.
        """
        proc = self._proc
        if proc is not None:
            self._close_stdin()
            if proc.poll() is None:
                proc.terminate()
                try:
                    proc.wait(timeout=2.0)
                except subprocess.TimeoutExpired:
                    try:
                        proc.kill()
                        proc.wait(timeout=1.0)
                    except (OSError, subprocess.TimeoutExpired):
                        pass
        self._proc = None
        print("[overlay] hidden", flush=True)