
Commit f9150d3

zxqfd555 (Manul from Pathway) authored and committed
support running pipelines on multiple machines (#10076)
GitOrigin-RevId: a09118be92790cde34aa793a3c567b42d0f86d28
1 parent 7ba8cdc commit f9150d3

13 files changed

Lines changed: 597 additions & 36 deletions


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added
- `pw.io.milvus.write` connector, which writes a Pathway table to a Milvus collection. Row additions are sent as upserts and row deletions are sent as deletes keyed on the configured primary key column. Requires a Pathway Scale license.
- `pathway spawn` now supports the `--addresses` and `--process-id` flags for multi-machine deployments. Pass a comma-separated list of `host:port` addresses for all processes and the index of the local process; Pathway will connect the cluster over TCP without requiring all processes to run on the same machine.

## [0.30.0] - 2026-03-24

docs/2.developers/4.user-guide/80.advanced/60.worker_count_scaling.md

Lines changed: 4 additions & 0 deletions
@@ -45,6 +45,10 @@ In any case, you can't have less than one worker. Therefore, even if the pipelin
The scaling process scales only by increasing or decreasing the number of **processes**. Threads are **not used for dynamic scaling** in this mechanism. So if your initial configuration uses thread workers, or a mix of threads and processes, scaling will only change the number of processes. For example, if you launched the computation with one process containing two workers, upscaling will lead to two processes with two workers each. Downscaling from that initial configuration, on the other hand, won't be possible, since the number of processes is already equal to one.

### Fixed Address Pool

Dynamic scaling is not available when the worker pool is defined via the `--addresses` flag. In that mode, the set of processes is fixed for the entire run: Pathway cannot add or remove machines at runtime. If your pipeline is launched with `--addresses`, scaling signals from workers are ignored and a warning is emitted to the logs. To use dynamic scaling, let Pathway manage the processes itself by using `--processes` instead.
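
If you want dynamic scaling, launch the pool the managed way, for example (a sketch; `pipeline.py` is a placeholder for your script):

```bash
# Pathway starts and manages two processes itself, so the pool can grow or shrink.
pathway spawn --processes 2 python pipeline.py
```
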
### License Limitations

You need a Pathway License in order for the scaling to work. You can obtain your free Pathway Scale license [here](/get-license). The page contains instructions for getting the license and using it in the pipeline.
Lines changed: 137 additions & 0 deletions
@@ -0,0 +1,137 @@
---
title: 'Running on Multiple Machines'
description: 'This page describes how to distribute a Pathway pipeline across several machines'
---

# Running on Multiple Machines

Pathway pipelines can be distributed across multiple machines. Each machine runs a process, and together they form a single logical computation. Workers on different machines communicate over TCP, exchanging data and progress information the same way co-located processes do.

This is useful when:
- The dataset or working state does not fit in the memory of a single machine.
- The computation is CPU-bound enough to saturate all cores on one host.
- You want to co-locate workers with partitioned data sources (e.g., Kafka brokers) to reduce network transfer.

## How It Works

Every Pathway worker — regardless of which machine it runs on — executes the same dataflow on a different shard of the data. The workers discover each other through a fixed list of `host:port` addresses provided at startup. All processes must be started before any of them begins processing data: the pipeline waits until the full cluster is assembled. While it waits, you will see a "Preparing Pathway computation" log message.

This is different from the default single-machine multi-process mode (`pathway spawn -n N`), where Pathway automatically assigns ports on `127.0.0.1` and launches all processes itself. In multi-machine mode, you are responsible for starting one process per machine and telling each process where all the others are.
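
For a quick side-by-side, the two launch styles look like this (a sketch; `pipeline.py` stands in for your script):

```bash
# Default single-machine mode: Pathway launches both processes itself
# and assigns their ports on 127.0.0.1 automatically.
pathway spawn -n 2 python pipeline.py

# Multi-machine mode: you launch each process yourself, passing the full
# address list and the index of the local process.
pathway spawn --addresses 192.168.1.10:9000,192.168.1.11:9000 --process-id 0 python pipeline.py
```
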
## Setting Up

### 1. Decide on addresses

Choose a `host:port` pair for each process. The port must be reachable from all other machines in the cluster. For example, with two machines:

| Process | Address             |
|---------|---------------------|
| 0       | `192.168.1.10:9000` |
| 1       | `192.168.1.11:9000` |

### 2. Start the process on each machine

On **machine 0**:

```bash
pathway spawn \
    --addresses 192.168.1.10:9000,192.168.1.11:9000 \
    --process-id 0 \
    python pipeline.py
```

On **machine 1**:

```bash
pathway spawn \
    --addresses 192.168.1.10:9000,192.168.1.11:9000 \
    --process-id 1 \
    python pipeline.py
```

Both commands receive the same `--addresses` list. The `--process-id` flag tells each machine which entry in that list belongs to it — process 0 binds to `192.168.1.10:9000`, process 1 binds to `192.168.1.11:9000`.

The two commands can be started in any order. The process that starts first will wait for the others to connect before beginning computation.

Note that a single machine can host more than one process. In that case, use the same host with different ports for each process on that machine:

```bash
pathway spawn \
    --addresses 192.168.1.10:9000,192.168.1.10:9001,192.168.1.11:9000 \
    --process-id 0 \
    python pipeline.py
```

```bash
pathway spawn \
    --addresses 192.168.1.10:9000,192.168.1.10:9001,192.168.1.11:9000 \
    --process-id 1 \
    python pipeline.py
```

```bash
pathway spawn \
    --addresses 192.168.1.10:9000,192.168.1.10:9001,192.168.1.11:9000 \
    --process-id 2 \
    python pipeline.py
```

Here processes 0 and 1 both run on `192.168.1.10`, listening on ports `9000` and `9001` respectively, while process 2 runs on `192.168.1.11`.

Please keep in mind that, due to how the communication works internally, **the address list must appear in the same order in all of the launched commands**. Only the **`--process-id`** parameter varies, taking every value from **`0`** through **the length of the list minus one**.
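
One way to keep the list identical and identically ordered everywhere is to define it once and reuse it verbatim on every machine (a sketch using an ordinary shell variable; the name is arbitrary):

```bash
# Same value on every machine; only --process-id changes per process.
export PW_CLUSTER_ADDRESSES=192.168.1.10:9000,192.168.1.10:9001,192.168.1.11:9000

pathway spawn --addresses "$PW_CLUSTER_ADDRESSES" --process-id 0 python pipeline.py
```
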
### 3. Use threads for intra-machine parallelism

The `--threads` flag works independently of `--addresses`. To run two threads per machine with the two-machine setup above, add `--threads 2` to both commands. This gives four total workers: two on each machine.

```bash
pathway spawn \
    --addresses 192.168.1.10:9000,192.168.1.11:9000 \
    --process-id 0 \
    --threads 2 \
    python pipeline.py
```

### 4. Add persistence (recommended)

When running across machines, data persistence is strongly recommended. If any process crashes, the whole cluster must be restarted. Persistence ensures the pipeline resumes from the last checkpoint rather than replaying from the beginning:

```python
persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.s3(
        bucket_name="my-bucket",
        root_path="pathway-state/",
    ),
)

pw.run(persistence_config=persistence_config)
```

It is important to use shared storage (S3, GCS, Azure Blob, NFS) so that all machines can read and write the same state.
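
If your machines share an NFS mount rather than an object store, a filesystem backend pointed at the shared path is a minimal alternative (a sketch, assuming a `pw.persistence.Backend.filesystem` backend and that `/mnt/shared` is mounted on every machine):

```python
# Hypothetical shared NFS mount; every machine must see the same path.
persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.filesystem("/mnt/shared/pathway-state"),
)

pw.run(persistence_config=persistence_config)
```
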
## License

Running Pathway on multiple machines requires a Pathway Scale or Pathway Enterprise license. You can obtain a free Pathway Scale license [here](/get-license). The page contains instructions for getting the license and using it in your pipeline.

## Limitations

**No dynamic scaling.** The `--addresses` flag defines a fixed worker pool. Pathway's autoscaling mechanism (described in [Dynamic Worker Scaling](/developers/user-guide/advanced/worker-count-scaling/)) is not available when a fixed address list is used. The number of processes is determined by the length of the `--addresses` list and cannot change at runtime.

**All processes must start for the pipeline to begin.** If one machine fails to start or takes too long, the others will wait indefinitely. There is no partial startup or degraded mode.

**At-least-once delivery.** As with all Pathway deployments, recovery after a crash replays data from the last committed checkpoint. Records written after the last checkpoint but before the crash may be processed again. Exactly-once semantics are available in the enterprise edition.

**Same binary on all machines.** All machines must run the same version of Pathway and the same pipeline code. Mismatched versions will cause a connection failure or undefined behavior.

**Firewall and networking.** Each machine must be able to reach all others on the specified ports. Pathway does not support NAT traversal or proxies between workers.
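
Once a process has started and is waiting for the rest of the cluster, you can check from another machine that its port is actually reachable, for example with a plain TCP probe (a generic sketch using netcat; any equivalent tool works):

```bash
# From machine 1: verify that process 0's port on machine 0 accepts connections.
nc -vz 192.168.1.10 9000
```
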
## Conclusion

To run a Pathway pipeline across multiple machines:

1. **Choose one `host:port` per process** and ensure the ports are mutually reachable.
2. **Start each process independently** using `pathway spawn --addresses <list> --process-id <N>`.
3. **Use shared persistent storage** to enable fast recovery after restarts.
4. **Do not mix `--addresses` with `--processes`** — the process count is derived from the address list.

If you have any questions, feel free to reach out on [Discord](http://discord.com/invite/pathway) or open an issue on our [GitHub](https://github.com/pathwaycom/pathway/issues/).
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@

```python
# Identity pipeline: reads plaintext input and writes it back out as JSON lines.
import sys

import pathway as pw

input_path = sys.argv[1]
output_path = sys.argv[2]

t = pw.io.plaintext.read(input_path, mode="static")
pw.io.jsonlines.write(t, output_path)
pw.run()
```

integration_tests/common/test_cli.py

Lines changed: 102 additions & 0 deletions
@@ -9,6 +9,7 @@

```python
REPOSITORY_URL = "https://github.com/pathway-labs/airbyte-to-deltalake"
TRACKED_REPOSITORY_URL = "https://github.com/pathwaycom/pathway/"
IDENTITY_PROGRAM = os.path.join(os.path.dirname(__file__), "identity.py")


def count_commits_in_pathway_repository(tmp_path):
```
@@ -42,3 +43,104 @@ def test_repository_url_feature(tmp_path):

```python
    expected_n_commits = count_commits_in_pathway_repository(tmp_path)

    assert actual_n_commits == expected_n_commits


def invoke_spawn(runner, args):
    return runner.invoke(
        cli.spawn, args + ["python", IDENTITY_PROGRAM, "input.txt", "output.jsonl"]
    )


def test_processes_and_addresses_are_mutually_exclusive(runner):
    result = invoke_spawn(
        runner, ["--processes", "2", "--addresses", "host0:10000,host1:10000"]
    )
    assert result.exit_code != 0
    assert "--processes and --addresses are mutually exclusive" in result.output


def test_process_id_requires_addresses(runner):
    result = invoke_spawn(runner, ["--process-id", "1"])
    assert result.exit_code != 0
    assert "--process-id requires --addresses" in result.output


def test_addresses_requires_process_id(runner):
    result = invoke_spawn(runner, ["--addresses", "host0:10000,host1:10000"])
    assert result.exit_code != 0
    assert "--process-id is required when --addresses is set" in result.output


def test_address_invalid_format_no_colon(runner):
    result = invoke_spawn(runner, ["--addresses", "host010000", "--process-id", "0"])
    assert result.exit_code != 0
    assert "expected host:port format" in result.output


def test_address_invalid_format_non_numeric_port(runner):
    result = invoke_spawn(runner, ["--addresses", "host0:abc", "--process-id", "0"])
    assert result.exit_code != 0
    assert "expected host:port format" in result.output


def test_address_port_zero(runner):
    result = invoke_spawn(runner, ["--addresses", "host0:0", "--process-id", "0"])
    assert result.exit_code != 0
    assert "must be in range" in result.output


def test_address_port_too_large(runner):
    result = invoke_spawn(runner, ["--addresses", "host0:99999", "--process-id", "0"])
    assert result.exit_code != 0
    assert "must be in range" in result.output


def test_addresses_duplicate_entries(runner):
    result = invoke_spawn(
        runner,
        ["--addresses", "host0:10000,host0:10000", "--process-id", "0"],
    )
    assert result.exit_code != 0
    assert "duplicate entries" in result.output


def test_process_id_out_of_range(runner):
    result = invoke_spawn(
        runner,
        ["--addresses", "host0:10000,host1:10000", "--process-id", "5"],
    )
    assert result.exit_code != 0
    assert "--process-id 5 is out of range" in result.output


def test_process_id_negative(runner):
    result = invoke_spawn(
        runner,
        ["--addresses", "host0:10000,host1:10000", "--process-id", "-1"],
    )
    assert result.exit_code != 0
    assert "--process-id -1 is out of range" in result.output


def test_threads_zero(runner):
    result = invoke_spawn(runner, ["--threads", "0"])
    assert result.exit_code != 0
    assert "--threads must be at least 1" in result.output


def test_threads_negative(runner):
    result = invoke_spawn(runner, ["--threads", "-4"])
    assert result.exit_code != 0
    assert "--threads must be at least 1" in result.output


def test_processes_zero(runner):
    result = invoke_spawn(runner, ["--processes", "0"])
    assert result.exit_code != 0
    assert "--processes must be at least 1" in result.output


def test_first_port_overflow(runner):
    result = invoke_spawn(runner, ["--processes", "3", "--first-port", "65534"])
    assert result.exit_code != 0
    assert "exceeds the maximum" in result.output
```
Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@

```python
import json
import os
import time
import uuid

import pytest

from pathway.cli import create_process_handles, terminate_process_handles

IDENTITY_PROGRAM = os.path.join(os.path.dirname(__file__), "identity.py")


def test_two_machine_identity(tmp_path, two_free_ports):
    port1, port2 = two_free_ports
    addresses = f"127.0.0.1:{port1},127.0.0.1:{port2}"
    input_path = tmp_path / "input.txt"
    output_path = tmp_path / "output.jsonl"
    input_path.write_text("hello world\n")

    env_base = os.environ.copy()
    common_args = dict(
        processes=2,
        threads=1,
        first_port=port1,
        addresses=addresses,
        env_base=env_base,
        program="python",
        arguments=[IDENTITY_PROGRAM, str(input_path), str(output_path)],
    )

    process0 = create_process_handles(
        **common_args, process_id=0, run_id=str(uuid.uuid4())
    )[0]
    try:
        # Process 0 alone must block on cluster assembly: no exit, no output.
        time.sleep(15)
        assert process0.poll() is None, "Process 0 exited before process 1 was launched"
        assert not output_path.exists(), "Output appeared before process 1 was launched"

        # Check again after another 15 seconds to rule out a slow start.
        time.sleep(15)
        assert process0.poll() is None, "Process 0 exited before process 1 was launched"
        assert not output_path.exists(), "Output appeared before process 1 was launched"

        process1 = create_process_handles(
            **common_args, process_id=1, run_id=str(uuid.uuid4())
        )[0]
        try:
            # Once both processes are up, the pipeline should finish quickly.
            deadline = time.time() + 60
            while time.time() < deadline:
                if process0.poll() is not None and process1.poll() is not None:
                    break
                time.sleep(0.5)
            else:
                pytest.fail("Processes did not complete within 60 seconds")

            assert (
                process0.returncode == 0
            ), f"Process 0 exited with code {process0.returncode}"
            assert (
                process1.returncode == 0
            ), f"Process 1 exited with code {process1.returncode}"
        finally:
            terminate_process_handles([process1])
    finally:
        terminate_process_handles([process0])

    assert output_path.exists(), "Output file was not created"
    rows = [
        json.loads(line)
        for line in output_path.read_text().splitlines()
        if line.strip()
    ]
    assert any(row.get("data") == "hello world" for row in rows)
```
