Skip to content

Commit f483cc9

Browse files
committed
Stabilize Spark timing-sensitive tests with unique table names.
Use per-test randomized table identifiers in OperationsTest and SparkMoRFunctionalTest to reduce cross-test table collisions and improve determinism under parallel execution. Made-with: Cursor
1 parent efce672 commit f483cc9

File tree

2 files changed

+49
-39
lines changed

2 files changed

+49
-39
lines changed

apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.Arrays;
1010
import java.util.Collections;
1111
import java.util.List;
12+
import java.util.UUID;
1213
import java.util.function.BiFunction;
1314
import java.util.stream.Collectors;
1415
import java.util.stream.IntStream;
@@ -38,7 +39,7 @@
3839

3940
@Slf4j
4041
public class SparkMoRFunctionalTest extends OpenHouseSparkITest {
41-
static final String tableName = "db.test_data_compaction";
42+
private String tableName;
4243

4344
final BiFunction<Operations, Table, RewriteDataFiles.Result> rewriteFunc =
4445
(ops, table) ->
@@ -58,13 +59,18 @@ public class SparkMoRFunctionalTest extends OpenHouseSparkITest {
5859
@BeforeEach
5960
public void setUp() throws Exception {
6061
ops = Operations.withCatalog(getSparkSession(), null);
62+
tableName = uniqueTableName("db.test_data_compaction");
6163
}
6264

6365
@AfterEach
6466
public void cleanUp() throws Exception {
6567
sql("DROP TABLE IF EXISTS %s", tableName);
6668
}
6769

70+
private static String uniqueTableName(String baseName) {
71+
return baseName + "_" + UUID.randomUUID().toString().replace("-", "");
72+
}
73+
6874
protected void initTable() {
6975
sql(
7076
"ALTER TABLE %s SET TBLPROPERTIES ('write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read', 'write.merge.mode'='merge-on-read', 'write.delete.distribution-mode'='range');",

apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Arrays;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.UUID;
2930
import java.util.function.BiFunction;
3031
import java.util.stream.Collectors;
3132
import lombok.extern.slf4j.Slf4j;
@@ -53,7 +54,7 @@ public class OperationsTest extends OpenHouseSparkITest {
5354

5455
@Test
5556
public void testRetentionSparkApp() throws Exception {
56-
final String tableName = "db.test_retention_sql";
57+
final String tableName = uniqueTableName("db.test_retention_sql");
5758
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
5859
prepareTableWithRetentionAndSharingPolicies(ops, tableName, "1d", true);
5960
populateTable(ops, tableName, 3);
@@ -66,12 +67,12 @@ public void testRetentionSparkApp() throws Exception {
6667

6768
@Test
6869
public void testRetentionSparkAppWithStringPartitionColumns() throws Exception {
69-
final String tableName1 = "db.test_retention_string_partition1";
70-
final String tableName2 = "db.test_retention_string_partition2";
71-
final String tableName3 = "db.test_retention_string_partition3";
72-
final String tableName4 = "db.test_retention_string_partition4";
73-
final String tableName5 = "db.test_retention_string_partition5";
74-
final String tableName6 = "db.test_retention_string_partition6";
70+
final String tableName1 = uniqueTableName("db.test_retention_string_partition1");
71+
final String tableName2 = uniqueTableName("db.test_retention_string_partition2");
72+
final String tableName3 = uniqueTableName("db.test_retention_string_partition3");
73+
final String tableName4 = uniqueTableName("db.test_retention_string_partition4");
74+
final String tableName5 = uniqueTableName("db.test_retention_string_partition5");
75+
final String tableName6 = uniqueTableName("db.test_retention_string_partition6");
7576

7677
List<String> rowValue = new ArrayList<>();
7778
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
@@ -133,9 +134,7 @@ public void testRetentionSparkAppWithStringPartitionColumns() throws Exception {
133134
rowValue.add(currentDatesFormatMismatched.get(0).get(0).toString());
134135
runRetentionJobWithStringPartitionColumns(
135136
ops, tableName5, rowValue, "datePartition", "yyyy-MM.dd.HH", "day");
136-
ops.spark()
137-
.sql("select * from openhouse.db.test_retention_string_partition5")
138-
.collectAsList();
137+
ops.spark().sql(String.format("select * from %s", tableName5)).collectAsList();
139138
verifyRowCount(ops, tableName5, 0);
140139
rowValue.clear();
141140

@@ -164,7 +163,7 @@ private void runRetentionJobWithStringPartitionColumns(
164163

165164
@Test
166165
public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception {
167-
final String tableName = "db_test.test_retention_sql";
166+
final String tableName = uniqueTableName("db_test.test_retention_sql");
168167
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
169168
prepareTable(ops, tableName);
170169
populateTable(ops, tableName, 4);
@@ -182,7 +181,7 @@ public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception {
182181
public void testRetentionDataManifestWithStringDatePartitionedTable() throws Exception {
183182
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
184183
// set up
185-
String tableName = "db.test_string_partition";
184+
String tableName = uniqueTableName("db.test_string_partition");
186185
String columnName = "datepartition";
187186
String columnPattern = "yyyy-MM-dd-HH";
188187
String granularity = "DAY";
@@ -260,7 +259,7 @@ public void testRetentionDataManifestWithStringDatePartitionedTable() throws Exc
260259
public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exception {
261260
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
262261
// set up
263-
String tableName = "db.test_time_partition";
262+
String tableName = uniqueTableName("db.test_time_partition");
264263
String columnName = "ts";
265264
String columnPattern = "";
266265
String granularity = "DAY";
@@ -311,7 +310,7 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce
311310

312311
@Test
313312
public void testOrphanFilesDeletionJavaAPI() throws Exception {
314-
final String tableName = "db.test_ofd_java";
313+
final String tableName = uniqueTableName("db.test_ofd_java");
315314
final String testOrphanFileName = "data/test_orphan_file.orc";
316315
final int numInserts = 3;
317316
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
@@ -338,7 +337,7 @@ public void testOrphanFilesDeletionJavaAPI() throws Exception {
338337

339338
@Test
340339
public void testOrphanFilesDeletionIgnoresFilesInBackupDir() throws Exception {
341-
final String tableName = "db.test_ofd_java";
340+
final String tableName = uniqueTableName("db.test_ofd_java");
342341
final String testOrphanFileName = "data/test_orphan_file.orc";
343342
final int numInserts = 3;
344343
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
@@ -365,7 +364,7 @@ public void testOrphanFilesDeletionIgnoresFilesInBackupDir() throws Exception {
365364

366365
@Test
367366
public void testOrphanFilesDeletionDeleteNonDataFiles() throws Exception {
368-
final String tableName = "db.test_ofd";
367+
final String tableName = uniqueTableName("db.test_ofd");
369368
final String testOrphanFileName = "metadata/test_orphan_file.avro";
370369
final int numInserts = 3;
371370
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
@@ -392,7 +391,7 @@ public void testOrphanFilesDeletionDeleteNonDataFiles() throws Exception {
392391

393392
@Test
394393
public void testOrphanFilesDeletionBackupDisabled() throws Exception {
395-
final String tableName = "db.test_ofd";
394+
final String tableName = uniqueTableName("db.test_ofd");
396395
final String testOrphanFileName = "data/test_orphan_file.orc";
397396
final int numInserts = 3;
398397
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
@@ -419,7 +418,7 @@ public void testOrphanFilesDeletionBackupDisabled() throws Exception {
419418

420419
@Test
421420
public void testOrphanFilesDeletionDeleteDataWhenDataManifestNotExists() throws Exception {
422-
final String tableName = "db.test_ofd_java";
421+
final String tableName = uniqueTableName("db.test_ofd_java");
423422
final String testOrphanFileName = "data/test_orphan_file.orc";
424423
final int numInserts = 3;
425424
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
@@ -444,7 +443,7 @@ public void testOrphanFilesDeletionDeleteDataWhenDataManifestNotExists() throws
444443

445444
@Test
446445
public void testSnapshotsExpirationMaxAge() throws Exception {
447-
final String tableName = "db.test_es_maxage_java";
446+
final String tableName = uniqueTableName("db.test_es_maxage_java");
448447
final int numInserts = 3;
449448
final int maxAge = 0;
450449
final String timeGranularity = "DAYS";
@@ -475,7 +474,7 @@ public void testSnapshotsExpirationMaxAge() throws Exception {
475474

476475
@Test
477476
public void testSnapshotsExpirationMaxAgeNoop() throws Exception {
478-
final String tableName = "db.test_es_maxage_noop_java";
477+
final String tableName = uniqueTableName("db.test_es_maxage_noop_java");
479478
final int numInserts = 3;
480479
final int maxAge = 3;
481480
final String timeGranularity = "DAYS";
@@ -505,7 +504,7 @@ public void testSnapshotsExpirationMaxAgeNoop() throws Exception {
505504

506505
@Test
507506
public void testSnapshotsExpirationVersionsNoop() throws Exception {
508-
final String tableName = "db.test_es_versions_noop_java";
507+
final String tableName = uniqueTableName("db.test_es_versions_noop_java");
509508
final int numInserts = 3;
510509
final int versionsToKeep = 5; // Should keep all versions given that there are fewer versions
511510
final int maxAge = 3;
@@ -535,7 +534,7 @@ public void testSnapshotsExpirationVersionsNoop() throws Exception {
535534

536535
@Test
537536
public void testSnapshotsExpirationVersions() throws Exception {
538-
final String tableName = "db.test_es_versions_java";
537+
final String tableName = uniqueTableName("db.test_es_versions_java");
539538
final int numInserts = 3;
540539
final int versionsToKeep = 2;
541540
final int maxAge = 3;
@@ -569,7 +568,7 @@ public void testSnapshotsExpirationVersions() throws Exception {
569568

570569
@Test
571570
public void testSnapshotsExpirationBothAgeAndVersions() throws Exception {
572-
final String tableName = "db.test_es_age_and_versions_java";
571+
final String tableName = uniqueTableName("db.test_es_age_and_versions_java");
573572
final int numInserts = 3;
574573
final int maxAge = 3;
575574
final String timeGranularity = "DAYS";
@@ -603,7 +602,7 @@ public void testSnapshotsExpirationBothAgeAndVersions() throws Exception {
603602

604603
@Test
605604
public void testSnapshotsExpirationPrioritizeAge() throws Exception {
606-
final String tableName = "db.test_es_age_prioritization_java";
605+
final String tableName = uniqueTableName("db.test_es_age_prioritization_java");
607606
final int numInserts = 3;
608607
final int maxAge = 20;
609608
final String timeGranularity =
@@ -644,7 +643,7 @@ public void testSnapshotsExpirationPrioritizeAge() throws Exception {
644643

645644
@Test
646645
public void testSnapshotExpirationWithHoursDaysMonthsYears() throws Exception {
647-
final String tableName = "db.test_es_age_policy";
646+
final String tableName = uniqueTableName("db.test_es_age_policy");
648647
final int numInserts = 3;
649648
final int maxAge = 20;
650649
final int versionsToKeep = 5;
@@ -676,7 +675,7 @@ public void testSnapshotExpirationWithHoursDaysMonthsYears() throws Exception {
676675

677676
@Test
678677
public void testStagedFilesDelete() throws Exception {
679-
final String tableName = "db.test_staged_delete";
678+
final String tableName = uniqueTableName("db.test_staged_delete");
680679
final int numInserts = 3;
681680
final String testOrphanFile1 = "data/test_orphan_file1.orc";
682681
final String testOrphanFile2 = "data/test_orphan_file2.orc";
@@ -714,7 +713,7 @@ public void testStagedFilesDelete() throws Exception {
714713

715714
@Test
716715
public void testDataCompactionPartialProgressNonPartitionedTable() throws Exception {
717-
final String tableName = "db.test_data_compaction";
716+
final String tableName = uniqueTableName("db.test_data_compaction");
718717
final int numInserts = 3;
719718

720719
BiFunction<Operations, Table, RewriteDataFiles.Result> rewriteFunc =
@@ -771,7 +770,7 @@ public void testDataCompactionPartialProgressNonPartitionedTable() throws Except
771770

772771
@Test
773772
public void testDataCompactionPartialProgressPartitionedTable() throws Exception {
774-
final String tableName = "db.test_data_compaction_partitioned";
773+
final String tableName = uniqueTableName("db.test_data_compaction_partitioned");
775774
final int numInsertsPerPartition = 3;
776775
final int numDailyPartitions = 10;
777776
final int maxCommits = 5;
@@ -857,7 +856,7 @@ public void testDataCompactionPartialProgressPartitionedTable() throws Exception
857856
public void testOrphanDirsDeletionJavaAPI() throws Exception {
858857
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
859858
// test orphan delete
860-
Path tbLoc = prepareOrphanTableDirectory(ops, "db1.test_odd_orphan");
859+
Path tbLoc = prepareOrphanTableDirectory(ops, uniqueTableName("db1.test_odd_orphan"));
861860

862861
long timeThreshold = System.currentTimeMillis();
863862
List<Path> matchingFilesBefore = new ArrayList<>();
@@ -912,7 +911,7 @@ private static Path prepareOrphanTableDirectory(Operations ops, String tableName
912911

913912
@Test
914913
public void testCollectEarliestPartitionDateStat() throws Exception {
915-
final String tableName = "db.test_collect_earliest_partition_date";
914+
final String tableName = uniqueTableName("db.test_collect_earliest_partition_date");
916915
List<String> rowValue = new ArrayList<>();
917916

918917
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
@@ -958,7 +957,7 @@ public void testCollectEarliestPartitionDateStat() throws Exception {
958957

959958
@Test
960959
public void testCollectTableStatsWithEmptyPartitions() throws Exception {
961-
final String tableName = "db.test_empty_partitions";
960+
final String tableName = uniqueTableName("db.test_empty_partitions");
962961

963962
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
964963
// Create table with partition but no data
@@ -980,7 +979,7 @@ public void testCollectTableStatsWithEmptyPartitions() throws Exception {
980979

981980
@Test
982981
public void testCollectTablePolicyStats() throws Exception {
983-
final String tableName = "db.test_collect_table_stats_with_policy";
982+
final String tableName = uniqueTableName("db.test_collect_table_stats_with_policy");
984983
List<String> rowValue = new ArrayList<>();
985984

986985
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
@@ -1050,7 +1049,7 @@ public void testCollectTablePolicyStats() throws Exception {
10501049

10511050
@Test
10521051
public void testCollectTableStats() throws Exception {
1053-
final String tableName = "db.test_collect_table_stats";
1052+
final String tableName = uniqueTableName("db.test_collect_table_stats");
10541053
final int numInserts = 3;
10551054
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
10561055
prepareTable(ops, tableName);
@@ -1096,7 +1095,8 @@ public void testCollectTableStats() throws Exception {
10961095

10971096
@Test
10981097
public void testCollectHistoryPolicyStatsWithSnapshots() throws Exception {
1099-
final String tableName = "db.test_collect_table_stats_with_history_policy_snapshots";
1098+
final String tableName =
1099+
uniqueTableName("db.test_collect_table_stats_with_history_policy_snapshots");
11001100

11011101
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
11021102
// Test table with both retention and sharing policies
@@ -1139,6 +1139,10 @@ private static void verifyRowCount(Operations ops, String tableName, int expecte
11391139
Assertions.assertEquals(expectedRowCount, resultRows.size());
11401140
}
11411141

1142+
private static String uniqueTableName(String tableName) {
1143+
return tableName + "_" + UUID.randomUUID().toString().replace("-", "");
1144+
}
1145+
11421146
private static void populateTable(
11431147
Operations ops, String tableName, int numRows, int dayLag, long timestampSeconds) {
11441148
String timestampEntry =
@@ -1396,7 +1400,7 @@ private static List<Triple<String, String, Long>> getDataFiles(Operations ops, S
13961400

13971401
@Test
13981402
public void testCollectPartitionStatsForPartitionedTable() throws Exception {
1399-
final String tableName = "db.test_partition_stats_partitioned";
1403+
final String tableName = uniqueTableName("db.test_partition_stats_partitioned");
14001404

14011405
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
14021406
// Setup: Create partitioned table
@@ -1438,7 +1442,7 @@ public void testCollectPartitionStatsForPartitionedTable() throws Exception {
14381442

14391443
@Test
14401444
public void testCollectPartitionStatsForUnpartitionedTable() throws Exception {
1441-
final String tableName = "db.test_partition_stats_unpartitioned";
1445+
final String tableName = uniqueTableName("db.test_partition_stats_unpartitioned");
14421446

14431447
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
14441448
// Setup: Create unpartitioned table
@@ -1478,7 +1482,7 @@ public void testCollectPartitionStatsForUnpartitionedTable() throws Exception {
14781482

14791483
@Test
14801484
public void testCollectPartitionStatsWithMultipleCommitsToSamePartition() throws Exception {
1481-
final String tableName = "db.test_partition_stats_multiple_commits";
1485+
final String tableName = uniqueTableName("db.test_partition_stats_multiple_commits");
14821486

14831487
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
14841488
// Setup: Create partitioned table
@@ -1516,7 +1520,7 @@ public void testCollectPartitionStatsWithMultipleCommitsToSamePartition() throws
15161520

15171521
@Test
15181522
public void testCollectPartitionStatsEmptyTable() throws Exception {
1519-
final String tableName = "db.test_partition_stats_empty";
1523+
final String tableName = uniqueTableName("db.test_partition_stats_empty");
15201524

15211525
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
15221526
// Setup: Create empty partitioned table (no data)

0 commit comments

Comments
 (0)