Skip to content

Commit 09b8dbd

Browse files
committed
refactor(meta): extract independent ckpt job enum
1 parent 59783d9 commit 09b8dbd

11 files changed

Lines changed: 392 additions & 263 deletions

File tree

src/meta/src/barrier/checkpoint/control.rs

Lines changed: 164 additions & 84 deletions
Large diffs are not rendered by default.

src/meta/src/barrier/checkpoint/creating_job/barrier_control.rs renamed to src/meta/src/barrier/checkpoint/independent_job/creating_job/barrier_control.rs

File renamed without changes.

src/meta/src/barrier/checkpoint/creating_job/mod.rs renamed to src/meta/src/barrier/checkpoint/independent_job/creating_job/mod.rs

Lines changed: 29 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,23 @@ use risingwave_common::catalog::{DatabaseId, TableId};
2525
use risingwave_common::id::JobId;
2626
use risingwave_common::metrics::LabelGuardedIntGauge;
2727
use risingwave_common::util::epoch::Epoch;
28-
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29-
use risingwave_meta_model::{DispatcherType, WorkerId};
28+
use risingwave_meta_model::WorkerId;
3029
use risingwave_pb::ddl_service::PbBackfillType;
3130
use risingwave_pb::hummock::HummockVersionStats;
3231
use risingwave_pb::id::{ActorId, FragmentId, PartialGraphId};
3332
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
3433
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
35-
use risingwave_pb::stream_plan::stream_node::NodeBody;
3634
use risingwave_pb::stream_plan::{AddMutation, StopMutation};
3735
use risingwave_pb::stream_service::BarrierCompleteResponse;
3836
use status::CreatingStreamingJobStatus;
3937
use tracing::{debug, info};
4038

41-
use super::state::RenderResult;
39+
use super::super::state::RenderResult;
40+
use super::IndependentCheckpointJobControl;
4241
use crate::MetaResult;
4342
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
44-
use crate::barrier::checkpoint::creating_job::barrier_control::CreatingStreamingJobBarrierStats;
45-
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
43+
use crate::barrier::checkpoint::independent_job::creating_job::barrier_control::CreatingStreamingJobBarrierStats;
44+
use crate::barrier::checkpoint::independent_job::creating_job::status::CreateMviewLogStoreProgressTracker;
4645
use crate::barrier::command::PostCollectCommand;
4746
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
4847
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
@@ -88,36 +87,9 @@ pub(crate) struct CreatingStreamingJobControl {
8887
upstream_lag: LabelGuardedIntGauge,
8988
}
9089

91-
fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
92-
let mut has_unreschedulable_scan = false;
93-
visit_stream_node_cont(&fragment.nodes, |node| {
94-
if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
95-
let scan_type = stream_scan.stream_scan_type();
96-
if !scan_type.is_reschedulable(true) {
97-
has_unreschedulable_scan = true;
98-
return false;
99-
}
100-
}
101-
true
102-
});
103-
has_unreschedulable_scan
104-
}
105-
106-
fn collect_fragment_upstream_fragment_ids(
107-
fragment: &InflightFragmentInfo,
108-
upstream_fragment_ids: &mut HashSet<FragmentId>,
109-
) {
110-
visit_stream_node_cont(&fragment.nodes, |node| {
111-
if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
112-
upstream_fragment_ids.insert(merge.upstream_fragment_id);
113-
}
114-
true
115-
});
116-
}
117-
11890
impl CreatingStreamingJobControl {
119-
pub(super) fn new<'a>(
120-
entry: hash_map::VacantEntry<'a, JobId, Self>,
91+
pub(crate) fn new<'a>(
92+
entry: hash_map::VacantEntry<'a, JobId, IndependentCheckpointJobControl>,
12193
create_info: CreateSnapshotBackfillJobCommandInfo,
12294
notifiers: Vec<Notifier>,
12395
snapshot_backfill_upstream_tables: HashSet<TableId>,
@@ -213,19 +185,21 @@ impl CreatingStreamingJobControl {
213185

214186
let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
215187

216-
let job = entry.insert(Self {
217-
partial_graph_id,
218-
job_id,
219-
snapshot_backfill_upstream_tables,
220-
max_committed_epoch: None,
221-
snapshot_epoch,
222-
status: CreatingStreamingJobStatus::PlaceHolder, // filled in later code
223-
upstream_lag: GLOBAL_META_METRICS
224-
.snapshot_backfill_lag
225-
.with_guarded_label_values(&[&format!("{}", job_id)]),
226-
node_actors,
227-
state_table_ids,
228-
});
188+
let IndependentCheckpointJobControl::CreatingStreamingJob(job) = entry.insert(
189+
IndependentCheckpointJobControl::CreatingStreamingJob(Self {
190+
partial_graph_id,
191+
job_id,
192+
snapshot_backfill_upstream_tables,
193+
max_committed_epoch: None,
194+
snapshot_epoch,
195+
status: CreatingStreamingJobStatus::PlaceHolder, // filled in later code
196+
upstream_lag: GLOBAL_META_METRICS
197+
.snapshot_backfill_lag
198+
.with_guarded_label_values(&[&format!("{}", job_id)]),
199+
node_actors,
200+
state_table_ids,
201+
}),
202+
);
229203

230204
let mut graph_adder = partial_graph_manager.add_partial_graph(
231205
partial_graph_id,
@@ -557,15 +531,6 @@ impl CreatingStreamingJobControl {
557531
})
558532
}
559533

560-
pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
561-
self.status
562-
.fragment_infos()
563-
.map(|fragment_infos| {
564-
!InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
565-
})
566-
.unwrap_or(true)
567-
}
568-
569534
pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
570535
let progress = match &self.status {
571536
CreatingStreamingJobStatus::ConsumingSnapshot {
@@ -650,7 +615,7 @@ impl CreatingStreamingJobControl {
650615
Ok(())
651616
}
652617

653-
pub(super) fn start_consume_upstream(
618+
pub(crate) fn start_consume_upstream(
654619
&mut self,
655620
partial_graph_manager: &mut PartialGraphManager,
656621
barrier_info: &BarrierInfo,
@@ -684,7 +649,7 @@ impl CreatingStreamingJobControl {
684649
Ok(info)
685650
}
686651

687-
pub(super) fn on_new_upstream_barrier(
652+
pub(crate) fn on_new_upstream_barrier(
688653
&mut self,
689654
partial_graph_manager: &mut PartialGraphManager,
690655
barrier_info: &BarrierInfo,
@@ -740,7 +705,7 @@ impl CreatingStreamingJobControl {
740705
self.should_merge_to_upstream()
741706
}
742707

743-
pub(super) fn should_merge_to_upstream(&self) -> bool {
708+
pub(crate) fn should_merge_to_upstream(&self) -> bool {
744709
if let CreatingStreamingJobStatus::ConsumingLogStore {
745710
log_store_progress_tracker,
746711
barriers_to_inject,
@@ -757,7 +722,7 @@ impl CreatingStreamingJobControl {
757722
}
758723

759724
impl CreatingStreamingJobControl {
760-
pub(super) fn start_completing(
725+
pub(crate) fn start_completing(
761726
&mut self,
762727
partial_graph_manager: &mut PartialGraphManager,
763728
min_upstream_inflight_epoch: Option<u64>,
@@ -852,56 +817,8 @@ impl CreatingStreamingJobControl {
852817
}
853818
}
854819

855-
pub fn fragment_infos_with_job_id(
856-
&self,
857-
) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
858-
self.status
859-
.fragment_infos()
860-
.into_iter()
861-
.flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
862-
}
863-
864-
pub(super) fn collect_reschedule_blocked_fragment_ids(
865-
&self,
866-
blocked_fragment_ids: &mut HashSet<FragmentId>,
867-
) {
868-
let Some(info) = self.status.creating_job_info() else {
869-
return;
870-
};
871-
872-
for (fragment_id, fragment) in &info.fragment_infos {
873-
if fragment_has_online_unreschedulable_scan(fragment) {
874-
blocked_fragment_ids.insert(*fragment_id);
875-
collect_fragment_upstream_fragment_ids(fragment, blocked_fragment_ids);
876-
}
877-
}
878-
}
879-
880-
pub(super) fn collect_no_shuffle_fragment_relations(
881-
&self,
882-
no_shuffle_relations: &mut Vec<(FragmentId, FragmentId)>,
883-
) {
884-
let Some(info) = self.status.creating_job_info() else {
885-
return;
886-
};
887-
888-
for (upstream_fragment_id, downstreams) in &info.upstream_fragment_downstreams {
889-
no_shuffle_relations.extend(
890-
downstreams
891-
.iter()
892-
.filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
893-
.map(|downstream| (*upstream_fragment_id, downstream.downstream_fragment_id)),
894-
);
895-
}
896-
897-
for (fragment_id, downstreams) in &info.downstreams {
898-
no_shuffle_relations.extend(
899-
downstreams
900-
.iter()
901-
.filter(|downstream| downstream.dispatcher_type == DispatcherType::NoShuffle)
902-
.map(|downstream| (*fragment_id, downstream.downstream_fragment_id)),
903-
);
904-
}
820+
pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
821+
self.status.fragment_infos()
905822
}
906823

907824
pub fn into_tracking_job(self) -> TrackingJob {
@@ -969,7 +886,7 @@ impl CreatingStreamingJobControl {
969886
}
970887
}
971888

972-
pub(super) fn reset(self) -> bool {
889+
pub(crate) fn reset(self) -> bool {
973890
match self.status {
974891
CreatingStreamingJobStatus::ConsumingSnapshot { .. }
975892
| CreatingStreamingJobStatus::ConsumingLogStore { .. }

src/meta/src/barrier/checkpoint/creating_job/status.rs renamed to src/meta/src/barrier/checkpoint/independent_job/creating_job/status.rs

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use risingwave_pb::stream_service::barrier_complete_response::{
3030
};
3131
use tracing::warn;
3232

33-
use crate::barrier::checkpoint::creating_job::CreatingJobInfo;
33+
use crate::barrier::checkpoint::independent_job::creating_job::CreatingJobInfo;
3434
use crate::barrier::notifier::Notifier;
3535
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
3636
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
@@ -308,37 +308,11 @@ impl CreatingStreamingJobStatus {
308308
pending_non_checkpoint_barriers: &mut Vec<u64>,
309309
kind: PbBarrierKind,
310310
) -> BarrierInfo {
311-
{
312-
{
313-
let prev_epoch =
314-
TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
315-
*prev_epoch_fake_physical_time += 1;
316-
let curr_epoch =
317-
TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
318-
let kind = match kind {
319-
PbBarrierKind::Unspecified => {
320-
unreachable!()
321-
}
322-
PbBarrierKind::Initial => {
323-
assert!(pending_non_checkpoint_barriers.is_empty());
324-
BarrierKind::Initial
325-
}
326-
PbBarrierKind::Barrier => {
327-
pending_non_checkpoint_barriers.push(prev_epoch.value().0);
328-
BarrierKind::Barrier
329-
}
330-
PbBarrierKind::Checkpoint => {
331-
pending_non_checkpoint_barriers.push(prev_epoch.value().0);
332-
BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
333-
}
334-
};
335-
BarrierInfo {
336-
prev_epoch,
337-
curr_epoch,
338-
kind,
339-
}
340-
}
341-
}
311+
super::super::new_fake_barrier(
312+
prev_epoch_fake_physical_time,
313+
pending_non_checkpoint_barriers,
314+
kind,
315+
)
342316
}
343317

344318
pub(super) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
@@ -354,16 +328,4 @@ impl CreatingStreamingJobStatus {
354328
}
355329
}
356330
}
357-
358-
pub(super) fn creating_job_info(&self) -> Option<&CreatingJobInfo> {
359-
match self {
360-
CreatingStreamingJobStatus::ConsumingSnapshot { info, .. }
361-
| CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => Some(info),
362-
CreatingStreamingJobStatus::Finishing(..)
363-
| CreatingStreamingJobStatus::Resetting(_) => None,
364-
CreatingStreamingJobStatus::PlaceHolder => {
365-
unreachable!()
366-
}
367-
}
368-
}
369331
}

0 commit comments

Comments
 (0)