2 changes: 2 additions & 0 deletions src/meta/model/migration/src/lib.rs
@@ -68,6 +68,7 @@ mod m20251224_142321_sink_schema_change;
mod m20251231_000000_sink_ignore_delete;
mod m20260119_153927_streaming_job_is_serverless_backfill;
mod m20260120_120000_streaming_job_backfill_orders;
mod m20260317_000000_streaming_job_batch_refresh_seconds;
mod utils;

pub struct Migrator;
@@ -174,6 +175,7 @@ impl MigratorTrait for Migrator {
Box::new(m20251231_000000_sink_ignore_delete::Migration),
Box::new(m20260119_153927_streaming_job_is_serverless_backfill::Migration),
Box::new(m20260120_120000_streaming_job_backfill_orders::Migration),
Box::new(m20260317_000000_streaming_job_batch_refresh_seconds::Migration),
]
}
}
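
Not part of the change, but for context: once the new module is listed in the migrator, it is applied together with all other pending migrations. A minimal sketch, assuming sea-orm-migration's `MigratorTrait::up` and a reachable database URL (`db_url` is a placeholder):

```rust
use sea_orm::{Database, DbErr};
use sea_orm_migration::MigratorTrait;

/// Connect and apply every pending migration, including the new
/// m20260317_000000_streaming_job_batch_refresh_seconds.
async fn migrate(db_url: &str) -> Result<(), DbErr> {
    let db = Database::connect(db_url).await?;
    Migrator::up(&db, None).await
}
```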
39 changes: 39 additions & 0 deletions src/meta/model/migration/src/m20260317_000000_streaming_job_batch_refresh_seconds.rs
@@ -0,0 +1,39 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(StreamingJob::Table)
.add_column(
ColumnDef::new(StreamingJob::RefreshIntervalSec)
Review comment from Contributor @chenzl25 (Apr 27, 2026):
Shall we record the refresh interval sec in src/meta/model/migration/src/m20251030_120000_refresh_jobs.rs?
.big_integer()
.null(),
)
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(StreamingJob::Table)
.drop_column(StreamingJob::RefreshIntervalSec)
.to_owned(),
)
.await
}
}

#[derive(DeriveIden)]
enum StreamingJob {
Table,
RefreshIntervalSec,
}
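
For reference, a standalone sketch (not part of the PR) of the DDL this migration amounts to, rendered with sea-query's Postgres builder. The string identifiers mirror what `DeriveIden` derives for `StreamingJob::Table` and `RefreshIntervalSec` (snake_case); the exact SQL depends on the backend SeaORM targets:

```rust
use sea_query::{Alias, ColumnDef, PostgresQueryBuilder, Table};

fn main() {
    // Same statement the migration's `up()` builds, rendered as SQL for inspection.
    let up = Table::alter()
        .table(Alias::new("streaming_job"))
        .add_column(
            ColumnDef::new(Alias::new("refresh_interval_sec"))
                .big_integer()
                .null(),
        )
        .to_owned()
        .to_string(PostgresQueryBuilder);
    // Counterpart of `down()`.
    let down = Table::alter()
        .table(Alias::new("streaming_job"))
        .drop_column(Alias::new("refresh_interval_sec"))
        .to_owned()
        .to_string(PostgresQueryBuilder);
    // Roughly: ALTER TABLE "streaming_job" ADD COLUMN "refresh_interval_sec" bigint NULL
    //      and ALTER TABLE "streaming_job" DROP COLUMN "refresh_interval_sec"
    println!("{up}\n{down}");
}
```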
1 change: 1 addition & 0 deletions src/meta/model/src/streaming_job.rs
@@ -40,6 +40,7 @@ pub struct Model {
pub max_parallelism: i32,
pub specific_resource_group: Option<String>,
pub is_serverless_backfill: bool,
pub refresh_interval_sec: Option<i64>,
}

// This data structure contains an adjacency list of backfill nodes.
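
A hypothetical consumer of the new model field, to show the intended semantics of the nullable column: `None` (SQL NULL) means the job has no periodic batch refresh configured. The helper below is illustrative only and not part of the PR:

```rust
use std::time::Duration;

/// Hypothetical helper: interpret `refresh_interval_sec` as an optional refresh
/// period. `None` or a non-positive value means no periodic refresh is scheduled.
fn refresh_period(refresh_interval_sec: Option<i64>) -> Option<Duration> {
    refresh_interval_sec
        .filter(|secs| *secs > 0)
        .map(|secs| Duration::from_secs(secs as u64))
}
```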
7 changes: 7 additions & 0 deletions src/meta/service/src/ddl_service.rs
@@ -385,6 +385,7 @@ impl DdlService for DdlServiceImpl {
dependencies: HashSet::new(),
resource_type: Self::default_streaming_job_resource_type(),
if_not_exists: req.if_not_exists,
refresh_interval_sec: None,
})
.await?;
Ok(Response::new(CreateSourceResponse {
@@ -457,6 +458,7 @@ impl DdlService for DdlServiceImpl {
dependencies,
resource_type: Self::default_streaming_job_resource_type(),
if_not_exists: req.if_not_exists,
refresh_interval_sec: None,
};

let version = self.ddl_controller.run_command(command).await?;
@@ -550,6 +552,7 @@ impl DdlService for DdlServiceImpl {
dependencies,
resource_type,
if_not_exists: req.if_not_exists,
refresh_interval_sec: None,
})
.await?;

@@ -603,6 +606,7 @@ impl DdlService for DdlServiceImpl {
dependencies: HashSet::new(),
resource_type: Self::default_streaming_job_resource_type(),
if_not_exists: req.if_not_exists,
refresh_interval_sec: None,
})
.await?;

@@ -693,6 +697,7 @@ impl DdlService for DdlServiceImpl {
dependencies,
resource_type: Self::default_streaming_job_resource_type(),
if_not_exists: request.if_not_exists,
refresh_interval_sec: None,
})
.await?;

@@ -1623,6 +1628,7 @@ impl DdlService for DdlServiceImpl {
dependencies: HashSet::new(),
resource_type: Self::default_streaming_job_resource_type(),
if_not_exists,
refresh_interval_sec: None,
})
.await?;

@@ -1696,6 +1702,7 @@ impl DdlService for DdlServiceImpl {
dependencies,
resource_type: Self::default_streaming_job_resource_type(),
if_not_exists,
refresh_interval_sec: None,
})
.await;

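
All of the call sites above construct DDL commands for jobs that are not batch-refreshed, so they pass `refresh_interval_sec: None`; only a refreshable-table path would carry a real value. A hypothetical conversion helper, purely illustrative and not part of the PR:

```rust
use std::time::Duration;

/// Hypothetical helper: map a user-supplied refresh interval (if any) onto the
/// command field. DDL paths without a refreshable table simply pass `None`.
fn refresh_interval_sec(user_interval: Option<Duration>) -> Option<i64> {
    user_interval.map(|interval| interval.as_secs() as i64)
}
```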
61 changes: 41 additions & 20 deletions src/meta/src/barrier/checkpoint/control.rs
@@ -416,7 +416,9 @@ impl CheckpointControl {
);
// Progress of independent checkpoint jobs
for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
progress.extend([(*job_id, job.gen_backfill_progress())]);
if let Some(p) = job.gen_backfill_progress() {
progress.insert(*job_id, p);
}
}
}
progress
@@ -896,22 +898,38 @@ impl DatabaseCheckpointControl {
.first_inflight_barrier(self.partial_graph_id)
.map(|epoch| epoch.prev);
for (job_id, job) in &mut self.independent_checkpoint_job_controls {
let IndependentCheckpointJobControl::CreatingStreamingJob(job) = job;
if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
partial_graph_manager,
min_upstream_inflight_barrier,
committed_epoch,
) {
let resps = resps.into_values().collect_vec();
if is_finish_epoch {
assert!(info.notifiers.is_empty());
finished_jobs.push((*job_id, epoch, resps));
continue;
};
independent_jobs_task.push((*job_id, epoch, resps, info));
match job {
IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) => {
if let Some((epoch, resps, info, is_finish_epoch)) = creating_job
.start_completing(
partial_graph_manager,
min_upstream_inflight_barrier,
committed_epoch,
)
{
let resps = resps.into_values().collect_vec();
if is_finish_epoch {
assert!(info.notifiers.is_empty());
finished_jobs.push((*job_id, epoch, resps));
continue;
};
independent_jobs_task.push((*job_id, epoch, resps, info));
}
}
IndependentCheckpointJobControl::BatchRefresh(batch_refresh_job) => {
if let Some((epoch, resps, info, tracking_job)) =
batch_refresh_job.start_completing(partial_graph_manager)
{
let resps = resps.into_values().collect_vec();
if let Some(tracking_job) = tracking_job {
let task = task.get_or_insert_default();
task.finished_jobs.push(tracking_job);
}
independent_jobs_task.push((*job_id, epoch, resps, info));
}
}
}
}

if !finished_jobs.is_empty() {
partial_graph_manager.remove_partial_graphs(
finished_jobs
@@ -924,10 +942,12 @@ impl DatabaseCheckpointControl {
debug!(epoch, %job_id, "finish creating job");
// It's safe to remove the creating job, because on CompleteJobType::Finished,
// all previous barriers have been collected and completed.
let IndependentCheckpointJobControl::CreatingStreamingJob(creating_streaming_job) =
self.independent_checkpoint_job_controls
.remove(&job_id)
.expect("should exist");
let Some(IndependentCheckpointJobControl::CreatingStreamingJob(
creating_streaming_job,
)) = self.independent_checkpoint_job_controls.remove(&job_id)
else {
panic!("finished job {job_id} should be a creating streaming job");
};
let tracking_job = creating_streaming_job.into_tracking_job();
self.finishing_jobs_collector
.collect(epoch, job_id, (resps, tracking_job));
@@ -1213,7 +1233,8 @@ impl DatabaseCheckpointControl {
};

if let Some(Command::CreateStreamingJob {
job_type: CreateStreamingJobType::SnapshotBackfill(_),
job_type:
CreateStreamingJobType::SnapshotBackfill(_) | CreateStreamingJobType::BatchRefresh(_),
..
}) = &command
&& self.state.is_paused()
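
The control-flow change above can be summarized outside the PR's own types: `IndependentCheckpointJobControl` previously had a single variant, so an irrefutable `let` destructure sufficed; with `BatchRefresh` added, each call site must branch on the variant. A simplified, self-contained sketch of that pattern (the payload types are stand-ins, not the real ones):

```rust
// Simplified stand-in for the real enum; payloads are placeholders.
enum IndependentCheckpointJobControl {
    CreatingStreamingJob(u64),
    BatchRefresh(u64),
}

fn describe(job: &IndependentCheckpointJobControl) -> String {
    match job {
        IndependentCheckpointJobControl::CreatingStreamingJob(id) => {
            format!("creating streaming job {id}")
        }
        IndependentCheckpointJobControl::BatchRefresh(id) => {
            format!("batch refresh of job {id}")
        }
    }
}
```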