Marin's executor framework manages the execution of experiments. This document covers the mechanics; read it to learn the conventions.
An experiment is a sequence (really, a DAG) of steps, where each step is specified by the following:
- name: an identifier describing the function (and its version)
- function: a normal Python function or a Ray remote function (which enables massive parallelism)
- config: the single argument to the function, which is a dataclass; fields of the config can refer to previous steps.
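As a minimal sketch of this convention (the names `GenerateDataConfig` and `generate_data` are illustrative, not part of Marin; `ExecutorStep` itself lives in `marin.execution.executor`), a step's function takes a single dataclass config and reads everything from it:

```python
from dataclasses import dataclass

# Illustrative sketch: the config is one frozen dataclass, and the step
# function's only argument is that config.
@dataclass(frozen=True)
class GenerateDataConfig:
    num_examples: int
    output_path: str  # the step writes its output under this path

def generate_data(config: GenerateDataConfig) -> list[str]:
    # A real step would write these records to config.output_path;
    # here we return them to keep the sketch self-contained.
    return [f"example-{i}" for i in range(config.num_examples)]

records = generate_data(GenerateDataConfig(num_examples=3, output_path="/tmp/data"))
```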
A key decision in Marin is that data gets passed between steps by reading and writing to the filesystem. The rationale is two-fold:
- For very large datasets, where efficiency and robustness are concerns, we give the steps full control over serialization and deserialization.
- It makes the intermediate state completely transparent, so one can, for example, monitor a jsonl file while it is being created.
In particular, each step is associated with an output path where that step writes its output (in any format). When a step A references another step B in its config, that reference simply resolves to step B's output path, and step A is responsible for reading the data from that path. The name of the output path includes the step name and a hash of the config (at least the part of it that's explicitly versioned) and all its dependencies.
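A hedged sketch of this naming scheme (not Marin's exact algorithm): the path combines the step name with a hash of the versioned config and the output paths of all dependencies, so changing either produces a fresh path.

```python
import hashlib
import json

# Illustrative only: derive an output path from the step name, the
# versioned part of the config, and the dependencies' output paths.
def output_path(name: str, versioned_config: dict, dep_paths: list[str]) -> str:
    payload = json.dumps({"config": versioned_config, "deps": sorted(dep_paths)},
                         sort_keys=True)
    return f"{name}-{hashlib.sha256(payload.encode()).hexdigest()[:8]}"

path_a = output_path("generate_data", {"version": "v1"}, [])
# compute_stats depends on generate_data, so path_a feeds into its hash:
path_b = output_path("compute_stats", {"version": "v1"}, [path_a])
```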
In the hello world example, we have two steps: generating data and computing statistics.
See the documentation in executor.py for more details.
Coordination between multiple pipelines is handled via lease files. This prevents duplicate execution if, for example, two Executor pipelines share common ancestor steps.
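A sketch of the lease-file idea (the executor's actual mechanism lives in executor.py; this stand-alone example just shows the pattern): the first pipeline to atomically create the lease file runs the step, and any other pipeline sees the existing file and skips it.

```python
import os
import tempfile

# Illustrative lease acquisition via atomic, exclusive file creation.
def try_acquire_lease(lease_path: str) -> bool:
    try:
        fd = os.open(lease_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.close(fd)
        return True   # we created the lease, so we run the step
    except FileExistsError:
        return False  # another pipeline already holds the lease

lease_path = os.path.join(tempfile.mkdtemp(), "train.lease")
first = try_acquire_lease(lease_path)   # this pipeline runs the step
second = try_acquire_lease(lease_path)  # a concurrent pipeline skips it
```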
Some datasets live in a specific regional bucket (e.g.
`gs://marin-us-central2/documents/stackexchange/...`), but experiments may run
from any region. The `mirrored()` wrapper marks an input path for
cross-region mirroring so that the executor copies the data to the local
marin prefix before the step runs.
```python
from marin.execution.executor import ExecutorStep, mirrored, versioned

step = ExecutorStep(
    name="train",
    fn=my_training_fn,
    config=TrainConfig(
        dataset=mirrored(versioned("documents/stackexchange/v1"), budget_gb=50),
    ),
)
```

At config instantiation time, `mirrored()` rewrites the path to use the
mirror:// protocol. When the step's function opens the path via fsspec,
the MirrorFileSystem transparently copies data from whichever regional bucket
has it into the local marin prefix, respecting the per-path transfer budget.
Key details:
- `budget_gb` (default 10) caps how much data (in GB) a single step may copy cross-region. The budget is enforced via the `mirror_budget` context manager from `rigging.filesystem`.
- Paths that already exist in the local prefix are not re-copied.
- `mirrored()` can wrap plain strings or `VersionedValue`/`InputName` references.
- To adjust the global mirror budget default, set the `MARIN_MIRROR_BUDGET_GB` environment variable before the process starts.
Recall that a step's function can be either a normal Python function or, in most realistic cases, a Ray remote function. Ray allows us to run a large computation distributed over a cluster and is generally used for large data processing tasks. For example, we can break a large dataset into shards and create a Ray function to process each shard.
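A sketch of that shard-parallel pattern. With Ray, `process_shard` would be wrapped with `@ray.remote` and launched via `process_shard.remote(shard)`; here the shards run serially so the example stays self-contained.

```python
# Illustrative: split records into fixed-size shards, process each one.
def make_shards(records: list[str], shard_size: int) -> list[list[str]]:
    return [records[i:i + shard_size] for i in range(0, len(records), shard_size)]

def process_shard(shard: list[str]) -> int:
    # Stand-in for real per-shard work (tokenizing, filtering, etc.).
    return sum(len(r) for r in shard)

records = ["alpha", "beta", "gamma", "delta", "epsilon"]
shards = make_shards(records, shard_size=2)
# With Ray: results = ray.get([process_shard.remote(s) for s in shards])
results = [process_shard(s) for s in shards]
```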
Ray packages up the code from the local directory and ships it off to the appropriate machine. The environment will have the following packages installed:
- Default packages: installed on the Ray cluster (`dependencies` in `pyproject.toml`), which include fsspec, draccus, etc.
- Step-specific packages: remote steps can specify `pip_dependency_groups` via the `remote()` wrapper, a list of either (i) a key from `project.optional-dependencies` in `lib/marin/pyproject.toml` (e.g., `rl`), or (ii) a specific pip package. This allows each step to have its own environment and not interfere with other steps.
For example, to install the dependencies specified in the `rl` extra and also pip-install `google-cloud-logging`, one can do:
```python
number_of_restarts = ExecutorStep(
    name=...,
    fn=remote(my_fn, pip_dependency_groups=["rl", "google-cloud-logging"]),
    config=...,
)
```

Finally, to launch an experiment, use `ray_run.py`, which
launches jobs to the Ray cluster:
```bash
uv run lib/marin/src/marin/run/ray_run.py -- python experiments/tutorials/hello_world.py
```

This script ensures that:
- All the relevant libraries (specified above) are installed.
- The working directory is set appropriately.
- Any subpaths under submodules are appended to `PYTHONPATH`.
Agents can use the `add-dataset` skill for a guide to dataset schema inspection and addition.