Skip to content

Commit ec451fd

Browse files
zxqfd555Manul from Pathway
authored and committed
leann integration (#10107)
GitOrigin-RevId: 6da4c020ced767ba7bddc968aa7217907bba0210
1 parent 0e3f155 commit ec451fd

6 files changed

Lines changed: 695 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1111
- `pw.io.milvus.write` connector, which writes a Pathway table to a Milvus collection. Row additions are sent as upserts and row deletions are sent as deletes keyed on the configured primary key column. Requires a Pathway Scale license.
1212
- `pathway spawn` now supports the `--addresses` and `--process-id` flags for multi-machine deployments. Pass a comma-separated list of `host:port` addresses for all processes and the index of the local process; Pathway will connect the cluster over TCP without requiring all processes to run on the same machine.
1313
- `pw.xpacks.llm.parsers.AudioParser`, audio transcription parser based on OpenAI Whisper API. Accepts raw audio bytes and returns transcribed text, following the same interface as other Pathway document parsers.
14+
- `pw.io.leann.write` connector for writing Pathway tables to LEANN vector indices. LEANN uses graph-based selective recomputation to achieve 97% storage reduction compared to traditional vector databases.
1415

1516
## [0.30.0] - 2026-03-24
1617

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
# Copyright © 2026 Pathway
2+
3+
from __future__ import annotations
4+
5+
from pathlib import Path
6+
7+
from leann import LeannSearcher
8+
9+
import pathway as pw
10+
11+
12+
class DocumentSchema(pw.Schema):
    """Minimal schema: a single `text` column holding the document body."""

    text: str
15+
16+
class DocumentWithMetadataSchema(pw.Schema):
    """Document schema carrying `title` and `category` metadata alongside the text."""

    text: str
    title: str
    category: str
20+
21+
22+
def test_basic_write(tmp_path: Path):
    """Test basic writing to a LEANN index."""
    index_path = tmp_path / "test_basic.leann"

    docs = [
        ("LEANN is a vector database",),
        ("Pathway enables streaming",),
        ("Vector search is powerful",),
    ]
    table = pw.debug.table_from_rows(schema=DocumentSchema, rows=docs)

    pw.io.leann.write(
        table,
        index_path=str(index_path),
        text_column=table.text,
        backend_name="hnsw",
    )
    pw.run(monitoring_level=pw.MonitoringLevel.NONE)

    # LEANN materializes several files that share the base name; the
    # metadata file is always among them, so use it as the existence probe.
    meta_file = Path(f"{index_path}.meta.json")
    assert meta_file.exists(), f"Expected {meta_file} to exist"

    # A freshly built index should return at least one hit for a
    # semantically matching query.
    hits = LeannSearcher(str(index_path)).search("vector database", top_k=1)
    assert len(hits) > 0
52+
53+
54+
def test_write_with_metadata_columns(tmp_path: Path):
    """Test writing with metadata columns."""
    index_path = tmp_path / "test_metadata.leann"

    docs = [
        ("Introduction to LEANN", "LEANN Guide", "tutorial"),
        ("Advanced vector search", "Search Tips", "advanced"),
    ]
    table = pw.debug.table_from_rows(
        schema=DocumentWithMetadataSchema,
        rows=docs,
    )

    # Pass the extra columns through metadata_columns; only `text` is embedded.
    pw.io.leann.write(
        table,
        index_path=str(index_path),
        text_column=table.text,
        metadata_columns=[table.title, table.category],
        backend_name="hnsw",
    )
    pw.run(monitoring_level=pw.MonitoringLevel.NONE)

    # The metadata file is always created alongside the index.
    assert Path(f"{index_path}.meta.json").exists()
76+
77+
78+
def test_write_with_custom_text_column(tmp_path: Path):
    """Test writing with a custom text column name."""
    index_path = tmp_path / "test_custom_column.leann"

    # Local schema: the text lives in `content`, not the default `text` name.
    class ContentSchema(pw.Schema):
        content: str
        id: int

    table = pw.debug.table_from_rows(
        schema=ContentSchema,
        rows=[
            ("First document content", 1),
            ("Second document content", 2),
        ],
    )

    # text_column accepts any column reference, not just one named "text".
    pw.io.leann.write(
        table,
        index_path=str(index_path),
        text_column=table.content,
        backend_name="hnsw",
    )
    pw.run(monitoring_level=pw.MonitoringLevel.NONE)

    assert Path(f"{index_path}.meta.json").exists()
102+
103+
104+
def test_write_creates_parent_directories(tmp_path: Path):
    """Test that write() creates parent directories if they don't exist."""
    # "nested/dir" does not exist yet — the connector must create it.
    index_path = tmp_path / "nested" / "dir" / "test.leann"

    single_row_table = pw.debug.table_from_rows(
        schema=DocumentSchema,
        rows=[("Test document",)],
    )

    pw.io.leann.write(
        single_row_table,
        index_path=str(index_path),
        text_column=single_row_table.text,
        backend_name="hnsw",
    )
    pw.run(monitoring_level=pw.MonitoringLevel.NONE)

    assert Path(f"{index_path}.meta.json").exists()
122+
123+
124+
def test_empty_table(tmp_path: Path):
    """Test behavior with an empty table."""
    index_path = tmp_path / "test_empty.leann"

    empty_table = pw.debug.table_from_rows(
        schema=DocumentSchema,
        rows=[],
    )

    pw.io.leann.write(
        empty_table,
        index_path=str(index_path),
        text_column=empty_table.text,
        backend_name="hnsw",
    )
    pw.run(monitoring_level=pw.MonitoringLevel.NONE)

    # With no rows to index, no files — including the metadata file —
    # should be produced.
    assert not Path(f"{index_path}.meta.json").exists()
144+
145+
146+
def test_write_with_diskann_backend(tmp_path: Path):
    # Intentionally a no-op for now.
    pass
    # TODO: uncomment if diskann needs to be tested. Currently we keep it this
    # way, since diskann installation requires additional steps in Dockerfile
    # and will lead to the increase in the time required to build the base
    # integration tests image.
    #
    # """Test writing with DiskANN backend."""
    # index_path = tmp_path / "test_diskann.leann"
    #
    # table = pw.debug.table_from_rows(
    #     schema=DocumentSchema,
    #     rows=[
    #         ("Document one for DiskANN test",),
    #         ("Document two for DiskANN test",),
    #     ],
    # )
    #
    # pw.io.leann.write(
    #     table,
    #     index_path=str(index_path),
    #     text_column=table.text,
    #     backend_name="diskann",
    # )
    #
    # pw.run(monitoring_level=pw.MonitoringLevel.NONE)
    # assert Path(str(index_path) + ".meta.json").exists()

integration_tests/license/test_license.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"xpack-llm-mcp",
4747
"worker-count-scaling",
4848
"multiple-machines",
49+
"leann",
4950
]
5051

5152

@@ -125,6 +126,7 @@ def test_license_default_policy(caplog):
125126
_check_entitlements("mongodb-oplog-reader")
126127
_check_entitlements("multiple-machines")
127128
_check_entitlements("mssql")
129+
_check_entitlements("leann")
128130

129131
run_all()
130132
assert "Telemetry enabled" in caplog.text

python/pathway/io/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
jsonlines,
1616
kafka,
1717
kinesis,
18+
leann,
1819
logstash,
1920
milvus,
2021
minio,
@@ -88,4 +89,5 @@
8889
"mssql",
8990
"mysql",
9091
"SynchronizedColumn",
92+
"leann",
9193
]

0 commit comments

Comments
 (0)