1 | | -"""Skill that extracts chunk content from Documents and writes it to a JSON file. |
| 1 | +"""Writes chunk content to a JSON file with optional per-document change detection. |
2 | 2 |
3 | | -Use this skill at any point in a pipeline to capture intermediate state, |
4 | | -e.g. after a splitter, so the output can be checksummed for change detection |
5 | | -without running expensive downstream skills like embedding and indexing. |
6 | | -
7 | | -Only the chunk text content is written as a sorted JSON array of strings — |
8 | | -volatile metadata like filenames, document IDs, and timestamps are excluded |
9 | | -so the checksum remains stable when the underlying text hasn't changed. |
| 3 | +Outputs a sorted JSON array of chunk text strings (metadata excluded). |
| 4 | +When ``checksum_path`` is set, per-chunk SHA-256 checksums (keyed by |
| 5 | +``document_id``) gate downstream processing — only changed or new chunks |
| 6 | +are kept; unchanged chunks are stripped from their documents. |
10 | 7 | """ |
11 | 8 |
| 9 | +import hashlib |
12 | 10 | import json |
13 | 11 | import os |
14 | 12 | from typing import List, Optional |
|
19 | 17 |
20 | 18 |
21 | 19 | class JSONWriterSkill(IndexerSkill): |
22 | | - """Extract text content from all chunks and write it as a sorted JSON array. |
23 | | -
24 | | - The output is a flat list of strings (one per non-empty chunk), sorted |
25 | | - alphabetically for deterministic checksumming. Documents are passed |
26 | | - through unchanged for downstream skills. |
| 20 | + """Write chunk text as a sorted JSON array with per-chunk change gating. |
27 | 21 |
28 | 22 | Config params: |
29 | | - output_path (str): Path to the output JSON file (default: |
30 | | - ``data/pipeline_output.json``). Parent |
31 | | - directories are created automatically. |
| 23 | + output_path (str): Output JSON path (default: ``data/pipeline_output.json``). |
| 24 | + checksum_path (str, optional): JSON file for per-chunk SHA-256 checksums |
| 25 | + keyed by ``document_id``. |
| 26 | + skip_downstream_if_unchanged (bool, optional): Strip unchanged chunks |
| 27 | + so downstream skills skip them (default: ``True``).
32 | 28 | """ |
33 | 29 |
34 | 30 | def __init__(self, skill_config: dict, global_config: Config) -> None: |
35 | 31 | super().__init__(skill_config, global_config) |
36 | 32 | self._output_path = self._config.get("output_path", "data/pipeline_output.json") |
| 33 | + self._checksum_path = self._config.get("checksum_path", None) |
| 34 | + self._skip_if_unchanged = self._config.get("skip_downstream_if_unchanged", True) |
| 35 | + |
| 36 | + def _compute_checksum(self, content_bytes: bytes) -> str: |
| 37 | + return hashlib.sha256(content_bytes).hexdigest() |
| 38 | + |
| 39 | + def _read_stored_checksums(self) -> dict: |
| 40 | + """Return stored {document_id: checksum} map, or empty dict.""" |
| 41 | + if self._checksum_path and os.path.isfile(self._checksum_path): |
| 42 | + try: |
| 43 | + with open(self._checksum_path, "r", encoding="utf-8") as f: |
| 44 | + data = json.load(f) |
| 45 | + if isinstance(data, dict): |
| 46 | + return data |
| 47 | + # Legacy format — cannot migrate, start fresh. |
| 48 | + self.logger.warning( |
| 49 | + "Checksum file contains legacy format — starting fresh." |
| 50 | + ) |
| 51 | + except Exception as e: |
| 52 | + self.logger.warning(f"Failed to read stored checksums: {e}") |
| 53 | + return {} |
| 54 | + |
| 55 | + def _write_checksums(self, checksums: dict) -> None: |
| 56 | + """Save per-document checksums to disk.""" |
| 57 | + if self._checksum_path: |
| 58 | + os.makedirs(os.path.dirname(self._checksum_path) or ".", exist_ok=True) |
| 59 | + with open(self._checksum_path, "w", encoding="utf-8") as f: |
| 60 | + json.dump(checksums, f, indent=2, ensure_ascii=False) |
| 61 | + |
| 62 | + def _compute_chunk_checksum(self, chunk) -> str: |
| 63 | + """SHA-256 checksum of a single chunk's content.""" |
| 64 | + payload = (chunk.content or "").encode("utf-8") |
| 65 | + return self._compute_checksum(payload) |
37 | 66 |
38 | 67 | def run(self, input: Optional[List[Document]] = None) -> List[Document]: |
39 | 68 | if not input: |
40 | 69 | self.logger.warning("JSONWriterSkill received no input — nothing to write.") |
41 | 70 | return input or [] |
42 | 71 |
43 | | - # Collect only the content from every chunk across all documents |
| 72 | + # Collect chunk content across all documents |
44 | 73 | contents = [] |
45 | 74 | for doc in input: |
46 | 75 | for chunk in doc.chunks: |
47 | 76 | if chunk.content: |
48 | 77 | contents.append(chunk.content) |
49 | 78 |
50 | | - # Sort for deterministic output (stable checksums) |
51 | | - contents.sort() |
| 79 | + contents.sort() # deterministic order for stable checksums |
52 | 80 |
53 | 81 | os.makedirs(os.path.dirname(self._output_path) or ".", exist_ok=True) |
54 | 82 |
55 | | - with open(self._output_path, "w", encoding="utf-8") as f: |
56 | | - json.dump(contents, f, indent=2, ensure_ascii=False) |
| 83 | + json_bytes = json.dumps(contents, indent=2, ensure_ascii=False).encode("utf-8") |
| 84 | + |
| 85 | + with open(self._output_path, "wb") as f: |
| 86 | + f.write(json_bytes) |
57 | 87 |
58 | 88 | self.logger.info( |
59 | 89 | "Wrote %d chunk content entries to %s", |
60 | 90 | len(contents), |
61 | 91 | self._output_path, |
62 | 92 | ) |
63 | 93 |
64 | | - # Pass-through: downstream skills can still consume the documents |
| 94 | + # ── Per-chunk checksum-based change gate ──────────────── |
| 95 | + # Each chunk is keyed by its document_id (e.g. question hash). |
| 96 | + # Only chunks whose content has changed (or are new) are kept; |
| 97 | + # unchanged chunks are removed so downstream skills skip them. |
| 98 | + if self._checksum_path: |
| 99 | + old_checksums = self._read_stored_checksums() |
| 100 | + new_checksums: dict = {} |
| 101 | + |
| 102 | + changed_count = 0 |
| 103 | + unchanged_count = 0 |
| 104 | + |
| 105 | + for doc in input: |
| 106 | + unchanged_chunks = set() |
| 107 | + |
| 108 | + for chunk in doc.chunks: |
| 109 | + doc_id = chunk.document_id or chunk.chunk_id or "unknown" |
| 110 | + chunk_checksum = self._compute_chunk_checksum(chunk) |
| 111 | + new_checksums[doc_id] = chunk_checksum |
| 112 | + |
| 113 | + old_checksum = old_checksums.get(doc_id) |
| 114 | + |
| 115 | + if old_checksum and chunk_checksum == old_checksum and self._skip_if_unchanged: |
| 116 | + unchanged_chunks.add(chunk) |
| 117 | + unchanged_count += 1 |
| 118 | + self.logger.debug( |
| 119 | + "Chunk %s unchanged — will be stripped.", |
| 120 | + doc_id[:12], |
| 121 | + ) |
| 122 | + else: |
| 123 | + changed_count += 1 |
| 124 | + if old_checksum: |
| 125 | + self.logger.debug( |
| 126 | + "Chunk %s changed (old: %s, new: %s).", |
| 127 | + doc_id[:12], |
| 128 | + old_checksum[:12], |
| 129 | + chunk_checksum[:12], |
| 130 | + ) |
| 131 | + else: |
| 132 | + self.logger.debug("Chunk %s is new.", doc_id[:12]) |
| 133 | + |
| 134 | + # Remove unchanged chunks from this document |
| 135 | + if unchanged_chunks: |
| 136 | + doc.chunks -= unchanged_chunks |
| 137 | + |
| 138 | + self.logger.info( |
| 139 | + "Change detection: %d changed/new, %d unchanged out of %d chunks.", |
| 140 | + changed_count, |
| 141 | + unchanged_count, |
| 142 | + changed_count + unchanged_count, |
| 143 | + ) |
| 144 | + |
| 145 | + self._write_checksums(new_checksums) |
| 146 | + |
65 | 147 | return input |
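
For reference, a minimal usage sketch of the updated skill. Only the config keys and the constructor signature (skill_config: dict, global_config: Config) are taken from the code above; the pipeline objects and file paths below are illustrative assumptions, not part of the change.

# Hypothetical wiring: the `documents` list and the `global_config` instance
# are assumed to come from the host pipeline, and the paths are examples only.
skill_config = {
    "output_path": "data/pipeline_output.json",
    # Optional: enables per-chunk change gating keyed by document_id.
    "checksum_path": "data/pipeline_checksums.json",
    # Strip unchanged chunks so downstream embedding/indexing skips them.
    "skip_downstream_if_unchanged": True,
}

writer = JSONWriterSkill(skill_config, global_config)
documents = writer.run(documents)  # unchanged chunks removed from each doc

When checksum_path is set, the stored file is a flat JSON object mapping each document_id to the SHA-256 hex digest of its chunk content, so deleting that file causes every chunk to be treated as new on the next run.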