Skip to content

Commit f25ef6a

Browse files
committed
feat(meta): support periodic refresh run for batch refresh job
1 parent fdabdcd commit f25ef6a

10 files changed

Lines changed: 822 additions & 65 deletions

File tree

src/meta/src/barrier/checkpoint/control.rs

Lines changed: 145 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::collections::hash_map::Entry;
1616
use std::collections::{HashMap, HashSet};
1717
use std::future::{Future, poll_fn};
1818
use std::ops::Bound::{Excluded, Unbounded};
19+
use std::sync::atomic::AtomicU32;
1920
use std::task::Poll;
2021

2122
use anyhow::anyhow;
@@ -38,7 +39,9 @@ use risingwave_pb::stream_service::streaming_control_stream_response::ResetParti
3839
use tracing::{debug, warn};
3940

4041
use crate::barrier::cdc_progress::CdcProgress;
41-
use crate::barrier::checkpoint::independent_job::IndependentCheckpointJobControl;
42+
use crate::barrier::checkpoint::independent_job::{
43+
BatchRefreshJobTriggerContext, IndependentCheckpointJobControl,
44+
};
4245
use crate::barrier::checkpoint::recovery::{
4346
DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
4447
RecoveringStateAction,
@@ -482,11 +485,83 @@ impl CheckpointControl {
482485
},
483486
)
484487
}
488+
489+
// ── Batch refresh trigger helpers (delegating to DatabaseCheckpointControl) ──
490+
491+
/// Returns the batch refresh trigger info for `job_id` in `database_id`.
///
/// Looks up the database, requires it to be in the running state, and forwards
/// to that database's `get_batch_refresh_trigger_info`; the `u64` it yields is
/// returned unchanged.
///
/// # Panics
///
/// Panics if `database_id` has no entry in `self.databases` or if
/// `running_state()` returns `None` for it.
pub(crate) fn get_batch_refresh_trigger_info(
492+
&self,
493+
database_id: DatabaseId,
494+
job_id: JobId,
495+
) -> u64 {
496+
let database = self
497+
.databases
498+
.get(&database_id)
499+
.and_then(|s| s.running_state())
500+
// A missing entry and a non-running status both collapse to `None` above.
.expect("database should be running for batch refresh trigger");
501+
database.get_batch_refresh_trigger_info(job_id)
502+
}
503+
504+
/// Starts a batch refresh run for `job_id` in the running database `database_id`.
///
/// This is a thin wrapper: it resolves the database's mutable running state and
/// passes every remaining argument through, unchanged, to that database's
/// `start_batch_refresh_run`, returning its `MetaResult<bool>` as-is.
///
/// # Panics
///
/// Panics if `database_id` has no entry in `self.databases` or if
/// `running_state_mut()` returns `None` for it.
pub(crate) fn start_batch_refresh_run(
505+
&mut self,
506+
database_id: DatabaseId,
507+
job_id: JobId,
508+
context: &BatchRefreshJobTriggerContext,
509+
worker_nodes: &HashMap<WorkerId, WorkerNode>,
510+
actor_id_counter: &AtomicU32,
511+
adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
512+
partial_graph_manager: &mut PartialGraphManager,
513+
) -> MetaResult<bool> {
514+
let database = self
515+
.databases
516+
.get_mut(&database_id)
517+
.and_then(|s| s.running_state_mut())
518+
.expect("database should be running");
519+
// Pure delegation; no argument is inspected or transformed here.
database.start_batch_refresh_run(
520+
job_id,
521+
context,
522+
worker_nodes,
523+
actor_id_counter,
524+
adaptive_parallelism_strategy,
525+
partial_graph_manager,
526+
)
527+
}
528+
529+
/// Upserts the fragment infos of batch refresh job `job_id` into the database's
/// `shared_actor_infos`.
///
/// If the job's `fragment_infos()` returns `Some`, every value of that map is
/// paired with `job_id` and handed to `shared_actor_infos.upsert` under
/// `database_id`. If it returns `None`, nothing is written.
///
/// # Panics
///
/// Panics if the database is missing or not in the running state, if `job_id`
/// is not in `independent_checkpoint_job_controls`, or if the job is not the
/// `BatchRefresh` variant.
pub(crate) fn apply_batch_refresh_fragment_infos(
530+
&mut self,
531+
database_id: DatabaseId,
532+
job_id: JobId,
533+
) {
534+
let database = self
535+
.databases
536+
.get_mut(&database_id)
537+
.and_then(|s| s.running_state_mut())
538+
.expect("database should be running");
539+
// Shared borrow of the job map only; `database_info` is a separate field,
// so it can still be accessed below while `br_job` is alive.
let br_job = match database
540+
.independent_checkpoint_job_controls
541+
.get(&job_id)
542+
.expect("job should exist")
543+
{
544+
IndependentCheckpointJobControl::BatchRefresh(job) => job,
545+
_ => panic!("expected batch refresh job"),
546+
};
547+
if let Some(fragment_infos) = br_job.fragment_infos() {
548+
database
549+
.database_info
550+
.shared_actor_infos
551+
.upsert(database_id, fragment_infos.values().map(|f| (f, job_id)));
552+
}
553+
}
485554
}
486555

487556
/// Event handed back to the caller of `CheckpointControl`'s polling routine.
///
/// The lifetime `'a` is carried by the `DatabaseStatusAction` payloads of the
/// first two variants; `BatchRefreshTrigger` holds only owned ids.
pub(crate) enum CheckpointControlEvent<'a> {
488557
/// Carries the `DatabaseStatusAction` typed with `EnterInitializing`.
EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
489558
/// Carries the `DatabaseStatusAction` typed with `EnterRunning`.
EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
559+
/// A batch refresh job is idle and its upstream has advanced past the refresh interval.
560+
/// Carries owned values so the async handler can call into context without borrowing self.
561+
BatchRefreshTrigger {
562+
/// Database that owns the job to refresh.
database_id: DatabaseId,
563+
/// Batch refresh job for which a refresh run should be started.
job_id: JobId,
564+
},
490565
}
491566

492567
impl CheckpointControl {
@@ -504,7 +579,12 @@ impl CheckpointControl {
504579
.remove(&independent_job_id)
505580
{
506581
Some(independent_job) => {
507-
independent_job.on_partial_graph_reset();
582+
if let Some(job) = independent_job.on_partial_graph_reset() {
583+
// Batch refresh job transitioning to idle — re-insert.
584+
database
585+
.independent_checkpoint_job_controls
586+
.insert(independent_job_id, job);
587+
}
508588
}
509589
None => {
510590
if cfg!(debug_assertions) {
@@ -551,7 +631,25 @@ impl CheckpointControl {
551631
};
552632
for (&database_id, database_status) in &mut this_mut.databases {
553633
match database_status {
554-
DatabaseCheckpointControlStatus::Running(_) => {}
634+
DatabaseCheckpointControlStatus::Running(database) => {
635+
// Check if any idle batch refresh job should start a refresh run.
636+
if let Some(committed_epoch) = database.committed_epoch {
637+
for (job_id, job) in &database.independent_checkpoint_job_controls {
638+
if let IndependentCheckpointJobControl::BatchRefresh(br_job) = job
639+
&& br_job.should_start_refresh(committed_epoch)
640+
{
641+
let job_id = *job_id;
642+
let _ = this.take().expect("checked Some");
643+
return Poll::Ready(
644+
CheckpointControlEvent::BatchRefreshTrigger {
645+
database_id,
646+
job_id,
647+
},
648+
);
649+
}
650+
}
651+
}
652+
}
555653
DatabaseCheckpointControlStatus::Recovering(state) => {
556654
let poll_result = state.poll_next_event(cx);
557655
if let Poll::Ready(action) = poll_result {
@@ -1284,4 +1382,48 @@ impl DatabaseCheckpointControl {
12841382

12851383
Ok(())
12861384
}
1385+
1386+
// ── Batch refresh trigger helpers ────────────────────────────────────────
1387+
1388+
/// Get the last committed epoch for a batch refresh job.
///
/// # Panics
///
/// Panics if `job_id` is not in `independent_checkpoint_job_controls`, if the
/// job is not the `BatchRefresh` variant, or if the job's
/// `last_committed_epoch()` returns `None`.
1389+
pub(crate) fn get_batch_refresh_trigger_info(&self, job_id: JobId) -> u64 {
1390+
let job = self
1391+
.independent_checkpoint_job_controls
1392+
.get(&job_id)
1393+
.expect("batch refresh job should exist");
1394+
match job {
1395+
IndependentCheckpointJobControl::BatchRefresh(br_job) => br_job
1396+
.last_committed_epoch()
1397+
// The message implies callers invoke this only for idle jobs
// (TODO confirm against call sites).
.expect("idle job must have a last_committed_epoch"),
1398+
_ => panic!("job {} should be a batch refresh job", job_id),
1399+
}
1400+
}
1401+
1402+
/// Start a batch refresh logstore consumption run for `job_id`.
1403+
/// All remaining arguments are forwarded to the job's `start_refresh_run`.
1404+
/// Returns true if a run was started, false if no log epochs to consume.
///
/// # Panics
///
/// Panics if `job_id` is not in `independent_checkpoint_job_controls` or if the
/// job is not the `BatchRefresh` variant.
1405+
pub(crate) fn start_batch_refresh_run(
1406+
&mut self,
1407+
job_id: JobId,
1408+
context: &BatchRefreshJobTriggerContext,
1409+
worker_nodes: &HashMap<WorkerId, WorkerNode>,
1410+
actor_id_counter: &AtomicU32,
1411+
adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
1412+
partial_graph_manager: &mut PartialGraphManager,
1413+
) -> MetaResult<bool> {
1414+
let job = self
1415+
.independent_checkpoint_job_controls
1416+
.get_mut(&job_id)
1417+
.expect("batch refresh job should exist");
1418+
match job {
1419+
// The result of `start_refresh_run` is returned as-is, errors included.
IndependentCheckpointJobControl::BatchRefresh(br_job) => br_job.start_refresh_run(
1420+
context,
1421+
worker_nodes,
1422+
actor_id_counter,
1423+
adaptive_parallelism_strategy,
1424+
partial_graph_manager,
1425+
),
1426+
_ => panic!("job {} should be a batch refresh job", job_id),
1427+
}
1428+
}
12871429
}

0 commit comments

Comments
 (0)