Skip to content

Commit 9dbf6a5

Browse files
committed
fix: Fixes cloud storage handlers
1 parent a1877a0 commit 9dbf6a5

6 files changed

Lines changed: 109 additions & 12 deletions

File tree

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export(is_delta_table_path)
1717
export(load_datetime)
1818
export(load_version)
1919
export(partition_columns)
20+
export(register_cloud_handlers)
2021
export(table_version)
2122
export(vacuum)
2223
export(write_deltalake)

R/00_classes.R

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
#' @import S7
22
NULL
33

4+
.onLoad <- function(...) {
5+
# Register cloud storage handlers (GCS, S3, Azure) for Delta Lake
6+
7+
register_cloud_handlers()
8+
9+
S7::methods_register()
10+
}
11+
412
#' DeltaTable S7 Class
513
#'
614
#' An S7 class representing a Delta Lake table.

R/delta_table.R

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
#' @importFrom rlang abort
2+
NULL
3+
14
#' Create a DeltaTable object
25
#'
36
#' Opens an existing Delta Lake table at the specified path.
@@ -65,11 +68,22 @@ delta_table <- function(
6568
# Open the table
6669
internal <- delta_table_open(path, storage_options)
6770

71+
# Handle errors from Rust
72+
if (methods::is(internal, "error")) {
73+
rlang::abort(internal$value)
74+
}
75+
6876
# Time travel if requested
6977
if (!is.null(version)) {
70-
internal$load_version(as.integer(version))
78+
result <- internal$load_version(as.integer(version))
79+
if (methods::is(result, "error")) {
80+
rlang::abort(result$value)
81+
}
7182
} else if (!is.null(datetime)) {
72-
internal$load_datetime(datetime)
83+
result <- internal$load_datetime(datetime)
84+
if (methods::is(result, "error")) {
85+
rlang::abort(result$value)
86+
}
7387
}
7488

7589
# Create and return the S7 object
@@ -90,7 +104,11 @@ table_version <- new_generic("table_version", "table", function(table, ...) {
90104

91105
#' @export
92106
method(table_version, DeltaTable) <- function(table, ...) {
93-
table@internal$version()
107+
result <- table@internal$version()
108+
if (methods::is(result, "error")) {
109+
rlang::abort(result$value)
110+
}
111+
result
94112
}
95113

96114
#' Get the list of Parquet files in the current table snapshot
@@ -119,7 +137,11 @@ get_files <- new_generic("get_files", "table", function(table, ...) {
119137

120138
#' @export
121139
method(get_files, DeltaTable) <- function(table) {
122-
table@internal$get_files()
140+
result <- table@internal$get_files()
141+
if (methods::is(result, "error")) {
142+
rlang::abort(result$value)
143+
}
144+
result
123145
}
124146

125147
#' Get table metadata
@@ -138,7 +160,11 @@ get_metadata <- new_generic("get_metadata", "table", function(table, ...) {
138160

139161
#' @export
140162
method(get_metadata, DeltaTable) <- function(table) {
141-
table@internal$metadata()
163+
result <- table@internal$metadata()
164+
if (methods::is(result, "error")) {
165+
rlang::abort(result$value)
166+
}
167+
result
142168
}
143169

144170
#' Get table schema
@@ -156,7 +182,11 @@ get_schema <- new_generic("get_schema", "table", function(table, ...) {
156182

157183
#' @export
158184
method(get_schema, DeltaTable) <- function(table) {
159-
table@internal$schema()
185+
result <- table@internal$schema()
186+
if (methods::is(result, "error")) {
187+
rlang::abort(result$value)
188+
}
189+
result
160190
}
161191

162192
#' Get commit history
@@ -175,7 +205,11 @@ history <- new_generic("history", "table", function(table, ...) {
175205

176206
#' @export
177207
method(history, DeltaTable) <- function(table, ..., limit = NULL) {
178-
table@internal$history(limit)
208+
result <- table@internal$history(limit)
209+
if (methods::is(result, "error")) {
210+
rlang::abort(result$value)
211+
}
212+
result
179213
}
180214

181215
#' Get partition columns
@@ -197,7 +231,11 @@ partition_columns <- new_generic(
197231

198232
#' @export
199233
method(partition_columns, DeltaTable) <- function(table) {
200-
table@internal$partition_columns()
234+
result <- table@internal$partition_columns()
235+
if (methods::is(result, "error")) {
236+
rlang::abort(result$value)
237+
}
238+
result
201239
}
202240

203241
#' Vacuum a Delta table
@@ -227,7 +265,15 @@ method(vacuum, DeltaTable) <- function(
227265
dry_run = TRUE,
228266
enforce_retention_duration = TRUE
229267
) {
230-
table@internal$vacuum(retention_hours, dry_run, enforce_retention_duration)
268+
result <- table@internal$vacuum(
269+
retention_hours,
270+
dry_run,
271+
enforce_retention_duration
272+
)
273+
if (methods::is(result, "error")) {
274+
rlang::abort(result$value)
275+
}
276+
result
231277
}
232278

233279
#' Load a specific version of the table
@@ -246,7 +292,10 @@ load_version <- new_generic("load_version", "table", function(table, ...) {
246292

247293
#' @export
248294
method(load_version, DeltaTable) <- function(table, ..., version) {
249-
table@internal$load_version(as.integer(version))
295+
result <- table@internal$load_version(as.integer(version))
296+
if (methods::is(result, "error")) {
297+
rlang::abort(result$value)
298+
}
250299
invisible(table)
251300
}
252301

@@ -266,7 +315,10 @@ load_datetime <- new_generic("load_datetime", "table", function(table, ...) {
266315

267316
#' @export
268317
method(load_datetime, DeltaTable) <- function(table, ..., datetime) {
269-
table@internal$load_datetime(datetime)
318+
result <- table@internal$load_datetime(datetime)
319+
if (methods::is(result, "error")) {
320+
rlang::abort(result$value)
321+
}
270322
invisible(table)
271323
}
272324

@@ -280,5 +332,9 @@ method(load_datetime, DeltaTable) <- function(table, ..., datetime) {
280332
#'
281333
#' @export
282334
is_delta_table_path <- function(path, storage_options = NULL) {
283-
is_delta_table(path, storage_options)
335+
result <- is_delta_table(path, storage_options)
336+
if (methods::is(result, "error")) {
337+
rlang::abort(result$value)
338+
}
339+
result
284340
}

R/extendr-wrappers.R

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@
1010
#' @useDynLib deltaR, .registration = TRUE
1111
NULL
1212

13+
#' Register cloud storage handlers (GCS, S3, Azure) for deltalake
14+
#' Called from R's .onLoad to enable cloud storage support
15+
#' @export
16+
register_cloud_handlers <- function() invisible(.Call(wrap__register_cloud_handlers))
17+
1318
#' Open a Delta Table at the specified path
1419
#' @export
1520
delta_table_open <- function(path, storage_options) .Call(wrap__delta_table_open, path, storage_options)

man/register_cloud_handlers.Rd

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/rust/src/lib.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,19 @@ use std::collections::HashMap;
1212
use std::sync::Arc;
1313
use tokio::runtime::Runtime;
1414

15+
/// Register cloud storage handlers (GCS, S3, Azure) for deltalake
16+
/// Called from R's .onLoad to enable cloud storage support
17+
/// @export
18+
#[extendr]
19+
fn register_cloud_handlers() {
20+
// Register GCS handler
21+
deltalake::gcp::register_handlers(None);
22+
// Register S3 handler
23+
deltalake::aws::register_handlers(None);
24+
// Register Azure handler
25+
deltalake::azure::register_handlers(None);
26+
}
27+
1528
/// Convert a kernel DataType to an Arrow DataType
1629
fn kernel_type_to_arrow(kernel_type: &KernelDataType) -> ArrowDataType {
1730
match kernel_type {
@@ -393,6 +406,7 @@ extendr_module! {
393406
mod deltaR;
394407
use write;
395408
impl DeltaTableInternal;
409+
fn register_cloud_handlers;
396410
fn delta_table_open;
397411
fn is_delta_table;
398412
}

0 commit comments

Comments
 (0)