-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsend_to_worker.py
More file actions
73 lines (63 loc) · 1.86 KB
/
send_to_worker.py
File metadata and controls
73 lines (63 loc) · 1.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
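send_to_worker.py

Simulates the AIA sender in Texas: folds an action payload, hashes it, and
writes the resulting packet for the California worker to pick up.
Only the hash is meant to cross between the two sites.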
"""
import json
import hashlib
import time
from pathlib import Path
from data_to_fold import encode_to_6base
def send_action(action_payload, uid, seed, watch_dir="incoming", *,
                use_lht=True) -> None:
    """Fold an action payload, hash it, and write the packet to ``watch_dir``.

    The payload is passed through ``encode_to_6base``; the folded result is
    combined with ``uid``, ``seed`` and the current time, serialized as JSON
    with sorted keys, and digested with SHA-256. A packet holding the
    transmission header and the folded payload is then written to
    ``<watch_dir>/action_<unix-seconds>.json``.

    Args:
        action_payload: Object handed to ``encode_to_6base``. The folded
            result must be JSON-serializable and must expose a
            ``"strand_length"`` key, because both are used below.
        uid: Sender identifier, stored in the hash input and the header.
        seed: Seed value, stored in the hash input and the header.
        watch_dir: Directory that receives the packet file. It is created,
            including any missing parent directories, if it does not exist.
        use_lht: When true (the default), the hash is additionally passed to
            ``lht.chunk_transmission`` with ``output_dir="outgoing/lht"``.
            When false, only the single hash is reported.

    Note:
        The file name uses the timestamp truncated to whole seconds, so two
        calls within the same second that target the same ``watch_dir``
        write to the same path and the later call overwrites the earlier.
    """
    folded = encode_to_6base(action_payload)
    timestamp = time.time()

    hash_obj = {
        "uid": uid,
        "seed": seed,
        "input": folded,
        "timestamp": timestamp,
    }
    # sort_keys=True makes the serialized form, and therefore the digest,
    # independent of dict insertion order.
    hash_key = hashlib.sha256(
        json.dumps(hash_obj, sort_keys=True).encode()
    ).hexdigest()

    transmission = {
        "uid": uid,
        "hash_key": hash_key,
        "seed": seed,
        "timestamp": timestamp,
    }
    packet = {
        "transmission": transmission,
        "fold": folded,
    }

    target_dir = Path(watch_dir)
    # parents=True so that a nested watch_dir (e.g. "a/b/incoming") works.
    target_dir.mkdir(parents=True, exist_ok=True)
    out = target_dir / f"action_{int(timestamp)}.json"
    with open(out, "w", encoding="utf-8") as f:
        json.dump(packet, f, indent=2)

    print(f"[✔] Action folded — strand: {folded['strand_length']} bits")
    print(f"[✔] Hash: {hash_key[:16]}...")
    print(f"[✔] Written to {out}")
    print("[✔] Nothing else crossed.")

    # LHT mode — optional; pass use_lht=False for a single hash.
    if use_lht:
        # Imported lazily so the lht module is only required in LHT mode.
        from lht import chunk_transmission

        chunk_transmission(
            hash_key, uid, seed,
            output_dir="outgoing/lht"
        )
        print("[✔] LHT streams written")
        print("[✔] Send outgoing/lht/ to receiver")
    else:
        print(f"[✔] Single hash: {hash_key[:16]}...")
if __name__ == "__main__":
    # Demo run: send a liveness ping with the sample sender identity.
    ping_payload = {
        "action": "ping",
        "from": "AIA — Haskell Texas",
        "message": "Worker alive check — March 15 2026",
    }
    send_action(
        ping_payload,
        uid="aia_texas_001",
        seed="haskell-texas-2026",
    )