Skip to content

Commit 3a1f4bd

Browse files
authored
Merge pull request #9 from timescale/feat/streaming-phase3
feat(streaming): Phase 3 — end-to-end async streaming from network
2 parents db8f31e + 142534a commit 3a1f4bd

6 files changed

Lines changed: 1923 additions & 300 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ sysinfo = "0.38"
3434
governor = "0.10"
3535
redis = { version = "1.0", features = ["tokio-comp", "connection-manager"] }
3636
tokio-postgres = "0.7"
37-
tokio-util = { version = "0.7", features = ["io"], optional = true }
37+
tokio-util = { version = "0.7", features = ["io", "io-util"], optional = true }
3838

3939
# Optional JS hooks (Boa engine). When disabled, sources with hooks config are ignored.
4040
boa_engine = { version = "0.21", optional = true }

src/poll/cursor.rs

Lines changed: 241 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -174,47 +174,7 @@ pub(super) async fn poll_cursor_pagination(
174174
let record_url = response.url().clone();
175175
let record_status = response.status().as_u16();
176176
let record_headers = response.headers().clone();
177-
let body_bytes = read_body_with_limit(response, source.max_response_bytes).await?;
178-
if let Some(ref rs) = record_state {
179-
rs.save(
180-
source_id,
181-
record_url.as_str(),
182-
record_status,
183-
&record_headers,
184-
&body_bytes,
185-
)?;
186-
}
187-
if !status.is_success() {
188-
let body_lossy = String::from_utf8_lossy(&body_bytes);
189-
if cursor.is_some() && status.as_u16() >= 400 && status.as_u16() < 500 {
190-
let lower = body_lossy.to_lowercase();
191-
let is_expired = status.as_u16() == 410
192-
|| lower.contains("expired")
193-
|| lower.contains("invalid cursor")
194-
|| lower.contains("cursor invalid");
195-
if is_expired && source.on_cursor_error == Some(CursorExpiredBehavior::Reset) {
196-
store_set_or_skip(&store, source_id, source, global, "cursor", "").await?;
197-
tracing::warn!(
198-
source = %source_id,
199-
"cursor expired (4xx), reset; next poll from start"
200-
);
201-
return Ok(());
202-
}
203-
}
204-
anyhow::bail!("http {} {}", status, body_lossy);
205-
}
206-
total_bytes += body_bytes.len() as u64;
207-
if let Some(limit) = max_bytes
208-
&& total_bytes > limit
209-
{
210-
tracing::warn!(
211-
source = %source_id,
212-
total_bytes,
213-
max_bytes = limit,
214-
"reached max_bytes limit, stopping cursor pagination"
215-
);
216-
break;
217-
}
177+
let mut response = Some(response);
218178
let mut next_cursor: Option<String> = None;
219179
let mut event_count = 0usize;
220180
let mut emitted_count = 0u64;
@@ -223,64 +183,261 @@ pub(super) async fn poll_cursor_pagination(
223183
#[cfg(feature = "streaming")]
224184
if source.response_streaming.is_some() {
225185
use super::streaming;
226-
let parse_result = match streaming::parse_streaming(&body_bytes, source) {
227-
Ok(r) => r,
228-
Err(e) => {
229-
if source.on_parse_error == Some(OnParseErrorBehavior::Skip) {
230-
tracing::warn!(source = %source_id, error = %e, "parse error, stopping cursor pagination");
231-
return Ok(());
186+
use crate::config::StreamingMode;
187+
188+
let use_full = source.response_streaming == Some(StreamingMode::Full)
189+
&& source.on_invalid_utf8.is_none();
190+
191+
if use_full {
192+
let resp = response.take().unwrap();
193+
if !status.is_success() {
194+
let body_bytes = read_body_with_limit(resp, source.max_response_bytes).await?;
195+
if let Some(ref rs) = record_state {
196+
rs.save(
197+
source_id,
198+
record_url.as_str(),
199+
record_status,
200+
&record_headers,
201+
&body_bytes,
202+
)?;
232203
}
233-
return Err(e).context("streaming parse");
204+
let body_lossy = String::from_utf8_lossy(&body_bytes);
205+
if cursor.is_some() && status.as_u16() >= 400 && status.as_u16() < 500 {
206+
let lower = body_lossy.to_lowercase();
207+
let is_expired = status.as_u16() == 410
208+
|| lower.contains("expired")
209+
|| lower.contains("invalid cursor")
210+
|| lower.contains("cursor invalid");
211+
if is_expired
212+
&& source.on_cursor_error == Some(CursorExpiredBehavior::Reset)
213+
{
214+
store_set_or_skip(&store, source_id, source, global, "cursor", "")
215+
.await?;
216+
tracing::warn!(
217+
source = %source_id,
218+
"cursor expired (4xx), reset; next poll from start"
219+
);
220+
return Ok(());
221+
}
222+
}
223+
anyhow::bail!("http {} {}", status, body_lossy);
234224
}
235-
};
236-
next_cursor =
237-
json_path_str(parse_result.metadata(), cursor_path).filter(|s| !s.is_empty());
238-
let obj_path = source.response_event_object_path.as_deref();
239-
for result in parse_result.iter(&body_bytes) {
240-
let event_value = match result {
241-
Ok(v) => v,
225+
226+
let tee_path = record_state.as_ref().map(|_| {
227+
std::env::temp_dir().join(format!(
228+
"helr-tee-{}-{}.bin",
229+
std::process::id(),
230+
source_id.replace(|c: char| !c.is_ascii_alphanumeric(), "_")
231+
))
232+
});
233+
let (mut event_rx, meta_rx, join_handle) = streaming::stream_and_parse(
234+
resp,
235+
source.response_events_path.clone(),
236+
source.max_response_bytes,
237+
tee_path.clone(),
238+
)
239+
.await?;
240+
241+
let obj_path = source.response_event_object_path.as_deref();
242+
while let Some(result) = event_rx.recv().await {
243+
let event_value = match result {
244+
Ok(v) => v,
245+
Err(e) => {
246+
if source.on_parse_error == Some(OnParseErrorBehavior::Skip) {
247+
tracing::warn!(source = %source_id, error = %e, "parse error, stopping cursor pagination");
248+
return Ok(());
249+
}
250+
return Err(e).context("streaming parse element");
251+
}
252+
};
253+
let event_value = match streaming::unwrap_event_object(event_value, obj_path) {
254+
Some(v) => v,
255+
None => continue,
256+
};
257+
event_count += 1;
258+
if let Some(ref inc) = source.incremental_from {
259+
update_max_timestamp_single(
260+
&mut incremental_max_ts,
261+
&event_value,
262+
&inc.event_timestamp_path,
263+
);
264+
}
265+
if let Some(ref st) = source.state {
266+
update_max_timestamp_single(
267+
&mut watermark_max_ts,
268+
&event_value,
269+
&st.watermark_field,
270+
);
271+
}
272+
if let Some(d) = &source.dedupe {
273+
let id = event_id(&event_value, &d.id_path).unwrap_or_default();
274+
if dedupe::seen_and_add(&dedupe_store, source_id, id, d.capacity).await {
275+
continue;
276+
}
277+
}
278+
total_events += 1;
279+
emitted_count += 1;
280+
let emitted = build_emitted_event(source, source_id, &path, event_value);
281+
emit_event_line(global, source_id, source, &event_sink, &emitted)?;
282+
}
283+
284+
join_handle
285+
.await
286+
.map_err(|e| anyhow::anyhow!("streaming parser panicked: {}", e))??;
287+
288+
let metadata = meta_rx
289+
.await
290+
.map_err(|_| anyhow::anyhow!("metadata channel closed"))?
291+
.context("parse metadata")?;
292+
next_cursor = json_path_str(&metadata, cursor_path).filter(|s| !s.is_empty());
293+
294+
if let (Some(rs), Some(tp)) = (&record_state, &tee_path) {
295+
let body = std::fs::read(tp).context("read tee file for recording")?;
296+
rs.save(
297+
source_id,
298+
record_url.as_str(),
299+
record_status,
300+
&record_headers,
301+
&body,
302+
)?;
303+
let _ = std::fs::remove_file(tp);
304+
}
305+
_streamed = true;
306+
} else {
307+
let resp = response.take().unwrap();
308+
let body_bytes = read_body_with_limit(resp, source.max_response_bytes).await?;
309+
if let Some(ref rs) = record_state {
310+
rs.save(
311+
source_id,
312+
record_url.as_str(),
313+
record_status,
314+
&record_headers,
315+
&body_bytes,
316+
)?;
317+
}
318+
if !status.is_success() {
319+
let body_lossy = String::from_utf8_lossy(&body_bytes);
320+
if cursor.is_some() && status.as_u16() >= 400 && status.as_u16() < 500 {
321+
let lower = body_lossy.to_lowercase();
322+
let is_expired = status.as_u16() == 410
323+
|| lower.contains("expired")
324+
|| lower.contains("invalid cursor")
325+
|| lower.contains("cursor invalid");
326+
if is_expired
327+
&& source.on_cursor_error == Some(CursorExpiredBehavior::Reset)
328+
{
329+
store_set_or_skip(&store, source_id, source, global, "cursor", "")
330+
.await?;
331+
tracing::warn!(
332+
source = %source_id,
333+
"cursor expired (4xx), reset; next poll from start"
334+
);
335+
return Ok(());
336+
}
337+
}
338+
anyhow::bail!("http {} {}", status, body_lossy);
339+
}
340+
let parse_result = match streaming::parse_streaming(&body_bytes, source) {
341+
Ok(r) => r,
242342
Err(e) => {
243343
if source.on_parse_error == Some(OnParseErrorBehavior::Skip) {
244344
tracing::warn!(source = %source_id, error = %e, "parse error, stopping cursor pagination");
245345
return Ok(());
246346
}
247-
return Err(e).context("parse event element");
347+
return Err(e).context("streaming parse");
248348
}
249349
};
250-
let event_value = match streaming::unwrap_event_object(event_value, obj_path) {
251-
Some(v) => v,
252-
None => continue,
253-
};
254-
event_count += 1;
255-
if let Some(ref inc) = source.incremental_from {
256-
update_max_timestamp_single(
257-
&mut incremental_max_ts,
258-
&event_value,
259-
&inc.event_timestamp_path,
260-
);
261-
}
262-
if let Some(ref st) = source.state {
263-
update_max_timestamp_single(
264-
&mut watermark_max_ts,
265-
&event_value,
266-
&st.watermark_field,
267-
);
268-
}
269-
if let Some(d) = &source.dedupe {
270-
let id = event_id(&event_value, &d.id_path).unwrap_or_default();
271-
if dedupe::seen_and_add(&dedupe_store, source_id, id, d.capacity).await {
272-
continue;
350+
next_cursor =
351+
json_path_str(parse_result.metadata(), cursor_path).filter(|s| !s.is_empty());
352+
let obj_path = source.response_event_object_path.as_deref();
353+
for result in parse_result.iter(&body_bytes) {
354+
let event_value = match result {
355+
Ok(v) => v,
356+
Err(e) => {
357+
if source.on_parse_error == Some(OnParseErrorBehavior::Skip) {
358+
tracing::warn!(source = %source_id, error = %e, "parse error, stopping cursor pagination");
359+
return Ok(());
360+
}
361+
return Err(e).context("parse event element");
362+
}
363+
};
364+
let event_value = match streaming::unwrap_event_object(event_value, obj_path) {
365+
Some(v) => v,
366+
None => continue,
367+
};
368+
event_count += 1;
369+
if let Some(ref inc) = source.incremental_from {
370+
update_max_timestamp_single(
371+
&mut incremental_max_ts,
372+
&event_value,
373+
&inc.event_timestamp_path,
374+
);
375+
}
376+
if let Some(ref st) = source.state {
377+
update_max_timestamp_single(
378+
&mut watermark_max_ts,
379+
&event_value,
380+
&st.watermark_field,
381+
);
273382
}
383+
if let Some(d) = &source.dedupe {
384+
let id = event_id(&event_value, &d.id_path).unwrap_or_default();
385+
if dedupe::seen_and_add(&dedupe_store, source_id, id, d.capacity).await {
386+
continue;
387+
}
388+
}
389+
total_events += 1;
390+
emitted_count += 1;
391+
let emitted = build_emitted_event(source, source_id, &path, event_value);
392+
emit_event_line(global, source_id, source, &event_sink, &emitted)?;
274393
}
275-
total_events += 1;
276-
emitted_count += 1;
277-
let emitted = build_emitted_event(source, source_id, &path, event_value);
278-
emit_event_line(global, source_id, source, &event_sink, &emitted)?;
394+
_streamed = true;
279395
}
280-
_streamed = true;
281396
}
282397

283398
if !_streamed {
399+
let resp = response.take().unwrap();
400+
let body_bytes = read_body_with_limit(resp, source.max_response_bytes).await?;
401+
if let Some(ref rs) = record_state {
402+
rs.save(
403+
source_id,
404+
record_url.as_str(),
405+
record_status,
406+
&record_headers,
407+
&body_bytes,
408+
)?;
409+
}
410+
if !status.is_success() {
411+
let body_lossy = String::from_utf8_lossy(&body_bytes);
412+
if cursor.is_some() && status.as_u16() >= 400 && status.as_u16() < 500 {
413+
let lower = body_lossy.to_lowercase();
414+
let is_expired = status.as_u16() == 410
415+
|| lower.contains("expired")
416+
|| lower.contains("invalid cursor")
417+
|| lower.contains("cursor invalid");
418+
if is_expired && source.on_cursor_error == Some(CursorExpiredBehavior::Reset) {
419+
store_set_or_skip(&store, source_id, source, global, "cursor", "").await?;
420+
tracing::warn!(
421+
source = %source_id,
422+
"cursor expired (4xx), reset; next poll from start"
423+
);
424+
return Ok(());
425+
}
426+
}
427+
anyhow::bail!("http {} {}", status, body_lossy);
428+
}
429+
total_bytes += body_bytes.len() as u64;
430+
if let Some(limit) = max_bytes
431+
&& total_bytes > limit
432+
{
433+
tracing::warn!(
434+
source = %source_id,
435+
total_bytes,
436+
max_bytes = limit,
437+
"reached max_bytes limit, stopping cursor pagination"
438+
);
439+
break;
440+
}
284441
let value: serde_json::Value = match source.on_invalid_utf8 {
285442
Some(InvalidUtf8Behavior::Replace) | Some(InvalidUtf8Behavior::Escape) => {
286443
let body = bytes_to_string(&body_bytes, source.on_invalid_utf8)?;

0 commit comments

Comments
 (0)