
Commit 3398281

Page cache prefetching with prefetch_page_cache
1 parent 05268ad commit 3398281

12 files changed

Lines changed: 453 additions & 49 deletions

File tree

.github/workflows/python.yml

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -225,7 +225,7 @@ jobs:
225225
pip install JollyJack --pre --find-links ./dist --break-system-packages --only-binary=:all: --force-reinstall --no-index --no-deps
226226
python3 ./benchmarks/benchmark_jollyjack.py
227227
env:
228-
JJ_benchmark_mode: ${{ matrix.mode }}
228+
JJB_benchmark_mode: ${{ matrix.mode }}
229229

230230
publish:
231231
if: ${{ !github.event.repository.fork && startsWith(github.ref, 'refs/tags/v') }}

README.md

Lines changed: 107 additions & 6 deletions
@@ -48,21 +48,28 @@ your workload is I/O-bound or memory-/CPU-bound.
4848

4949
### Large datasets (exceed filesystem cache)
5050

51-
For datasets larger than the available page cache, performance is typically I/O-bound.
51+
For datasets larger than the available page cache, performance is typically
52+
I/O-bound. Enabling either `pre_buffer=True` or `prefetch_page_cache=True`
53+
brings throughput close to the raw I/O ceiling.
5254

5355
Recommended configuration:
5456

55-
- `use_threads = True`, `pre_buffer = True`, `JJ_READER_BACKEND = io_uring_odirect`
57+
- `use_threads = True`, `prefetch_page_cache = True`, `pre_buffer = False`,
58+
with the default reader backend.
5659

57-
This combination bypasses the page cache, reduces double-buffering, and allows deeper I/O queues via io_uring.
60+
Both options reach near-identical throughput, but `prefetch_page_cache`
61+
avoids the temporary buffer copies that `pre_buffer` makes (see section
62+
below) and the LLC misses they cause.
5863

5964
### Small datasets (fit in filesystem cache)
6065

61-
For datasets that comfortably fit in RAM, performance is typically CPU- or memory-bound.
66+
For datasets that comfortably fit in RAM, performance is typically CPU- or
67+
memory-bound.
6268

6369
Recommended configuration:
6470

65-
- `use_threads = False`, `pre_buffer = False`, and the default reader backend (no io_uring).
71+
- `use_threads = True`, `prefetch_page_cache = True`, `pre_buffer = False`,
72+
with the default reader backend.
6673

6774
### Pre-buffering and `cache_options`
6875

@@ -102,10 +109,62 @@ To debug allocator issues with mimalloc, run with `MIMALLOC_SHOW_STATS=1` and
102109

103110
### Pre-buffering and `ARROW_IO_THREADS`
104111

105-
When `pre_buffer=True`, Arrow dispatches reads to its IO thread pooll,
112+
When `pre_buffer=True`, Arrow dispatches reads to its IO thread pool,
106113
configured via the `ARROW_IO_THREADS` environment variable (default: 8).
107114
Tuning this value may improve performance.
108115
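A short sketch of the tuning knob described above: `ARROW_IO_THREADS` is read when pyarrow creates its IO thread pool, so it has to be in the environment before the first pre-buffered read. The value `16` here is purely illustrative, not a recommendation.

```python
import os

# ARROW_IO_THREADS must be set before pyarrow spawns its IO thread pool,
# i.e. before the first read with pre_buffer=True. "16" is an
# illustrative value; benchmark your workload to pick the right one.
os.environ["ARROW_IO_THREADS"] = "16"
```

After this, any subsequent pyarrow reads in the process use the resized IO pool.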

116+
### Page cache prefetching with `prefetch_page_cache`
117+
118+
With `pre_buffer=True`, Arrow's IO thread pool allocates temporary buffers
119+
and fills them on the IO thread's core. When worker threads on different
120+
cores later consume those buffers, the data is cold in their caches,
121+
causing LLC misses.
122+
123+
`prefetch_page_cache` provides an alternative: it calls
124+
`posix_fadvise(POSIX_FADV_WILLNEED)` to tell the kernel to start loading
125+
the relevant byte ranges into the page cache. Each worker thread then
126+
reads directly via `pread` into its own locally-allocated buffer, keeping
127+
data hot in its local CPU caches.
128+
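A minimal sketch of the mechanism described above, using only the standard library (the file, sizes, and buffer layout are illustrative, not jollyjack's internals): `posix_fadvise(POSIX_FADV_WILLNEED)` asks the kernel to start loading a byte range into the page cache, and the consuming thread later reads it with `pread` into its own buffer.

```python
import os
import tempfile

# Create a small scratch file to demonstrate against.
fd_tmp, path = tempfile.mkstemp()
os.write(fd_tmp, b"x" * 4096)
os.close(fd_tmp)

fd = os.open(path, os.O_RDONLY)
try:
    # Non-blocking hint: kernel readahead into the page cache begins
    # in the background.
    os.posix_fadvise(fd, 0, 4096, os.POSIX_FADV_WILLNEED)
    # Later, pread lands the (now likely cached) bytes in a buffer
    # allocated by the reading thread itself.
    data = os.pread(fd, 4096, 0)
finally:
    os.close(fd)
    os.unlink(path)
```

Because the hint is asynchronous, issuing it well before the `pread` (e.g. from another thread, as with the standalone `jj.prefetch_page_cache` call below) gives the kernel time to complete the readahead.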
129+
Two ways to use it:
130+
131+
**As a parameter on `read_into_numpy`:**
132+
133+
```python
134+
jj.read_into_numpy(
135+
source=path,
136+
metadata=pr.metadata,
137+
np_array=np_array,
138+
row_group_indices=range(pr.metadata.num_row_groups),
139+
column_indices=range(pr.metadata.num_columns),
140+
prefetch_page_cache=True,
141+
)
142+
```
143+
144+
This is only useful for local or network-mounted file systems that have a
145+
page cache. Remote file systems such as S3 will not benefit from this.
146+
147+
**As a standalone call** (when you want to prefetch ahead of time, e.g.
148+
from a different thread):
149+
150+
```python
151+
jj.prefetch_page_cache(
152+
source=path,
153+
metadata=pr.metadata,
154+
row_group_indices=range(pr.metadata.num_row_groups),
155+
column_indices=range(pr.metadata.num_columns),
156+
)
157+
158+
jj.read_into_numpy(
159+
source=path,
160+
metadata=pr.metadata,
161+
np_array=np_array,
162+
row_group_indices=range(pr.metadata.num_row_groups),
163+
column_indices=range(pr.metadata.num_columns),
164+
pre_buffer=False,
165+
)
166+
```
167+
109168
## Requirements
110169

111170
- pyarrow ~= 24.0.0
@@ -253,6 +312,48 @@ with fs.LocalFileSystem().open_input_file(path) as f:
253312
print(np_array)
254313
```
255314

315+
### Using page cache prefetching
316+
```python
317+
np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")
318+
pr = pq.ParquetReader()
319+
pr.open(path)
320+
321+
# cache_options controls which byte ranges are prefetched into the page cache
322+
cache_options = pa.CacheOptions(
323+
hole_size_limit=8192,
324+
range_size_limit=16*1024*1024,
325+
lazy=False,
326+
)
327+
328+
# Prefetch and read in one call
329+
jj.read_into_numpy(
330+
source=path,
331+
metadata=pr.metadata,
332+
np_array=np_array,
333+
row_group_indices=range(pr.metadata.num_row_groups),
334+
column_indices=range(pr.metadata.num_columns),
335+
cache_options=cache_options,
336+
prefetch_page_cache=True,
337+
)
338+
339+
# Or prefetch separately, then read
340+
jj.prefetch_page_cache(
341+
source=path,
342+
metadata=pr.metadata,
343+
row_group_indices=range(pr.metadata.num_row_groups),
344+
column_indices=range(pr.metadata.num_columns),
345+
cache_options=cache_options,
346+
)
347+
jj.read_into_numpy(
348+
source=path,
349+
metadata=pr.metadata,
350+
np_array=np_array,
351+
row_group_indices=range(pr.metadata.num_row_groups),
352+
column_indices=range(pr.metadata.num_columns),
353+
pre_buffer=False,
354+
)
355+
```
356+
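To build intuition for the `cache_options` values above, here is an illustrative pure-Python sketch of range coalescing in the style of Arrow's `CacheOptions` (this is not jollyjack's actual implementation): byte ranges separated by a gap of at most `hole_size_limit` are merged, but a merged range never grows past `range_size_limit`.

```python
# Illustrative sketch of CacheOptions-style range coalescing, under the
# assumption that hole_size_limit bounds the gap between merged ranges
# and range_size_limit caps the merged range's total size.
def coalesce(ranges, hole_size_limit, range_size_limit):
    merged = []
    for start, end in sorted(ranges):
        if merged:
            last_start, last_end = merged[-1]
            if (start - last_end <= hole_size_limit
                    and end - last_start <= range_size_limit):
                # Small hole: extend the previous range over the gap.
                merged[-1] = (last_start, max(last_end, end))
                continue
        merged.append((start, end))
    return merged

# Two column chunks 4 KiB apart get merged; a distant one stays separate.
print(coalesce([(0, 8192), (12288, 20480), (10_000_000, 10_008_192)],
               hole_size_limit=8192, range_size_limit=16 * 1024 * 1024))
```

A larger `hole_size_limit` trades some wasted prefetching inside holes for fewer, larger I/O requests.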
256357
### Generating a PyTorch tensor to read into
257358
```python
258359
import torch

benchmarks/benchmark_jollyjack.py

Lines changed: 10 additions & 6 deletions
@@ -48,6 +48,7 @@ class BenchmarkSettings(BaseSettings):
4848
dtypes: list[str] = ["float32", "float16"]
4949
compressions: list[str] = ["none"]
5050
pre_buffer: list[bool] = [False, True]
51+
prefetch_page_cache: list[bool] = [False, True]
5152
use_threads: list[bool] = [False, True]
5253

5354
@classmethod
@@ -170,7 +171,6 @@ def generate_data(n_columns, n_row_groups, path, compression, dtype):
170171
if parquet_matches(
171172
path, n_columns, n_row_groups, cfg.chunk_size, compression, dtype
172173
):
173-
print(f"Reusing existing {path}")
174174
return
175175

176176
t = time.time()
@@ -215,21 +215,23 @@ def get_thread_local_np_array(dtype):
215215
return np_array
216216

217217

218-
def worker_jollyjack_numpy(use_threads, pre_buffer, dtype, path):
218+
def worker_jollyjack_numpy(use_threads, pre_buffer, prefetch_page_cache, dtype, path):
219219

220220
np_array = get_thread_local_np_array(dtype)
221221
cache_options = pa.CacheOptions(
222222
hole_size_limit=8192, # default
223223
range_size_limit=16 * 1024 * 1024, # 16 MB, fits in mimalloc arena
224224
lazy=False,
225225
)
226+
226227
jj.read_into_numpy(
227228
source=path,
228229
metadata=None,
229230
np_array=np_array,
230231
row_group_indices=row_groups_to_read,
231232
column_indices=column_indices_to_read,
232233
pre_buffer=pre_buffer,
234+
prefetch_page_cache=prefetch_page_cache,
233235
use_threads=use_threads,
234236
cache_options=cache_options,
235237
)
@@ -436,10 +438,12 @@ def measure_reading(max_workers, worker):
436438
print(f".")
437439
for n_workers in cfg.worker_counts:
438440
for pre_buffer in cfg.pre_buffer:
439-
for use_threads in cfg.use_threads:
440-
print(
441-
f"`jj.read_into_numpy` jj_reader:{jj_reader}, n_workers:{n_workers}, use_threads:{use_threads}, pre_buffer:{pre_buffer}, duration:{measure_reading(n_workers, lambda path:worker_jollyjack_numpy(use_threads, pre_buffer, dtype.to_pandas_dtype(), path = path))}"
442-
)
441+
for prefetch_page_cache in cfg.prefetch_page_cache:
442+
for use_threads in cfg.use_threads:
443+
444+
print(
445+
f"`jj.read_into_numpy` jj_reader:{jj_reader}, n_workers:{n_workers}, use_threads:{use_threads}, pre_buffer:{pre_buffer}, prefetch_page_cache:{prefetch_page_cache}, duration:{measure_reading(n_workers, lambda path:worker_jollyjack_numpy(use_threads, pre_buffer, prefetch_page_cache, dtype.to_pandas_dtype(), path = path))}"
446+
)
443447

444448
if {"all", "jj_torch"} & cfg.benchmarks_to_run:
445449
print(f".")

jollyjack/cjollyjack.pxd

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@ cdef extern from "jollyjack.h":
1717
, const vector[string] &column_names
1818
, const vector[int] &target_column_indices
1919
, bool pre_buffer
20+
, bool prefetch_page_cache
2021
, bool use_threads
2122
, int64_t expected_rows
2223
, CCacheOptions cache_options
@@ -32,7 +33,7 @@ cdef extern from "jollyjack.h":
3233
size_t dst_stride1_size,
3334
vector[int] row_indices) except + nogil
3435

35-
cdef void ExperimentalAdviseWillNeed (shared_ptr[CRandomAccessFile] source
36+
cdef void PrefetchPageCache (shared_ptr[CRandomAccessFile] source
3637
, shared_ptr[CFileMetaData] file_metadata
3738
, vector[int] column_indices
3839
, const vector[int] &row_groups

jollyjack/jollyjack.cc

Lines changed: 15 additions & 1 deletion
@@ -295,6 +295,7 @@ void ReadIntoMemory (std::shared_ptr<arrow::io::RandomAccessFile> source
295295
, const std::vector<std::string> &column_names
296296
, const std::vector<int> &target_column_indices
297297
, bool pre_buffer
298+
, bool prefetch_page_cache
298299
, bool use_threads
299300
, int64_t expected_rows
300301
, arrow::io::CacheOptions cache_options)
@@ -328,6 +329,19 @@ void ReadIntoMemory (std::shared_ptr<arrow::io::RandomAccessFile> source
328329
}
329330
}
330331

332+
if (prefetch_page_cache)
333+
{
334+
auto read_ranges = parquet_reader->GetReadRanges(row_groups,
335+
column_indices,
336+
cache_options.hole_size_limit,
337+
cache_options.range_size_limit
338+
).ValueOrDie();
339+
auto status = source->WillNeed(read_ranges);
340+
if (!status.ok()) {
341+
throw std::logic_error(status.message());
342+
}
343+
}
344+
331345
if (pre_buffer)
332346
{
333347
parquet_reader->PreBuffer(row_groups, column_indices, arrowReaderProperties.io_context(), cache_options);
@@ -649,7 +663,7 @@ void CopyToRowMajor (void* src_buffer, size_t src_stride0_size, size_t src_strid
649663

650664
}
651665

652-
void ExperimentalAdviseWillNeed(
666+
void PrefetchPageCache(
653667
std::shared_ptr<arrow::io::RandomAccessFile> source,
654668
std::shared_ptr<parquet::FileMetaData> file_metadata,
655669
std::vector<int> column_indices,

jollyjack/jollyjack.h

Lines changed: 3 additions & 1 deletion
@@ -12,6 +12,7 @@ void ReadIntoMemory (std::shared_ptr<arrow::io::RandomAccessFile> source
1212
, const std::vector<std::string> &column_names
1313
, const std::vector<int> &target_column_indices
1414
, bool pre_buffer
15+
, bool prefetch_page_cache
1516
, bool use_threads
1617
, int64_t expected_rows
1718
, arrow::io::CacheOptions cache_options);
@@ -26,7 +27,8 @@ void CopyToRowMajor (void* src_buffer,
2627
size_t dst_stride1_size,
2728
std::vector<int> row_indices);
2829

29-
void ExperimentalAdviseWillNeed(
30+
// Calls posix_fadvise(POSIX_FADV_WILLNEED) on the byte ranges for the requested columns/row groups.
31+
void PrefetchPageCache(
3032
std::shared_ptr<arrow::io::RandomAccessFile> source,
3133
std::shared_ptr<parquet::FileMetaData> file_metadata,
3234
std::vector<int> column_indices,
