Skip to content

Commit 1e59b8e

Browse files
committed
feat(meta): support consuming snapshot of batch refreshable job (meta part)
1 parent f301bf3 commit 1e59b8e

26 files changed

Lines changed: 1588 additions & 105 deletions

File tree

src/meta/model/migration/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ mod m20251224_142321_sink_schema_change;
6868
mod m20251231_000000_sink_ignore_delete;
6969
mod m20260119_153927_streaming_job_is_serverless_backfill;
7070
mod m20260120_120000_streaming_job_backfill_orders;
71+
mod m20260317_000000_streaming_job_batch_refresh_seconds;
7172
mod utils;
7273

7374
pub struct Migrator;
@@ -174,6 +175,7 @@ impl MigratorTrait for Migrator {
174175
Box::new(m20251231_000000_sink_ignore_delete::Migration),
175176
Box::new(m20260119_153927_streaming_job_is_serverless_backfill::Migration),
176177
Box::new(m20260120_120000_streaming_job_backfill_orders::Migration),
178+
Box::new(m20260317_000000_streaming_job_batch_refresh_seconds::Migration),
177179
]
178180
}
179181
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use sea_orm_migration::prelude::*;
2+
3+
#[derive(DeriveMigrationName)]
4+
pub struct Migration;
5+
6+
#[async_trait::async_trait]
7+
impl MigrationTrait for Migration {
8+
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
9+
manager
10+
.alter_table(
11+
Table::alter()
12+
.table(StreamingJob::Table)
13+
.add_column(
14+
ColumnDef::new(StreamingJob::RefreshIntervalSec)
15+
.big_integer()
16+
.null(),
17+
)
18+
.to_owned(),
19+
)
20+
.await
21+
}
22+
23+
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
24+
manager
25+
.alter_table(
26+
Table::alter()
27+
.table(StreamingJob::Table)
28+
.drop_column(StreamingJob::RefreshIntervalSec)
29+
.to_owned(),
30+
)
31+
.await
32+
}
33+
}
34+
35+
#[derive(DeriveIden)]
36+
enum StreamingJob {
37+
Table,
38+
RefreshIntervalSec,
39+
}

src/meta/model/src/streaming_job.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub struct Model {
4040
pub max_parallelism: i32,
4141
pub specific_resource_group: Option<String>,
4242
pub is_serverless_backfill: bool,
43+
pub refresh_interval_sec: Option<i64>,
4344
}
4445

4546
// This data structure contains an adjacency list of backfill nodes.

src/meta/service/src/ddl_service.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ impl DdlService for DdlServiceImpl {
385385
dependencies: HashSet::new(),
386386
resource_type: Self::default_streaming_job_resource_type(),
387387
if_not_exists: req.if_not_exists,
388+
refresh_interval_sec: None,
388389
})
389390
.await?;
390391
Ok(Response::new(CreateSourceResponse {
@@ -457,6 +458,7 @@ impl DdlService for DdlServiceImpl {
457458
dependencies,
458459
resource_type: Self::default_streaming_job_resource_type(),
459460
if_not_exists: req.if_not_exists,
461+
refresh_interval_sec: None,
460462
};
461463

462464
let version = self.ddl_controller.run_command(command).await?;
@@ -550,6 +552,7 @@ impl DdlService for DdlServiceImpl {
550552
dependencies,
551553
resource_type,
552554
if_not_exists: req.if_not_exists,
555+
refresh_interval_sec: None,
553556
})
554557
.await?;
555558

@@ -603,6 +606,7 @@ impl DdlService for DdlServiceImpl {
603606
dependencies: HashSet::new(),
604607
resource_type: Self::default_streaming_job_resource_type(),
605608
if_not_exists: req.if_not_exists,
609+
refresh_interval_sec: None,
606610
})
607611
.await?;
608612

@@ -693,6 +697,7 @@ impl DdlService for DdlServiceImpl {
693697
dependencies,
694698
resource_type: Self::default_streaming_job_resource_type(),
695699
if_not_exists: request.if_not_exists,
700+
refresh_interval_sec: None,
696701
})
697702
.await?;
698703

@@ -1623,6 +1628,7 @@ impl DdlService for DdlServiceImpl {
16231628
dependencies: HashSet::new(),
16241629
resource_type: Self::default_streaming_job_resource_type(),
16251630
if_not_exists,
1631+
refresh_interval_sec: None,
16261632
})
16271633
.await?;
16281634

@@ -1696,6 +1702,7 @@ impl DdlService for DdlServiceImpl {
16961702
dependencies,
16971703
resource_type: Self::default_streaming_job_resource_type(),
16981704
if_not_exists,
1705+
refresh_interval_sec: None,
16991706
})
17001707
.await;
17011708

src/meta/src/barrier/checkpoint/control.rs

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,9 @@ impl CheckpointControl {
416416
);
417417
// Progress of independent checkpoint jobs
418418
for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
419-
progress.extend([(*job_id, job.gen_backfill_progress())]);
419+
if let Some(p) = job.gen_backfill_progress() {
420+
progress.insert(*job_id, p);
421+
}
420422
}
421423
}
422424
progress
@@ -896,22 +898,38 @@ impl DatabaseCheckpointControl {
896898
.first_inflight_barrier(self.partial_graph_id)
897899
.map(|epoch| epoch.prev);
898900
for (job_id, job) in &mut self.independent_checkpoint_job_controls {
899-
let IndependentCheckpointJobControl::CreatingStreamingJob(job) = job;
900-
if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
901-
partial_graph_manager,
902-
min_upstream_inflight_barrier,
903-
committed_epoch,
904-
) {
905-
let resps = resps.into_values().collect_vec();
906-
if is_finish_epoch {
907-
assert!(info.notifiers.is_empty());
908-
finished_jobs.push((*job_id, epoch, resps));
909-
continue;
910-
};
911-
independent_jobs_task.push((*job_id, epoch, resps, info));
901+
match job {
902+
IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) => {
903+
if let Some((epoch, resps, info, is_finish_epoch)) = creating_job
904+
.start_completing(
905+
partial_graph_manager,
906+
min_upstream_inflight_barrier,
907+
committed_epoch,
908+
)
909+
{
910+
let resps = resps.into_values().collect_vec();
911+
if is_finish_epoch {
912+
assert!(info.notifiers.is_empty());
913+
finished_jobs.push((*job_id, epoch, resps));
914+
continue;
915+
};
916+
independent_jobs_task.push((*job_id, epoch, resps, info));
917+
}
918+
}
919+
IndependentCheckpointJobControl::BatchRefresh(batch_refresh_job) => {
920+
if let Some((epoch, resps, info, tracking_job)) =
921+
batch_refresh_job.start_completing(partial_graph_manager)
922+
{
923+
let resps = resps.into_values().collect_vec();
924+
if let Some(tracking_job) = tracking_job {
925+
let task = task.get_or_insert_default();
926+
task.finished_jobs.push(tracking_job);
927+
}
928+
independent_jobs_task.push((*job_id, epoch, resps, info));
929+
}
930+
}
912931
}
913932
}
914-
915933
if !finished_jobs.is_empty() {
916934
partial_graph_manager.remove_partial_graphs(
917935
finished_jobs
@@ -924,10 +942,12 @@ impl DatabaseCheckpointControl {
924942
debug!(epoch, %job_id, "finish creating job");
925943
// It's safe to remove the creating job, because on CompleteJobType::Finished,
926944
// all previous barriers have been collected and completed.
927-
let IndependentCheckpointJobControl::CreatingStreamingJob(creating_streaming_job) =
928-
self.independent_checkpoint_job_controls
929-
.remove(&job_id)
930-
.expect("should exist");
945+
let Some(IndependentCheckpointJobControl::CreatingStreamingJob(
946+
creating_streaming_job,
947+
)) = self.independent_checkpoint_job_controls.remove(&job_id)
948+
else {
949+
panic!("finished job {job_id} should be a creating streaming job");
950+
};
931951
let tracking_job = creating_streaming_job.into_tracking_job();
932952
self.finishing_jobs_collector
933953
.collect(epoch, job_id, (resps, tracking_job));
@@ -1213,7 +1233,8 @@ impl DatabaseCheckpointControl {
12131233
};
12141234

12151235
if let Some(Command::CreateStreamingJob {
1216-
job_type: CreateStreamingJobType::SnapshotBackfill(_),
1236+
job_type:
1237+
CreateStreamingJobType::SnapshotBackfill(_) | CreateStreamingJobType::BatchRefresh(_),
12171238
..
12181239
}) = &command
12191240
&& self.state.is_paused()

0 commit comments

Comments (0)