Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,7 @@ logs/

# Claude Code guidance file (local only)
CLAUDE.md
.claude/
.claude/

# git worktrees for parallel development (local only)
.worktrees/
148 changes: 148 additions & 0 deletions ballista/scheduler/src/state/aqe/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,30 @@ impl ExchangeExec {
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}

/// Invalidates this exchange's resolved shuffle locations when any of them
/// lives on the executor identified by `executor_id`.
///
/// When at least one resolved partition location references that executor,
/// `shuffle_partitions` is reset to `None` and the value of
/// `self.stage_id()` is returned so the planner can restore cache entries.
/// When nothing has been resolved yet, or no location references the
/// executor, the state is left untouched and `None` is returned.
pub(crate) fn reset_locations_on_lost_executor(
    &self,
    executor_id: &str,
) -> Option<usize> {
    // The lock stays held for the whole check-and-clear sequence.
    let mut partitions = self.shuffle_partitions.lock();

    let references_lost_executor = partitions.as_ref().is_some_and(|resolved| {
        resolved.iter().any(|locations| {
            locations
                .iter()
                .any(|location| location.executor_meta.id == executor_id)
        })
    });
    if !references_lost_executor {
        return None;
    }

    *partitions = None;
    self.stage_id()
}
}

impl DisplayAs for ExchangeExec {
Expand Down Expand Up @@ -403,6 +427,29 @@ impl AdaptiveDatafusionExec {
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}

/// Invalidates this exec's resolved shuffle locations when any of them
/// lives on the executor identified by `executor_id`.
///
/// When at least one resolved partition location references that executor,
/// `shuffle_partitions` is reset to `None` and the value of
/// `self.stage_id()` is returned so the planner can restore cache entries.
/// Otherwise (nothing resolved, or no location on that executor) the state
/// is left untouched and `None` is returned.
pub(crate) fn reset_locations_on_lost_executor(
    &self,
    executor_id: &str,
) -> Option<usize> {
    // The lock stays held for the whole check-and-clear sequence.
    let mut partitions = self.shuffle_partitions.lock();

    let references_lost_executor = partitions.as_ref().is_some_and(|resolved| {
        resolved.iter().any(|locations| {
            locations
                .iter()
                .any(|location| location.executor_meta.id == executor_id)
        })
    });
    if !references_lost_executor {
        return None;
    }

    *partitions = None;
    self.stage_id()
}
}

impl DisplayAs for AdaptiveDatafusionExec {
Expand Down Expand Up @@ -486,3 +533,104 @@ impl ExecutionPlan for AdaptiveDatafusionExec {
))
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use ballista_core::serde::scheduler::{
        ExecutorMetadata, ExecutorSpecification, PartitionId, PartitionStats,
    };
    use datafusion::arrow::datatypes::{Field, Schema};
    use datafusion::physical_plan::empty::EmptyExec;

    /// Builds a placeholder partition location hosted on `executor_id`;
    /// every other field carries a fixed dummy value.
    fn location_on(executor_id: &str) -> PartitionLocation {
        let partition_id = PartitionId {
            job_id: "j".to_string(),
            stage_id: 0,
            partition_id: 0,
        };
        let executor_meta = ExecutorMetadata {
            id: executor_id.to_string(),
            host: "h".to_string(),
            port: 0,
            grpc_port: 0,
            specification: ExecutorSpecification { task_slots: 0 },
        };
        PartitionLocation {
            map_partition_id: 0,
            partition_id,
            executor_meta,
            partition_stats: PartitionStats::new(Some(1), None, Some(1)),
            file_id: None,
            is_sort_shuffle: false,
        }
    }

    /// Builds an `EmptyExec` over a single nullable `Int32` column named `a`,
    /// used as the child plan of the execs under test.
    fn single_column_input() -> Arc<dyn ExecutionPlan> {
        let column = Field::new(
            "a",
            datafusion::arrow::datatypes::DataType::Int32,
            true,
        );
        let schema = Arc::new(Schema::new(vec![column]));
        Arc::new(EmptyExec::new(schema))
    }

    #[test]
    fn exchange_exec_reset_clears_when_affected() {
        let exchange = ExchangeExec::new(single_column_input(), None, 0);
        exchange.set_stage_id(7);
        exchange.resolve_shuffle_partitions(vec![vec![
            location_on("ex-1"),
            location_on("ex-2"),
        ]]);
        assert!(exchange.shuffle_created());

        let outcome = exchange.reset_locations_on_lost_executor("ex-1");

        // The lost executor held a location, so the stage id is reported
        // and the exchange is back to its unresolved state.
        assert_eq!(outcome, Some(7));
        assert!(!exchange.shuffle_created());
    }

    #[test]
    fn exchange_exec_reset_no_op_when_unrelated_executor() {
        let exchange = ExchangeExec::new(single_column_input(), None, 0);
        exchange.set_stage_id(7);
        exchange.resolve_shuffle_partitions(vec![vec![
            location_on("ex-1"),
            location_on("ex-2"),
        ]]);

        let outcome = exchange.reset_locations_on_lost_executor("ex-99");

        // No location lives on "ex-99": nothing is reported or cleared.
        assert!(outcome.is_none());
        assert!(exchange.shuffle_created());
    }

    #[test]
    fn exchange_exec_reset_no_op_when_unresolved() {
        let exchange = ExchangeExec::new(single_column_input(), None, 0);
        exchange.set_stage_id(7);

        let outcome = exchange.reset_locations_on_lost_executor("ex-1");

        // Partitions were never resolved, so there is nothing to reset.
        assert!(outcome.is_none());
        assert!(!exchange.shuffle_created());
    }

    #[test]
    fn adaptive_datafusion_exec_reset_clears_when_affected() {
        let adaptive = AdaptiveDatafusionExec::new(0, single_column_input());
        adaptive.set_stage_id(11);
        adaptive.resolve_shuffle_partitions(vec![vec![location_on("ex-1")]]);
        assert!(adaptive.shuffle_created());

        let outcome = adaptive.reset_locations_on_lost_executor("ex-1");

        // The only location was on the lost executor.
        assert_eq!(outcome, Some(11));
        assert!(!adaptive.shuffle_created());
    }

    #[test]
    fn adaptive_datafusion_exec_reset_no_op_when_unrelated_executor() {
        let adaptive = AdaptiveDatafusionExec::new(0, single_column_input());
        adaptive.set_stage_id(11);
        adaptive.resolve_shuffle_partitions(vec![vec![location_on("ex-2")]]);

        let outcome = adaptive.reset_locations_on_lost_executor("ex-1");

        // "ex-1" holds none of the resolved locations.
        assert!(outcome.is_none());
        assert!(adaptive.shuffle_created());
    }
}
122 changes: 118 additions & 4 deletions ballista/scheduler/src/state/aqe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ mod test;
///
/// with many limitations, such as:
///
/// - it does not cover executor failure
/// - dynamically coalescing shuffle partitions, not supported yet
/// - does not switch from hash join to sort merge join
/// - does not switch from streaming aggregation to hash aggregation
Expand Down Expand Up @@ -481,9 +480,121 @@ impl AdaptiveExecutionGraph {
reset_stage.extend(rollback_resolved_stages);
reset_stage.extend(rollback_running_stages);
reset_stage.extend(resubmit_successful_stages);

// Synchronize planner state with the graph-level rollback above.
// Without this, re-running stages can't accept
// update_exchange_locations (cache entries were cleared in
// finalise_stage) and the plan tree still treats affected
// exchanges as resolved.
let planner_affected = self
.planner
.reset_on_lost_executor(executor_id)
.map_err(|e| {
BallistaError::Internal(format!(
"Failed to reset AdaptivePlanner state on lost executor {executor_id}: {e}"
))
})?;

// The static-graph stage.inputs walk above is a no-op for AQE
// (`create_resolved_stage` initialises inputs to an empty HashMap).
// Use the planner's affected-stage set to drive the equivalent
// recovery for AQE: rerun Successful stages whose outputs were
// lost, and drop downstream Resolved/Running stages that depend
// on those outputs (the planner regenerates them via
// `actionable_stages` once the upstream stages re-run).
if !planner_affected.is_empty() {
// 1. Rerun Successful stages whose outputs were on the lost
// executor.
let successful_to_rerun: Vec<usize> = planner_affected
.iter()
.copied()
.filter(|stage_id| {
matches!(
self.stages.get(stage_id),
Some(ExecutionStage::Successful(_))
)
})
.collect();
for stage_id in &successful_to_rerun {
if let Some(ExecutionStage::Successful(success)) =
self.stages.get_mut(stage_id)
{
success.reset_tasks(executor_id);
}
if self.rerun_successful_stage(*stage_id) {
reset_stage.insert(*stage_id);
}
}

// 2. Drop Resolved / Running stages whose embedded plan reads
// from any affected stage (their ShuffleReaderExec entries
// contain stale partition locations on the lost executor).
let stages_to_drop: Vec<usize> = self
.stages
.iter()
.filter_map(|(stage_id, stage)| {
let plan = match stage {
ExecutionStage::Resolved(s) => s.plan.clone(),
ExecutionStage::Running(s) => s.plan.clone(),
_ => return None,
};
if Self::plan_reads_from_any(&plan, &planner_affected) {
Some(*stage_id)
} else {
None
}
})
.collect();

for stage_id in stages_to_drop {
if let Some(ExecutionStage::Running(running)) = self.stages.get(&stage_id)
{
let running_tasks = running
.running_tasks()
.into_iter()
.map(|(task_id, stage_id, partition_id, executor_id)| {
RunningTaskInfo {
task_id,
job_id: self.job_id.clone(),
stage_id,
partition_id,
executor_id,
}
})
.collect::<Vec<_>>();
all_running_tasks.extend(running_tasks);
}
self.stages.remove(&stage_id);
reset_stage.insert(stage_id);
}
}

Ok((reset_stage, all_running_tasks))
}

/// Reports whether `plan`, or any node reachable through its children,
/// is a `ShuffleReaderExec` whose `stage_id` is a member of `affected`.
///
/// The node itself is inspected first and then each child is searched
/// recursively; the search stops at the first match. Used to identify
/// downstream stages that need rebuilding after an executor loss
/// invalidates an upstream stage's outputs.
fn plan_reads_from_any(
    plan: &Arc<dyn ExecutionPlan>,
    affected: &HashSet<usize>,
) -> bool {
    let reads_affected_stage = plan
        .as_any()
        .downcast_ref::<ballista_core::execution_plans::ShuffleReaderExec>()
        .is_some_and(|reader| affected.contains(&reader.stage_id));

    reads_affected_stage
        || plan
            .children()
            .into_iter()
            .any(|child| Self::plan_reads_from_any(child, affected))
}

/// Clear the failure count recorded for this stage once the stage has finally succeeded
fn clear_stage_failure(&mut self, stage_id: usize) {
self.failed_stage_attempts.remove(&stage_id);
Expand Down Expand Up @@ -840,9 +951,12 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
);
}
} else {
return Err(BallistaError::Internal(format!(
"Invalid stage ID {stage_id} for job {job_id}"
)));
// Stage may have been dropped during executor-failure recovery
// (see reset_stages_internal). Late task statuses for such
// stages are expected and benign — log and skip.
warn!(
"Ignoring task status update for unknown stage {job_id}/{stage_id} (stage may have been dropped during recovery)"
);
}
}

Expand Down
Loading