
Commit 530cd6e

feat: Add RaySource, to_ray_dataset first-class method, docs, and tests
Introduces RaySource, a pure-metadata data source descriptor that lets Feast load any Ray Data-readable format (Parquet, CSV, JSON, HuggingFace datasets, MongoDB, binary files, images, TFRecords, WebDataset, SQL, and text) without requiring an intermediate Parquet file. Makes to_ray_dataset() a first-class method on RetrievalJob and FeatureStore. Wires up distributed materialization in the Ray compute engine. Adds reference documentation and unit tests. Signed-off-by: ntkathole <[email protected]>
1 parent 608b105 commit 530cd6e

15 files changed

Lines changed: 1472 additions & 47 deletions


README.md

Lines changed: 1 addition & 0 deletions
@@ -185,6 +185,7 @@ The list below contains the functionality that contributors are planning to develop
 * [x] [Athena (contrib plugin)](https://docs.feast.dev/reference/data-sources/athena)
 * [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/data-sources/clickhouse)
 * [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/data-sources/oracle)
+* [x] [Ray source (contrib plugin)](https://docs.feast.dev/reference/data-sources/ray)
 * [x] Kafka / Kinesis sources (via [push support into the online store](https://docs.feast.dev/reference/data-sources/push))
 * **Offline Stores**
 * [x] [Snowflake](https://docs.feast.dev/reference/offline-stores/snowflake)

docs/SUMMARY.md

Lines changed: 1 addition & 0 deletions
@@ -104,6 +104,7 @@
 * [Oracle (contrib)](reference/data-sources/oracle.md)
 * [Athena (contrib)](reference/data-sources/athena.md)
 * [Clickhouse (contrib)](reference/data-sources/clickhouse.md)
+* [Ray (contrib)](reference/data-sources/ray.md)
 * [Offline stores](reference/offline-stores/README.md)
 * [Overview](reference/offline-stores/overview.md)
 * [Dask](reference/offline-stores/dask.md)

docs/getting-started/concepts/feature-retrieval.md

Lines changed: 53 additions & 0 deletions
@@ -297,6 +297,59 @@ training_df = store.get_historical_features(
 ).to_df()
 ```
 
+### Step 3: Choosing an output format
+
+`get_historical_features()` returns a `RetrievalJob` object. You can convert it
+to the format that suits your downstream pipeline:
+
+**Data conversion methods**
+
+| Method | Returns | When to use |
+|---|---|---|
+| `.to_df()` | `pandas.DataFrame` | General-purpose; scikit-learn, XGBoost, statsmodels |
+| `.to_feast_df()` | `FeastDataFrame` | Feast-native wrapper with engine metadata; preferred for Feast-internal tooling |
+| `.to_arrow()` | `pyarrow.Table` | Arrow-native pipelines, Polars, DuckDB, zero-copy interchange |
+| `.to_tensor(kind="torch")` | `Dict[str, torch.Tensor]` | Direct PyTorch training loops; numeric columns become tensors |
+| `.to_ray_dataset()` | `ray.data.Dataset` | Ray Train, Ray Serve, distributed ML workloads |
+
+**Persistence methods**
+
+| Method | Effect | When to use |
+|---|---|---|
+| `.persist(storage)` | Writes result to offline storage | Save a training dataset for later reuse or auditing |
+| `.to_remote_storage()` | Exports result to S3/GCS as Parquet files | Hand off to external systems or data pipelines |
+
+#### Retrieving as a Ray Dataset
+
+`to_ray_dataset()` is a **first-class method** on every `RetrievalJob`. When
+the underlying offline store is a `RayOfflineStore`, the dataset is returned
+directly without a copy through Arrow. For all other offline stores, a
+zero-copy Arrow → Ray Dataset conversion is used as a fallback.
+
+```python
+from feast import FeatureStore
+
+store = FeatureStore(".")
+
+# to_ray_dataset() is a first-class method on the RetrievalJob — chain it
+# directly after get_historical_features().
+ray_ds = store.get_historical_features(
+    entity_df=entity_df,
+    features=["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"],
+).to_ray_dataset()
+
+# Use with Ray Train
+from ray.train.torch import TorchTrainer
+trainer = TorchTrainer(
+    train_loop_per_worker=...,
+    datasets={"train": ray_ds},
+)
+```
+
+> **Note:** `to_ray_dataset()` requires `feast[ray]` to be installed.
+
+---
+
 ## Retrieving online features (for model inference)
 Feast will ensure the latest feature values for registered features are available. At retrieval time, you need to supply a list of **entities** and the corresponding **features** to be retrieved. Similar to `get_historical_features`, we recommend using feature services as a mechanism for grouping features in a model version.
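The dispatch this diff documents (Ray-native jobs hand back their dataset directly; every other job falls back to converting its Arrow table) can be sketched in pure Python. The class and attribute names below are invented for illustration and are not Feast's actual internals:

```python
# Illustrative sketch of the to_ray_dataset() dispatch described above.
# RayRetrievalJob / GenericRetrievalJob are hypothetical names.
class RayRetrievalJob:
    """A job backed by RayOfflineStore: it already holds a Ray Dataset."""

    def __init__(self, ray_dataset):
        self._ray_dataset = ray_dataset

    def to_ray_dataset(self):
        # Returned directly: no Arrow round-trip for Ray-native jobs.
        return self._ray_dataset


class GenericRetrievalJob:
    """Any other offline store's job: falls back to Arrow conversion."""

    def __init__(self, arrow_table):
        self._arrow_table = arrow_table

    def to_ray_dataset(self):
        import ray.data  # deferred: only needed when feast[ray] is installed

        # ray.data.from_arrow converts a pyarrow.Table, zero-copy where possible
        return ray.data.from_arrow(self._arrow_table)
```

Either way, callers see one uniform `to_ray_dataset()` method and never need to know which store produced the job.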

docs/reference/data-sources/README.md

Lines changed: 4 additions & 0 deletions
@@ -65,3 +65,7 @@ Please see [Data Source](../../getting-started/concepts/data-ingestion.md) for a
 {% content-ref url="oracle.md" %}
 [oracle.md](oracle.md)
 {% endcontent-ref %}
+
+{% content-ref url="ray.md" %}
+[ray.md](ray.md)
+{% endcontent-ref %}

docs/reference/data-sources/ray.md

Lines changed: 233 additions & 0 deletions
@@ -0,0 +1,233 @@
+# Ray Data Source (contrib)
+
+> **⚠️ Contrib Plugin:**
+> `RaySource` is a contributed plugin shipped alongside the [Ray offline store](../offline-stores/ray.md). It may not be as stable or fully supported as core data sources.
+
+`RaySource` is a pure-metadata descriptor that tells Feast **how** to load a
+[Ray Dataset](https://docs.ray.io/en/latest/data/api/dataset.html) from any
+source that Ray Data supports natively — Parquet, CSV, JSON, HuggingFace
+Datasets, MongoDB, binary files, images, TFRecords, and more.
+
+It is the recommended data source when using the
+[Ray offline store](../offline-stores/ray.md); it removes the need to stage
+non-Parquet or non-file-based data as intermediate Parquet files for `FileSource`.
+
+---
+
+## When to use RaySource vs FileSource
+
+| Scenario | Recommended source |
+|---|---|
+| Parquet files on disk / S3 / GCS (existing setup) | `FileSource` (backward compatible) |
+| Parquet via Ray reader (pipelines, remote auth) | `RaySource(reader_type="parquet")` |
+| CSV, JSON, text, images via Ray | `RaySource` |
+| HuggingFace `datasets` library | `RaySource(reader_type="huggingface")` |
+| MongoDB, SQL, TFRecords, WebDataset | `RaySource` |
+
+---
+
+## Installation
+
+`RaySource` is bundled with the Ray offline store contrib package:
+
+```bash
+pip install 'feast[ray]'
+```
+
+---
+
+## Supported `reader_type` values
+
+| `reader_type` | Underlying Ray API | Notes |
+|---|---|---|
+| `parquet` | `ray.data.read_parquet` | S3, GCS, HDFS, local |
+| `csv` | `ray.data.read_csv` | |
+| `json` | `ray.data.read_json` | |
+| `text` | `ray.data.read_text` | |
+| `images` | `ray.data.read_images` | |
+| `binary_files` | `ray.data.read_binary_files` | |
+| `tfrecords` | `ray.data.read_tfrecords` | |
+| `webdataset` | `ray.data.read_webdataset` | |
+| `huggingface` | `ray.data.from_huggingface` | Wraps `datasets.load_dataset` |
+| `mongo` | `ray.data.read_mongo` | |
+| `sql` | `ray.data.read_sql` | Pass `connection_url` in `reader_options` |
+
+---
+
+## Configuration
+
+### Parameters
+
+| Parameter | Type | Required | Description |
+|---|---|---|---|
+| `name` | `str` | Yes | Unique name for this data source |
+| `reader_type` | `str` | Yes | One of the supported reader types above |
+| `path` | `str` | No | File or directory path (required for file-based readers) |
+| `reader_options` | `dict` | No | Extra keyword arguments forwarded to the Ray reader |
+| `timestamp_field` | `str` | No | Column containing event timestamps |
+| `created_timestamp_column` | `str` | No | Column containing row creation timestamps |
+| `tags` | `dict` | No | Arbitrary key-value metadata |
+| `description` | `str` | No | Human-readable description |
+| `owner` | `str` | No | Owning team or contact |
+
+---
+
+## Usage examples
+
+### Parquet on S3
+
+```python
+from feast.infra.offline_stores.contrib.ray_offline_store.ray_source import RaySource
+
+driver_stats = RaySource(
+    name="driver_stats_parquet",
+    reader_type="parquet",
+    path="s3://my-bucket/driver_stats/",
+    timestamp_field="event_timestamp",
+)
+```
+
+### CSV
+
+```python
+sensor_readings = RaySource(
+    name="sensor_readings_csv",
+    reader_type="csv",
+    path="/data/sensors/",
+    timestamp_field="ts",
+)
+```
+
+### HuggingFace dataset
+
+Load a dataset from the [HuggingFace Hub](https://huggingface.co/datasets)
+directly into Feast.
+
+```python
+from feast.infra.offline_stores.contrib.ray_offline_store.ray_source import RaySource
+
+cheque_images = RaySource(
+    name="cheque_images_hf",
+    reader_type="huggingface",
+    reader_options={
+        "dataset_name": "cheques_sample_data",
+        "split": "train",
+    },
+    timestamp_field="event_timestamp",
+)
+```
+
+### MongoDB
+
+```python
+transaction_log = RaySource(
+    name="transactions_mongo",
+    reader_type="mongo",
+    reader_options={
+        "uri": "mongodb://localhost:27017",
+        "database": "featuredb",
+        "collection": "transactions",
+    },
+    timestamp_field="created_at",
+)
+```
+
+### SQL (via connection URL)
+
+```python
+user_features = RaySource(
+    name="user_features_sql",
+    reader_type="sql",
+    reader_options={
+        "connection_url": "postgresql+psycopg2://user:password@host:5432/db",  # pragma: allowlist secret
+        "query": "SELECT * FROM user_features",
+    },
+    timestamp_field="event_timestamp",
+)
+```
+
+---
+
+## Using RaySource in a BatchFeatureView
+
+```python
+from datetime import timedelta
+
+from feast import BatchFeatureView, Entity, Field
+from feast.types import Float32, Int64, String
+from feast.infra.offline_stores.contrib.ray_offline_store.ray_source import RaySource
+
+cheque = Entity(name="cheque_id", description="Unique cheque identifier")
+
+cheque_source = RaySource(
+    name="cheque_images_hf",
+    reader_type="huggingface",
+    reader_options={
+        "dataset_name": "cheques_sample_data",
+        "split": "train",
+    },
+    timestamp_field="event_timestamp",
+)
+
+cheque_ocr_fv = BatchFeatureView(
+    name="cheque_ocr_features",
+    entities=[cheque],
+    ttl=timedelta(days=365),
+    schema=[
+        Field(name="cheque_id", dtype=Int64),
+        Field(name="payee_name", dtype=String),
+        Field(name="amount", dtype=String),
+        Field(name="bank_name", dtype=String),
+        Field(name="raw_text", dtype=String),
+    ],
+    source=cheque_source,
+)
+```
+
+---
+
+## Retrieving data as a Ray Dataset
+
+Once the feature view is materialised, you can retrieve the offline features
+directly as a Ray Dataset using the first-class `to_ray_dataset()` method:
+
+```python
+from feast import FeatureStore
+
+store = FeatureStore(".")
+
+# Chain directly on the retrieval job — to_ray_dataset() is a first-class
+# method on every RetrievalJob.
+ds = store.get_historical_features(
+    features=["cheque_ocr_features:payee_name", "cheque_ocr_features:amount"],
+    entity_df=entity_df,
+).to_ray_dataset()
+
+# Use the dataset downstream in Ray or ML pipelines
+ds.show(3)
+```
+
+---
+
+## Proto serialisation
+
+`RaySource` is fully serialisable to Feast's protobuf registry format. The
+`reader_type`, `path`, and `reader_options` dict are all persisted and can be
+round-tripped via `to_proto()` / `from_proto()`.
+
+---
+
+## Limitations
+
+* The Ray offline store (and therefore `RaySource`) requires `feast[ray]`.
+* `reader_type="sql"` requires a serialisable `connection_url`; raw
+  `sqlalchemy.engine.Engine` objects cannot be pickled across Ray workers.
+* Streaming sources (Kafka, Kinesis) are not supported via `RaySource`; use
+  the dedicated [Kafka](kafka.md) or [Kinesis](kinesis.md) data sources.
+
+---
+
+## Related pages
+
+* [Ray Offline Store](../offline-stores/ray.md)
+* [Ray Compute Engine](../compute-engine/ray.md)
+* [Feature Retrieval](../../getting-started/concepts/feature-retrieval.md)
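The parameter rules in the new `ray.md` page above (`path` required for file-based readers, `connection_url` required for SQL) can be condensed into a small validation sketch. `validate_ray_source` and `FILE_BASED_READERS` are hypothetical names for illustration only, not Feast API:

```python
# Hypothetical helper mirroring the RaySource parameter rules documented
# above; not Feast's actual validation code.
FILE_BASED_READERS = {
    "parquet", "csv", "json", "text",
    "images", "binary_files", "tfrecords", "webdataset",
}


def validate_ray_source(reader_type, path=None, reader_options=None):
    opts = reader_options or {}
    if reader_type in FILE_BASED_READERS and not path:
        # File-based readers need somewhere to read from
        raise ValueError(f"reader_type={reader_type!r} requires a path")
    if reader_type == "sql" and "connection_url" not in opts:
        # Engine objects cannot be pickled across Ray workers; a URL string can
        raise ValueError("reader_type='sql' requires connection_url in reader_options")
```

Connection-based readers such as `mongo` pass validation without a `path`, since everything they need travels in `reader_options`.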

docs/reference/offline-stores/ray.md

Lines changed: 36 additions & 5 deletions
@@ -557,14 +557,45 @@ except Exception as e:
     print(f"Data source validation failed: {e}")
 ```
 
+## Data Sources
+
+[`RaySource`](../data-sources/ray.md) is the recommended data source for the
+Ray offline store. It is a pure-metadata descriptor that tells Feast how to
+load a Ray Dataset from any source Ray Data supports — Parquet, CSV, JSON,
+HuggingFace datasets, MongoDB, binary files, images, TFRecords, WebDataset,
+SQL, and more.
+
+```python
+from feast.infra.offline_stores.contrib.ray_offline_store.ray_source import RaySource
+
+# Load directly from the HuggingFace Hub
+cheque_source = RaySource(
+    name="cheque_images_hf",
+    reader_type="huggingface",
+    reader_options={
+        "dataset_name": "cheques_sample_data",
+        "split": "train",
+    },
+    timestamp_field="event_timestamp",
+)
+```
+
+See the [RaySource reference](../data-sources/ray.md) for a full list of
+`reader_type` values and configuration options.
+
+> **Note:** `FileSource` (Parquet) remains supported for backward compatibility
+> but `RaySource(reader_type="parquet")` is preferred for new projects.
+
 ## Limitations
 
-The Ray offline store has the following limitations:
+The Ray offline store has one known limitation:
 
-1. **File Sources Only**: Currently supports only `FileSource` data sources
-2. **No Direct SQL**: Does not support SQL query interfaces
-3. **No Online Writes**: Cannot write directly to online stores
-4. **No Complex Transformations**: The Ray offline store focuses on data I/O operations. For complex feature transformations (aggregations, joins, custom UDFs), use the [Ray Compute Engine](../compute-engine/ray.md) instead
+* **`online_write_batch` not implemented**: The `OfflineStore.online_write_batch()` interface
+  is not supported by the Ray offline store. This does **not** affect materialization —
+  `feast materialize` writes to the online store correctly via the
+  [Ray Compute Engine](../compute-engine/ray.md). The restriction only applies to callers
+  that invoke `online_write_batch` on the offline store object directly, which is an
+  uncommon pattern outside of custom tooling.
 
 ## Integration with Ray Compute Engine
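The `reader_type="sql"` limitation noted in the RaySource docs earlier (a picklable `connection_url` string rather than a live `Engine`) implies a connection-factory pattern like the sketch below. The helper name is invented, and `sqlite3` stands in for a real database driver:

```python
# Illustrative connection-factory pattern for reader_type="sql": ship a
# plain string to each Ray worker and open the connection there, instead
# of trying to pickle an Engine or connection object. sqlite3 is used
# purely for demonstration; any DB-API driver works the same way.
import sqlite3


def make_connection_factory(database_path):
    # The closure captures only a string, which serialises cleanly.
    def factory():
        return sqlite3.connect(database_path)

    return factory


# With Ray installed, a factory like this plugs into ray.data.read_sql, e.g.:
#   ray.data.read_sql("SELECT * FROM user_features",
#                     make_connection_factory("/data/features.db"))
```

Building the connection inside the worker also avoids sharing one connection across processes, which most drivers forbid.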

docs/roadmap.md

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ The list below contains the functionality that contributors are planning to develop
 * [x] [Athena (contrib plugin)](https://docs.feast.dev/reference/data-sources/athena)
 * [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/data-sources/clickhouse)
 * [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/data-sources/oracle)
+* [x] [Ray source (contrib plugin)](https://docs.feast.dev/reference/data-sources/ray)
 * [x] Kafka / Kinesis sources (via [push support into the online store](https://docs.feast.dev/reference/data-sources/push))
 * **Offline Stores**
 * [x] [Snowflake](https://docs.feast.dev/reference/offline-stores/snowflake)
