Commit 819f465

yaooqinn and Copilot committed

skills: enhance spark-advisor with DataFlint-inspired diagnostics

Add SQL-level, resource utilization, and lakehouse-specific diagnostics:

- Small files read/written detection
- Broadcast too large / SortMergeJoin→BroadcastHashJoin conversion
- Large cross join detection
- Long filter condition detection
- Full scan on partitioned/clustered tables
- Wasted cores / over-provisioned cluster
- Executor/driver memory sizing alerts
- Iceberg inefficient replace detection
- Delta Lake full scan detection

Inspired by the DataFlint OSS alert system (github.com/dataflint/spark).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

1 parent 48ce9cb commit 819f465

2 files changed: 233 additions & 0 deletions

skills/spark-advisor/SKILL.md (47 additions, 0 deletions)
@@ -142,6 +142,53 @@ These are the most impactful things to check. For the full diagnostic ruleset, s

| Bad config | Partition count, executor sizing | `env`, `summary` |
| AQE ineffective | Initial vs final plan difference | `sql-plan <id> --view initial/final` |
| Gluten fallback | Non-Transformer nodes in final plan | `sql-plan <id> --view final` |
| Small files read | Avg file size < 3MB, files > 100 | `sql <exec-id>` node metrics |
| Small files written | Avg file size < 3MB, files > 100 | `sql <exec-id>` node metrics |
| Broadcast too large | Broadcast data > 1GB | `sql <exec-id>` node metrics |
| SMJ→BHJ conversion | SMJ with small input side | `sql-plan <id> --view final` |
| Large cross join | Cross join rows > 10B | `sql <exec-id>` node metrics |
| Long filter condition | Filter condition > 1000 chars | `sql-plan <id> --view final` |
| Full scan on partitioned | Missing partition/cluster filters | `sql-plan <id> --view final` |
| Large partition size | Max partition > 5GB | `stage-summary <id>` |
| Wasted cores | Idle cores > 50% | `executors --all` |
| Memory over-provisioned | Max usage < 70% | `executors --all` |
| Driver memory risk | Driver heap > 95% | `executors --all` |
| Iceberg inefficient replace | Files replaced > 30%, records < 30% | `sql <exec-id>` node metrics |
## SQL Plan Analysis

When diagnosing specific SQL queries, analyze the SQL plan nodes for these patterns:

- **File I/O efficiency**: Check scan/write node metrics for `files read`, `bytes read`, `files written`, `bytes written`. Calculate average file size — small files (< 3MB) are a common hidden bottleneck.
- **Join strategy**: Look for `SortMergeJoin` nodes where one input is significantly smaller than the other. These may benefit from broadcast hints or AQE tuning.
- **Broadcast sizing**: Check the `BroadcastExchange` node's `data size` metric. Broadcasts > 1 GB cause excessive memory pressure and network overhead.
- **Cross joins**: Identify `BroadcastNestedLoopJoin` or `CartesianProduct` nodes. Calculate total scanned rows from input sizes — cross joins on large tables are extremely dangerous.
- **Filter complexity**: Inspect `Filter` node conditions. Very long conditions (> 1000 chars) with large IN-lists or OR chains should be converted to joins.
- **Partition pruning**: For Delta Lake and Iceberg tables, verify that scan nodes show partition filters being applied. Full scans on partitioned tables waste I/O.
- **Partition sizing**: Check stage task distribution for oversized partitions (> 5GB). These cause OOM risk, long-tail tasks, and GC pressure.

Use `sql <exec-id>` for node-level metrics and `sql-plan <exec-id> --view final` for post-AQE plan structure.
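Several of these checks reduce to simple arithmetic over node metrics. A minimal triage sketch (the metric names and dict shape here are assumptions for illustration, not the tool's actual output format):

```python
# Hypothetical node-metrics triage: flags small files, oversized
# broadcasts, and dangerous cross joins from SQL plan node metrics.
MB = 1024 * 1024
GB = 1024 * MB

def triage_node(node_type: str, metrics: dict) -> list:
    findings = []
    files = metrics.get("files read", 0)
    data = metrics.get("bytes read", 0)
    # Small files: > 100 files averaging under 3 MB each
    if files > 100 and data / max(files, 1) < 3 * MB:
        findings.append("small files read (avg < 3MB)")
    # Broadcast sizing: > 1 GB of broadcast data
    if node_type == "BroadcastExchange" and metrics.get("data size", 0) > 1 * GB:
        findings.append("broadcast too large (> 1GB)")
    # Cross joins: > 10 billion scanned rows
    if node_type in ("CartesianProduct", "BroadcastNestedLoopJoin") \
            and metrics.get("scanned rows", 0) > 10_000_000_000:
        findings.append("large cross join (> 10B rows)")
    return findings

# 500 files averaging ~1 MB each trips the small-files check
print(triage_node("Scan parquet", {"files read": 500, "bytes read": 500 * MB}))
```

Running a helper like this over every node returned by `sql <exec-id>` turns the checklist above into a mechanical pass.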
## Lakehouse Awareness

When analyzing workloads on Delta Lake or Apache Iceberg tables:

### Delta Lake
- **OPTIMIZE**: Recommend `OPTIMIZE` for tables with small file problems detected in scan metrics
- **Z-ORDER**: Check if queries filter on z-ordered columns; if not, the z-ordering provides no benefit
- **Liquid Clustering**: For Databricks, check if cluster key filters are being applied in scans
- **Full scans**: Flag scans on partitioned Delta tables without partition filters

### Apache Iceberg
- **Copy-on-Write overhead**: For update/delete workloads, check if files replaced >> records changed — this indicates COW overhead
- **Merge-on-Read**: Recommend `write.merge-mode=merge-on-read` for update-heavy tables
- **Table maintenance**: Recommend `rewrite_data_files` for small file compaction
- **Bulk replace detection**: If > 60% of table files are replaced in a single operation, flag potential misuse

### General Lakehouse Checks
- File sizes in scan/write metrics (target ~128MB per file)
- Partition filter pushdown in scan nodes
- Table statistics availability for cost-based optimization

## Gluten/Velox Awareness
skills/spark-advisor/references/diagnostics.md (186 additions, 0 deletions)
@@ -168,6 +168,183 @@ When a fallback occurs, data must be converted between columnar (Velox) and row
- `RowToVeloxColumnar` → Spark to native conversion
- These conversions add overhead; minimize them by ensuring contiguous native execution

## SQL-Level Diagnostics

### Small Files Read
**Detection**: From SQL plan node metrics (`files read` and `bytes read`):
- Average file size < 3 MB AND files read > 100 → small files problem

**Root causes**:
- Data written with too many partitions
- High-cardinality partition keys
- Frequent small-batch writes

**Recommendations**:
- Ask the data owner to compact/repartition the source data
- Reduce executors to amortize small-file overhead
- Use table maintenance (`OPTIMIZE` for Delta, `rewrite_data_files` for Iceberg)
### Small Files Written
**Detection**: From SQL plan node metrics (`files written` and `bytes written`):
- Average file size < 3 MB AND files written > 100 → writing small files
- Ideal target: ~128 MB per file
- For partitioned writes: check files per partition

**Root causes**:
- Too many output partitions
- High-cardinality partition keys

**Recommendations**:
- For unpartitioned writes: `.repartition(N)` before the write, where N = total_bytes / 128 MB
- For partitioned writes: `.repartition("partition_key")` before the write
- Choose partition keys with lower cardinality
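The N for `.repartition(N)` falls straight out of the target file size. A quick sketch of that arithmetic (the byte count is whatever your write metrics report):

```python
import math

TARGET_FILE_BYTES = 128 * 1024 * 1024  # ~128 MB per output file

def output_partitions(total_bytes: int) -> int:
    """Partition count so each output file lands near the target size."""
    return max(1, math.ceil(total_bytes / TARGET_FILE_BYTES))

# A 10 GB write -> 80 partitions of ~128 MB each
print(output_partitions(10 * 1024 ** 3))  # 80
```

In practice this becomes something like `df.repartition(output_partitions(estimated_bytes)).write...`, with the byte estimate taken from a previous run's metrics.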
### Broadcast Too Large
**Detection**: From SQL plan BroadcastExchange node metrics (`data size`):
- Broadcast data size > 1 GB → too large for broadcast

**Root causes**:
- `spark.sql.autoBroadcastJoinThreshold` set too high
- Broadcast hint on a large table

**Recommendations**:
- Lower `spark.sql.autoBroadcastJoinThreshold`
- Remove broadcast hints from large DataFrames
- Prefer SortMergeJoin for tables > 1 GB

### SortMergeJoin Should Be BroadcastHashJoin
**Detection**: From the SQL plan — a SortMergeJoin node where one input is much smaller:
- Small table < 10 MB (AQE should have caught this)
- Small table < 100 MB AND large table > 10 GB
- Small table < 1 GB AND large table > 300 GB
- Small table < 5 GB AND large table > 1 TB

**Root causes**:
- AQE disabled or unable to estimate sizes
- Missing statistics

**Recommendations**:
- Use a `broadcast(small_df)` hint
- Increase `spark.sql.autoBroadcastJoinThreshold`
- Ensure AQE is enabled: `spark.sql.adaptive.enabled=true`
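The size-pair rules above can be encoded as a small predicate. A sketch with the thresholds taken from the detection list (extracting the two input sizes from the plan is up to the caller):

```python
MB = 1024 ** 2
GB = 1024 ** 3
TB = 1024 ** 4

# (max small-side size, min large-side size) pairs from the rules above
_BROADCAST_RULES = [
    (10 * MB, 0),           # tiny side: AQE should have broadcast it
    (100 * MB, 10 * GB),
    (1 * GB, 300 * GB),
    (5 * GB, 1 * TB),
]

def should_broadcast(small_bytes: int, large_bytes: int) -> bool:
    """True if a SortMergeJoin with these input sizes is a broadcast candidate."""
    return any(small_bytes < s and large_bytes > l
               for s, l in _BROADCAST_RULES)

print(should_broadcast(50 * MB, 20 * GB))  # True: 100 MB vs 10 GB rule
print(should_broadcast(2 * GB, 50 * GB))   # False: no rule matches
```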
### Large Cross Join
**Detection**: From BroadcastNestedLoopJoin or CartesianProduct node metrics:
- Cross join scanned rows > 10 billion → dangerous cross join

**Root causes**:
- Missing join conditions
- Accidental Cartesian product

**Recommendations**:
- Add specific join conditions
- Avoid cross joins on large datasets
- Consider alternatives (window functions, explode + join)

### Long Filter Conditions
**Detection**: From the Filter node's plan condition:
- Condition string length > 1000 characters → performance risk

**Root causes**:
- Large IN-lists
- Complex OR chains
- Programmatically generated filters

**Recommendations**:
- Convert the filter to a join (create a DataFrame of filter values, inner join)
- Rewrite the filter to be shorter
- Use a temp table for large value lists
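To see how quickly generated IN-lists cross the 1000-character threshold, a toy sketch (pure string arithmetic; the column name and value range are made up for illustration):

```python
def in_list_condition(column: str, values) -> str:
    """Render a SQL IN-list filter the way generated code often does."""
    return f"{column} IN ({', '.join(repr(v) for v in values)})"

# A mere 250 numeric IDs already exceeds the 1000-char threshold
cond = in_list_condition("customer_id", range(250))
print(len(cond))  # 1155
```

The join rewrite is the usual pattern: put the 250 values in their own DataFrame and inner-join on `customer_id` instead of filtering with `cond`.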
### Full Scan on Partitioned/Clustered Tables
**Detection**: From scan nodes with Delta Lake / Iceberg metadata:
- Partitioned table scanned without partition filters
- Liquid Clustering table scanned without cluster key filters
- Z-Ordered table scanned without z-order column filters

**Root causes**:
- Missing WHERE clauses on partition/cluster keys

**Recommendations**:
- Add filters on the partition key(s)
- Add filters on the clustering key(s)
- Review the query to ensure predicate pushdown works

### Large Partition Size
**Detection**: From stage task distribution metrics:
- Max partition size > 5 GB (input, output, shuffle read, or shuffle write)

**Root causes**:
- Uneven data distribution
- Too few partitions

**Recommendations**:
- Increase the number of partitions
- Use more specific partitioning keys
- Enable AQE auto-coalesce
## Resource Utilization Diagnostics

### Wasted Cores / Over-Provisioned Cluster
**Detection**: From executor metrics:
- Idle cores rate > 50% → cluster over-provisioned

**Root causes**:
- Too many executors/cores for the workload size

**Recommendations**:
- For static allocation: lower `spark.executor.cores` or `spark.executor.instances`
- For dynamic allocation: tune `spark.dynamicAllocation.executorAllocationRatio` or increase `spark.dynamicAllocation.schedulerBacklogTimeout`
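One plausible way to estimate the idle-core rate from executor metrics, assuming per-executor core count, uptime, and total task time are available (field names here are illustrative; the real accounting may differ):

```python
def idle_cores_rate(executors: list) -> float:
    """Fraction of total core-time that ran no task: 1 - busy / capacity."""
    capacity = sum(e["cores"] * e["uptime_ms"] for e in executors)
    busy = sum(e["task_time_ms"] for e in executors)
    return 1.0 - busy / capacity if capacity else 0.0

# 4 cores up for 100s = 400s of core-time, only 120s of task time
fleet = [{"cores": 4, "uptime_ms": 100_000, "task_time_ms": 120_000}]
print(f"{idle_cores_rate(fleet):.0%}")  # 70% idle -> over-provisioned
```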
### Executor Memory Over-Provisioned
**Detection**: From executor memory metrics:
- Max executor memory usage < 70% → over-provisioned (wasting money)
- Max executor memory usage > 95% → under-provisioned (risk of OOM/spill)

**Root causes**:
- Wrong `spark.executor.memory` sizing

**Recommendations**:
- Over-provisioned: decrease `spark.executor.memory` to max_usage * 1.2
- Under-provisioned: increase `spark.executor.memory` by 20%
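The sizing rules above in code form, as a sketch (inputs are the configured heap and the max observed usage from `executors --all`):

```python
def recommend_executor_memory(configured_bytes: int, max_used_bytes: int) -> int:
    """Apply the 70%/95% rules: shrink to 1.2x peak, or grow by 20%."""
    usage = max_used_bytes / configured_bytes
    if usage < 0.70:                       # over-provisioned: wasting money
        return int(max_used_bytes * 1.2)   # keep ~20% headroom above peak
    if usage > 0.95:                       # under-provisioned: OOM/spill risk
        return int(configured_bytes * 1.2)
    return configured_bytes                # within the healthy band

GB = 1024 ** 3
# 16 GB configured, 8 GB peak usage -> recommend roughly 9.6 GB
print(round(recommend_executor_memory(16 * GB, 8 * GB) / GB, 1))  # 9.6
```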
### Driver Memory Under-Provisioned
**Detection**: From driver heap memory usage:
- Driver heap usage > 95% of Xmx → risk of driver OOM

**Root causes**:
- Large `collect()` calls
- Too many broadcast variables
- Driver-side aggregations

**Recommendations**:
- Increase `spark.driver.memory`
- Avoid `collect()` on large datasets
- Reduce broadcast variable sizes
## Lakehouse-Specific Diagnostics

### Inefficient Iceberg Table Replace
**Detection**: From Iceberg commit metrics on ReplaceData operations:
- Table files replaced > 30% BUT records changed < 30% → rewriting too many files

**Root causes**:
- Copy-on-write mode rewriting entire files for small updates

**Recommendations**:
- Switch to merge-on-read mode (`write.merge-mode=merge-on-read`)
- Partition the table so updates touch fewer partitions
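The detection boils down to two ratios from the commit metrics. A sketch combining this rule with the 60% bulk-replace rule below (the argument names are illustrative, not actual Iceberg snapshot property keys):

```python
def cow_overhead(total_files: int, files_replaced: int,
                 total_records: int, records_changed: int):
    """Flag copy-on-write overhead per the 30%/60% rules."""
    file_ratio = files_replaced / total_files
    record_ratio = records_changed / total_records
    if file_ratio > 0.60:
        return "replaced most of table: partition to localize updates"
    if file_ratio > 0.30 and record_ratio < 0.30:
        return "inefficient replace: consider write.merge-mode=merge-on-read"
    return None

# 40% of files rewritten to change 0.5% of records -> COW overhead
print(cow_overhead(total_files=1000, files_replaced=400,
                   total_records=10_000_000, records_changed=50_000))
```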
### Replaced Most of Iceberg Table
**Detection**: From Iceberg commit metrics:
- Table files replaced > 60% → potential misuse of Iceberg

**Root causes**:
- Bulk updates/deletes that rewrite most of the table

**Recommendations**:
- Partition the table to localize updates
- Consider whether Iceberg is the right format for this workload
## Thresholds Summary

| Metric | OK | Warning | Critical |

@@ -179,3 +356,12 @@

| Shuffle/Input ratio | < 1x | 1-3x | > 3x |
| Partition size | 64-256MB | 32-512MB | < 16MB or > 1GB |
| Memory per core | 4-8GB | 2-4GB or 8-16GB | < 2GB or > 16GB |
| Avg file size (read/write) | > 64MB | 3-64MB | < 3MB |
| Broadcast data size | < 256MB | 256MB-1GB | > 1GB |
| Cross join rows | < 1B | 1-10B | > 10B |
| Filter condition length | < 500 chars | 500-1000 chars | > 1000 chars |
| Max partition size | < 2GB | 2-5GB | > 5GB |
| Idle cores rate | < 20% | 20-50% | > 50% |
| Executor memory usage | 70-90% | 50-70% or 90-95% | < 50% or > 95% |
| Driver heap usage | < 80% | 80-95% | > 95% |
| Iceberg files replaced | < 30% | 30-60% | > 60% |
