-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonitor.py
More file actions
372 lines (323 loc) · 14.3 KB
/
monitor.py
File metadata and controls
372 lines (323 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
import time
from pathlib import Path
from extensions import COMMON_EXTENSIONS, BLACKLIST_EXTENSIONS
from scanner import load_glossar, analyze_file, encode_sequence, sequence_to_fingerprint
from db_utils import save_snapshot, init_db, open_db
from watchdog_utils import start_file_hybrid, start_folder_hybrid
# ---------------------------------------------------
# Sichere Analyse
# ---------------------------------------------------
from hashlib import md5
def _read_for_fallback(path):
    """Return ``(code, fingerprint)`` for a file that could not be analysed.

    Never raises. An unreadable or empty file yields ``("", "00000000")``.
    A file that is not valid UTF-8 is decoded with replacement characters and
    fingerprinted from its raw bytes, so that two different binary contents
    do not collapse to the same fingerprint.
    """
    path = Path(path)
    try:
        code = path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        try:
            raw = path.read_bytes()
        except OSError:
            return "", "00000000"
        if not raw:
            return "", "00000000"
        return raw.decode("utf-8", errors="replace"), md5(raw).hexdigest()
    except OSError:
        # Missing file, directory, permission problem, ...
        return "", "00000000"
    if not code:
        return "", "00000000"
    return code, md5(code.encode("utf-8")).hexdigest()


def safe_file_analysis(filepath, glossar, dna=True, max_retries=3):
    """Analyse a file without letting read or parser errors reach the caller.

    Args:
        filepath: File to analyse; resolved to an absolute path first.
        glossar: Passed through unchanged to ``analyze_file``.
        dna: Passed through unchanged to ``analyze_file``.
        max_retries: Number of attempts; values below 1 are treated as 1 so
            the function always returns a tuple.

    Returns:
        A 4-tuple ``(seq, dna_text, code, fingerprint)``:

        * empty / whitespace-only file: ``([0], "0", code, "00000000")``;
        * normal case: the results of ``analyze_file``, ``encode_sequence``
          and ``sequence_to_fingerprint``;
        * analysis raised: ``seq`` is built from the first 1000 characters,
          ``dna_text`` is empty and ``fingerprint`` is the MD5 of the text;
        * file could not be read on any attempt: ``([0], "", code,
          fingerprint)`` from a guarded last-resort read.
    """
    abs_path = str(Path(filepath).resolve())
    target = Path(abs_path)
    # A non-positive retry count used to skip the loop and return None.
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        try:
            if not target.exists():
                raise FileNotFoundError(f"Datei nicht gefunden: {abs_path}")
            code = target.read_text(encoding="utf-8")

            if not code.strip():
                return [0], "0", code, "00000000"

            try:
                seq = analyze_file(abs_path, glossar, dna=dna)
                dna_text = encode_sequence(seq)
                fingerprint = sequence_to_fingerprint(seq)
            except Exception as parse_err:
                # Analysis failures are not retried; degrade to a plain
                # character-code sequence and a content hash instead.
                print(f"[SAFE_ANALYSIS] Parser-Fehler → Fallback genutzt: {parse_err}")
                seq = [ord(c) % 256 for c in code[:1000]]
                dna_text = ""
                fingerprint = md5(code.encode("utf-8")).hexdigest()
            return seq, dna_text, code, fingerprint
        except Exception:
            if attempt == attempts - 1:
                # The previous strict re-read here could raise again (e.g.
                # UnicodeDecodeError) and escape; the helper never raises.
                code, fingerprint = _read_for_fallback(target)
                return [0], "", code, fingerprint
            # Give a writer that is still busy with the file a moment.
            time.sleep(0.5)
# ---------------------------------------------------
# Datei-Monitor
# ---------------------------------------------------
def monitor_file(filepath, interval=60, dna=True, db_path="conum.db", exts=None, queue=None):
    """Take an initial snapshot of one file, then start the hybrid watcher.

    Args:
        filepath: File to monitor.
        interval: Forwarded to ``start_file_hybrid``.
        dna: Forwarded to ``safe_file_analysis``.
        db_path: Database path used for ``init_db`` / ``open_db``.
        exts: Allowed file-name suffixes; ``COMMON_EXTENSIONS`` when falsy.
        queue: Optional object with a ``put`` method that receives a
            ``"snapshot"`` event on success or a ``"log"`` event on failure.

    Returns:
        ``None`` when the file name ends with none of the allowed suffixes
        (nothing else is done in that case); otherwise the value returned by
        ``start_file_hybrid``.
    """
    print(f"[MONITOR-START] File={filepath} | Interval={interval}s | DB={db_path}", flush=True)

    # Extension gate: leave early when no allowed suffix matches.
    exts = exts or COMMON_EXTENSIONS
    for suffix in exts:
        if str(filepath).endswith(suffix):
            break
    else:
        print(f"[MONITOR-FILE][WARN] Datei {filepath} wird nicht überwacht (nicht in {exts})")
        return None

    init_db(db_path)
    glossar = load_glossar()
    abs_path = Path(filepath).resolve()
    # Queue consumers get the absolute database path.
    abs_db_path = str(Path(db_path).resolve())
    target = str(abs_path)

    # Initial snapshot; a failure is reported but does not stop monitoring.
    try:
        _seq, dna_text, code, fingerprint = safe_file_analysis(abs_path, glossar, dna=dna)
        with open_db(db_path) as conn:
            sid, _is_new, _ = save_snapshot(
                conn, target, fingerprint, dna_text, code, source="initial"
            )
        print(f"[FILE_MONITOR] Initial snapshot {sid} created")
        if queue:
            event = {
                "type": "snapshot",
                "file": target,
                "db": abs_db_path,
                "event": "initial",
            }
            queue.put(event)
    except Exception as err:
        print(f"[FILE_MONITOR] Could not create initial snapshot: {err}")
        if queue:
            queue.put({"type": "log", "msg": f"Initial snapshot failed: {err}"})

    # Hand over to the hybrid (watchdog + polling) monitor; it receives the
    # database path as given, not a connection.
    return start_file_hybrid(
        db_path,
        glossar,
        abs_path,
        interval=interval,
        queue=queue,
        exts=exts,
    )
# ---------------------------------------------------
# Ordner-Monitor
# ---------------------------------------------------
def monitor_folder(folderpath, interval=60, dna=True, db_path="conum.db", exts=None, queue=None):
    """Snapshot every matching file in a folder, then start the hybrid watcher.

    Args:
        folderpath: Folder to monitor; searched recursively.
        interval: Forwarded to ``start_folder_hybrid``.
        dna: Forwarded to ``safe_file_analysis``.
        db_path: Database path used for ``init_db`` / ``open_db``.
        exts: Allowed file-name suffixes; ``COMMON_EXTENSIONS`` when falsy.
            The same list is used for the initial snapshots and for the
            watcher, so both phases cover the same files.
        queue: Optional object with a ``put`` method that receives a
            ``"snapshot"`` event per newly stored snapshot and a ``"log"``
            event per failure.

    Returns:
        The value returned by ``start_folder_hybrid``.
    """
    print(f"[MONITOR-START] Folder={folderpath} | Interval={interval}s | DB={db_path} | Exts={exts}", flush=True)
    init_db(db_path)
    glossar = load_glossar()
    abs_folder = Path(folderpath).resolve()
    # Queue consumers get the absolute database path.
    abs_db_path = str(Path(db_path).resolve())

    # Resolve the extension list once. The initial phase previously fell back
    # to [".py"] while the watcher fell back to COMMON_EXTENSIONS.
    active_exts = exts or COMMON_EXTENSIONS
    db_suffixes = (".db", ".db-wal", ".db-shm")

    # Initial snapshots
    seen = set()
    try:
        for ext in active_exts:
            for filepath in abs_folder.rglob(f"*{ext}"):
                # A path can match several patterns (e.g. ".ts" and ".d.ts").
                if filepath in seen:
                    continue
                seen.add(filepath)
                # rglob also yields directories whose name matches.
                if not filepath.is_file():
                    continue
                # Never snapshot database files.
                if str(filepath).endswith(db_suffixes):
                    continue
                try:
                    _, dna_text, code, fingerprint = safe_file_analysis(filepath, glossar, dna=dna)
                    with open_db(db_path) as conn:
                        sid, is_new, _ = save_snapshot(
                            conn, str(filepath), fingerprint, dna_text, code, source="initial"
                        )
                    if is_new:
                        print(f"[FOLDER_MONITOR] Initial snapshot for: {filepath.name} (ID {sid})")
                        if queue:
                            queue.put({
                                "type": "snapshot",
                                "file": str(filepath),
                                "db": abs_db_path,
                                "event": "initial",
                            })
                except Exception as e:
                    # One bad file must not abort the remaining snapshots.
                    print(f"[FOLDER_MONITOR] Initial snapshot phase failed: {e}")
                    if queue:
                        queue.put({"type": "log", "msg": f"Folder initial snapshot failed: {e}"})
    except Exception as e:
        # Errors raised by the directory traversal itself.
        print(f"[FOLDER_MONITOR] Initial snapshot phase failed: {e}")
        if queue:
            queue.put({"type": "log", "msg": f"Folder initial snapshot failed: {e}"})

    # Start hybrid (watchdog + polling).
    # NOTE(review): the first argument is the object returned by open_db(),
    # whereas monitor_file passes the plain db_path string to its hybrid
    # starter; confirm against watchdog_utils that this is intended.
    observer = start_folder_hybrid(
        open_db(db_path),
        glossar,
        abs_folder,
        interval=interval,
        db_path=db_path,
        queue=queue,
        exts=active_exts,
    )
    return observer
# ---------------------------------------------------
# Realtime-Monitor (für direkte Tests, optional)
# ---------------------------------------------------
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading
class RealtimeHandler(FileSystemEventHandler):
    """Watchdog event handler that stores a snapshot whenever a file changes.

    Each modification event is processed in its own daemon thread. A file is
    skipped while another thread is still processing it, when its suffix is
    not allowed or is blacklisted, or after it has accumulated 10 errors.
    """

    def __init__(self, db_path, glossar, dna=True, exts=None, watch_folder=None, queue=None):
        """Initialise the database and optionally snapshot an existing folder.

        Args:
            db_path: Database path used for ``init_db`` / ``open_db``.
            glossar: Forwarded to ``safe_file_analysis``.
            dna: Forwarded to ``safe_file_analysis``.
            exts: Allowed file-name suffixes; ``[".py"]`` when falsy.
            watch_folder: When this is an existing directory, every matching
                file below it gets an initial snapshot during construction.
            queue: Optional object with a ``put`` method that receives
                ``"snapshot"`` and ``"log"`` events.
        """
        super().__init__()
        init_db(db_path)
        self.db_path = db_path
        # Queue consumers get the absolute database path.
        self.abs_db_path = str(Path(db_path).resolve())
        self.glossar = glossar
        self.dna = dna
        self.exts = exts or [".py"]
        self.error_counts = {}
        self.processing = set()
        # Makes "check, then mark as processing" atomic across the worker
        # threads started by on_modified.
        self._claim_lock = threading.Lock()
        self.watch_folder = Path(watch_folder).resolve() if watch_folder else None
        self.queue = queue

        if self.watch_folder and self.watch_folder.is_dir():
            print(f"[REALTIME] Initial snapshot phase for folder: {self.watch_folder}")
            blacklist = tuple(BLACKLIST_EXTENSIONS)
            for ext in self.exts:
                for filepath in self.watch_folder.rglob(f"*{ext}"):
                    # rglob also yields directories whose name matches.
                    if not filepath.is_file():
                        continue
                    # Blacklisted files are never processed on modification,
                    # so they get no initial snapshot either.
                    if str(filepath).endswith(blacklist):
                        continue
                    self.create_initial_snapshot(filepath)

    def should_process_file(self, filepath):
        """Return ``True`` when a change to ``filepath`` should be snapshotted."""
        name = str(filepath)
        if not name.endswith(tuple(self.exts)):
            return False
        if name.endswith(tuple(BLACKLIST_EXTENSIONS)):
            return False
        if name in self.processing:
            return False
        if self.error_counts.get(name, 0) >= 10:
            return False
        return True

    def create_initial_snapshot(self, filepath):
        """Analyse ``filepath`` and store it with source ``"initial"``.

        Failures are printed and, when a queue is set, reported as a ``"log"``
        event; they are not raised.
        """
        abs_path = str(Path(filepath).resolve())
        try:
            seq, dna_text, code, fingerprint = safe_file_analysis(abs_path, self.glossar, dna=self.dna)
            with open_db(self.db_path) as conn:
                sid, is_new, _ = save_snapshot(conn, abs_path, fingerprint, dna_text, code, source="initial")
            if is_new:
                print(f"[REALTIME] Initial snapshot for: {Path(abs_path).name} (ID {sid})")
                if self.queue:
                    self.queue.put({
                        "type": "snapshot",
                        "file": abs_path,
                        "db": self.abs_db_path,
                        "event": "initial",
                    })
        except Exception as e:
            print(f"[REALTIME] Could not create initial snapshot for {Path(abs_path).name}: {e}")
            if self.queue:
                self.queue.put({"type": "log", "msg": f"Realtime initial failed: {e}"})

    def process_file_safe(self, filepath):
        """Analyse a changed file and store it with source ``"realtime"``.

        Intended to run in a worker thread. Errors are counted per file; only
        the first three are printed, but each one is sent to the queue as a
        ``"log"`` event when a queue is set.
        """
        print(f"[DEBUG] process_file_safe called for: {filepath}")
        abs_path = str(Path(filepath).resolve())

        # Check and claim under one lock so that two events for the same file
        # cannot both pass the "already processing" test.
        with self._claim_lock:
            if not self.should_process_file(abs_path):
                return
            self.processing.add(abs_path)

        try:
            # Short pause in case the file is still being written.
            time.sleep(0.1)

            # Start timing after the pause so the profile shows analysis time
            # rather than analysis time plus the fixed sleep.
            t0 = time.perf_counter()
            seq, dna_text, code, fingerprint = safe_file_analysis(abs_path, self.glossar, dna=self.dna)
            t1 = time.perf_counter()
            print(f"[PROFILE] safe_file_analysis took {t1 - t0:.3f}s for: {Path(abs_path).name}")

            with open_db(self.db_path) as conn:
                sid, is_new, _ = save_snapshot(conn, abs_path, fingerprint, dna_text, code, source="realtime")
            t2 = time.perf_counter()
            print(f"[PROFILE] save_snapshot took {t2 - t1:.3f}s → total {t2 - t0:.3f}s for: {Path(abs_path).name}")
            print(f"[REALTIME] {Path(abs_path).name} → Snapshot {sid}")

            if self.queue:
                self.queue.put({
                    "type": "snapshot",
                    "file": abs_path,
                    "db": self.abs_db_path,
                    "event": "realtime",
                })
        except Exception as e:
            self.error_counts[abs_path] = self.error_counts.get(abs_path, 0) + 1
            # Keep the console quiet for files that fail repeatedly.
            if self.error_counts[abs_path] <= 3:
                print(f"[REALTIME] {Path(abs_path).name}: {e}")
            if self.queue:
                self.queue.put({
                    "type": "log",
                    "msg": f"Realtime error for {abs_path}: {e}",
                })
        finally:
            self.processing.discard(abs_path)

    def on_modified(self, event):
        """Start a daemon worker thread for every modified non-directory path."""
        print(f"[DEBUG] on_modified triggered for: {event.src_path}")
        if not event.is_directory:
            threading.Thread(
                target=self.process_file_safe,
                args=(event.src_path,),
                daemon=True,
            ).start()
def start_realtime_monitor(path, db_path, glossar, dna=True, exts=None, queue=None):
    """Start a recursive watchdog observer on ``path`` and return it.

    The handler is created without ``watch_folder``, so no initial snapshots
    are taken here; only later modification events are recorded.
    """
    handler = RealtimeHandler(
        db_path,
        glossar,
        dna=dna,
        exts=exts,
        queue=queue,
    )
    watcher = Observer()
    watcher.schedule(handler, path, recursive=True)
    watcher.start()
    print(f"[REALTIME] Monitoring started: {path}")
    return watcher
# ---------------------------------------------------
# CLI
# ---------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point. RealtimeHandler and Observer are already
    # defined/imported at module level; importing this file again as
    # "monitor" would only execute the module a second time.
    import sys
    import argparse
    import traceback

    def debug(msg):
        """Print a status line to stdout, flushed immediately."""
        print(f"[MONITOR] {msg}", file=sys.stdout, flush=True)

    def debug_err(msg):
        """Print an error line to stderr, flushed immediately."""
        print(f"[MONITOR ERROR] {msg}", file=sys.stderr, flush=True)

    parser = argparse.ArgumentParser(description="CoNum – Monitoring (Realtime Test)")
    parser.add_argument("path", help="Pfad zu Datei oder Ordner")
    parser.add_argument("--interval", type=int, default=20, help="Scan-Intervall in Sekunden (min: 1)")
    parser.add_argument("--folder", action="store_true", help="Ordner statt einzelne Datei überwachen")
    parser.add_argument("--db", default="conum.db", help="Pfad zur SQLite-Datenbank")
    parser.add_argument("--ext", nargs="+", default=COMMON_EXTENSIONS, help="Liste erlaubter Dateiendungen")
    parser.add_argument("--realtime", action="store_true", help="Nutze direkten RealtimeHandler für Tests")
    args = parser.parse_args()

    if args.interval < 1:
        debug_err("Interval muss mindestens 1 Sekunde sein")
        sys.exit(1)

    target_path = Path(args.path)
    if not target_path.exists():
        debug_err(f"Pfad existiert nicht: {args.path}")
        sys.exit(1)

    # The selected mode must match what the path actually is.
    if args.folder and not target_path.is_dir():
        debug_err(f"Pfad ist kein Ordner: {args.path}")
        sys.exit(1)
    if not args.folder and not target_path.is_file():
        debug_err(f"Pfad ist keine Datei (für Ordner --folder nutzen): {args.path}")
        sys.exit(1)

    debug("CoNum Monitor gestartet")
    debug(f"Target: {args.path}")
    debug(f"Mode: {'Folder' if args.folder else 'File'}")
    debug(f"Interval: {args.interval}s")
    debug(f"Extensions: {args.ext}")
    debug(f"Database: {args.db}")
    debug(f"Realtime Test Mode: {'Ja' if args.realtime else 'Nein'}")
    if args.realtime and not args.folder:
        # Previously this combination was ignored without any notice.
        debug("--realtime wirkt nur zusammen mit --folder; nutze normalen Datei-Monitor")

    try:
        init_db(args.db)
        glossar = load_glossar()
        abs_path = target_path.resolve()
        observer = None

        # Case 1: drive RealtimeHandler directly (test mode, folders only).
        if args.realtime and args.folder:
            debug("[REALTIME-MODE] Direkter Start von RealtimeHandler für Ordner")
            event_handler = RealtimeHandler(
                db_path=args.db,
                glossar=glossar,
                dna=True,
                exts=args.ext,
                watch_folder=str(abs_path),
                queue=None,  # the CLI does not use a queue
            )
            observer = Observer()
            observer.schedule(event_handler, str(abs_path), recursive=True)
            observer.start()

        # Case 2: regular hybrid monitor.
        elif args.folder:
            observer = monitor_folder(
                str(abs_path),
                interval=args.interval,
                db_path=args.db,
                exts=args.ext,
                queue=None,
            )
        else:
            observer = monitor_file(
                str(abs_path),
                interval=args.interval,
                db_path=args.db,
                exts=args.ext,  # honour --ext in file mode as well
                queue=None,
            )

        if observer:
            try:
                observer.join()
            except KeyboardInterrupt:
                observer.stop()
                observer.join()
                debug("Monitor gestoppt (Ctrl+C)")

    except KeyboardInterrupt:
        debug("Monitor gestoppt (Ctrl+C)")
    except Exception as e:
        debug_err(f"Kritischer Fehler: {e}")
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)