Skip to content

Commit dc0a33f

Browse files
feat(sqs): clear SNS FIFO dedup cache on PurgeQueue
Clear SNS FIFO topic deduplication for topics with SQS subscriptions targeting a purged queue when `clearFifoDeduplicationCacheOnPurge` is enabled.
1 parent 763ea37 commit dc0a33f

7 files changed

Lines changed: 120 additions & 11 deletions

File tree

docs/configuration/application-yml.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ floci:
8484
enabled: true
8585
default-visibility-timeout: 30 # Seconds
8686
max-message-size: 262144 # Bytes (256 KB)
87-
clear-fifo-deduplication-cache-on-purge: false # When true, PurgeQueue also clears the FIFO deduplication cache
87+
clear-fifo-deduplication-cache-on-purge: false # When true, PurgeQueue clears SQS FIFO dedup and SNS FIFO topic dedup for topics subscribed to that queue
8888
8989
s3:
9090
enabled: true
@@ -229,7 +229,7 @@ All keys in this table are declared on `EmulatorConfig` and accept environment v
229229
| `FLOCI_SERVICES_SSM_MAX_PARAMETER_HISTORY` | `5` | Max parameter versions kept |
230230
| `FLOCI_SERVICES_SQS_DEFAULT_VISIBILITY_TIMEOUT` | `30` | Default visibility timeout (seconds) |
231231
| `FLOCI_SERVICES_SQS_MAX_MESSAGE_SIZE` | `262144` | Max message size (bytes) |
232-
| `FLOCI_SERVICES_SQS_CLEAR_FIFO_DEDUPLICATION_CACHE_ON_PURGE` | `false` | When `true`, `PurgeQueue` also clears the FIFO 5-minute deduplication cache for the target queue |
232+
| `FLOCI_SERVICES_SQS_CLEAR_FIFO_DEDUPLICATION_CACHE_ON_PURGE` | `false` | When `true`, `PurgeQueue` clears the FIFO 5-minute deduplication cache for the target queue and matching SNS FIFO topic dedup entries |
233233
| `FLOCI_SERVICES_S3_DEFAULT_PRESIGN_EXPIRY_SECONDS` | `3600` | Pre-signed URL expiry |
234234
| `FLOCI_SERVICES_DOCKER_NETWORK` | *(unset)* | Shared Docker network for Lambda, RDS, ElastiCache containers |
235235
| `FLOCI_SERVICES_ECS_MOCK` | `false` | Skip Docker; tasks go straight to RUNNING (useful for CI) |

docs/services/sqs.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ floci:
3737
enabled: true
3838
default-visibility-timeout: 30 # Seconds
3939
max-message-size: 262144 # 256 KB
40-
clear-fifo-deduplication-cache-on-purge: false # When true, PurgeQueue also clears the FIFO 5-minute deduplication cache
40+
clear-fifo-deduplication-cache-on-purge: false # When true, PurgeQueue clears the FIFO deduplication cache for the queue and for any SNS FIFO topics that subscribe to that queue (SNS in-memory dedup)
4141
```
4242
4343
## Examples

src/main/java/io/github/hectorvent/floci/services/sns/SnsService.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,54 @@ private boolean isDuplicate(String topicArn, String deduplicationId) {
567567
return false;
568568
}
569569

570+
/**
571+
* Removes all FIFO deduplication cache entries for SNS topics that have an SQS subscription
572+
* whose endpoint resolves to the same queue path as {@code queueUrl} (used when purging SQS
573+
* with {@code clearFifoDeduplicationCacheOnPurge}).
574+
*/
575+
public void clearFifoDeduplicationCacheForSqsQueueSubscriptions(String queueUrl, String region) {
576+
String queuePath = extractQueuePathFromUrl(queueUrl);
577+
if (queuePath.isEmpty()) {
578+
return;
579+
}
580+
String subPrefix = "sub::" + region + "::";
581+
subscriptionStore.keys().stream()
582+
.filter(key -> key.startsWith(subPrefix))
583+
.map(key -> subscriptionStore.get(key).orElse(null))
584+
.filter(Objects::nonNull)
585+
.filter(sub -> "sqs".equals(sub.getProtocol()))
586+
.filter(sub -> sqsSubscriptionEndpointMatchesQueuePath(sub.getEndpoint(), queuePath))
587+
.map(Subscription::getTopicArn)
588+
.forEach(topicArn ->
589+
fifoDeduplicationCache.keySet().removeIf(cacheKey -> cacheKey.startsWith(topicArn + ":")));
590+
}
591+
592+
private boolean sqsSubscriptionEndpointMatchesQueuePath(String endpoint, String queuePath) {
593+
if (endpoint == null) {
594+
return false;
595+
}
596+
String asUrl = sqsArnToUrl(endpoint);
597+
return extractQueuePathFromUrl(asUrl).equals(queuePath);
598+
}
599+
600+
/**
601+
* Same path extraction as {@code SqsService} queue URL normalization ({@code /accountId/queueName}).
602+
*/
603+
private static String extractQueuePathFromUrl(String url) {
604+
if (url == null) {
605+
return "";
606+
}
607+
int schemeEnd = url.indexOf("://");
608+
if (schemeEnd < 0) {
609+
return url;
610+
}
611+
int pathStart = url.indexOf('/', schemeEnd + 3);
612+
if (pathStart < 0) {
613+
return url;
614+
}
615+
return url.substring(pathStart);
616+
}
617+
570618
private List<Subscription> subscriptionsByTopic(String topicArn, String region) {
571619
List<Subscription> result = new ArrayList<>();
572620
String prefix = "sub::" + region + "::";
@@ -739,4 +787,4 @@ private static String topicKey(String region, String arn) {
739787
private static String subKey(String region, String subscriptionArn) {
740788
return "sub::" + region + "::" + subscriptionArn;
741789
}
742-
}
790+
}

src/main/java/io/github/hectorvent/floci/services/sqs/SqsService.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.github.hectorvent.floci.core.common.RegionResolver;
66
import io.github.hectorvent.floci.core.storage.StorageBackend;
77
import io.github.hectorvent.floci.core.storage.StorageFactory;
8+
import io.github.hectorvent.floci.services.sns.SnsService;
89
import io.github.hectorvent.floci.services.sqs.model.Message;
910
import io.github.hectorvent.floci.services.sqs.model.Queue;
1011
import com.fasterxml.jackson.core.type.TypeReference;
@@ -41,9 +42,11 @@ private record RedrivePolicy(int maxReceiveCount, String deadLetterTargetArn) {
4142
private final String baseUrl;
4243
private final RegionResolver regionResolver;
4344
private final boolean clearFifoDeduplicationCacheOnPurge;
45+
private final SnsService snsService;
4446

4547
@Inject
46-
public SqsService(StorageFactory storageFactory, EmulatorConfig config, RegionResolver regionResolver) {
48+
public SqsService(StorageFactory storageFactory, EmulatorConfig config, RegionResolver regionResolver,
49+
SnsService snsService) {
4750
this(
4851
storageFactory.create("sqs", "sqs-queues.json",
4952
new TypeReference<Map<String, Queue>>() {
@@ -58,7 +61,8 @@ public SqsService(StorageFactory storageFactory, EmulatorConfig config, RegionRe
5861
config.services().sqs().maxMessageSize(),
5962
config.effectiveBaseUrl(),
6063
regionResolver,
61-
config.services().sqs().clearFifoDeduplicationCacheOnPurge()
64+
config.services().sqs().clearFifoDeduplicationCacheOnPurge(),
65+
snsService
6266
);
6367
}
6468

@@ -68,21 +72,22 @@ public SqsService(StorageFactory storageFactory, EmulatorConfig config, RegionRe
6872
SqsService(StorageBackend<String, Queue> queueStore,
6973
int defaultVisibilityTimeout, int maxMessageSize, String baseUrl) {
7074
this(queueStore, null, null, defaultVisibilityTimeout, maxMessageSize, baseUrl,
71-
new RegionResolver("us-east-1", "000000000000"), false);
75+
new RegionResolver("us-east-1", "000000000000"), false, null);
7276
}
7377

7478
SqsService(StorageBackend<String, Queue> queueStore, StorageBackend<String, List<Message>> messageStore,
7579
StorageBackend<String, Map<String, Long>> dedupStore,
7680
int defaultVisibilityTimeout, int maxMessageSize, String baseUrl,
7781
RegionResolver regionResolver) {
7882
this(queueStore, messageStore, dedupStore, defaultVisibilityTimeout, maxMessageSize, baseUrl,
79-
regionResolver, false);
83+
regionResolver, false, null);
8084
}
8185

8286
SqsService(StorageBackend<String, Queue> queueStore, StorageBackend<String, List<Message>> messageStore,
8387
StorageBackend<String, Map<String, Long>> dedupStore,
8488
int defaultVisibilityTimeout, int maxMessageSize, String baseUrl,
85-
RegionResolver regionResolver, boolean clearFifoDeduplicationCacheOnPurge) {
89+
RegionResolver regionResolver, boolean clearFifoDeduplicationCacheOnPurge,
90+
SnsService snsService) {
8691
this.queueStore = queueStore;
8792
this.messageStore = messageStore;
8893
this.dedupStore = dedupStore;
@@ -91,6 +96,7 @@ public SqsService(StorageFactory storageFactory, EmulatorConfig config, RegionRe
9196
this.baseUrl = baseUrl;
9297
this.regionResolver = regionResolver;
9398
this.clearFifoDeduplicationCacheOnPurge = clearFifoDeduplicationCacheOnPurge;
99+
this.snsService = snsService;
94100
loadPersistedMessages();
95101
loadPersistedDedup();
96102
}
@@ -618,6 +624,9 @@ public void purgeQueue(String queueUrl, String region) {
618624
if (dedupStore != null) {
619625
dedupStore.delete(storageKey);
620626
}
627+
if (snsService != null) {
628+
snsService.clearFifoDeduplicationCacheForSqsQueueSubscriptions(queueUrl, region);
629+
}
621630
}
622631
LOG.infov("Purged queue{0}: {1}",
623632
clearFifoDeduplicationCacheOnPurge ? " (dedup cache cleared)" : "", queueUrl);

src/test/java/io/github/hectorvent/floci/services/sns/SnsSqsFanoutFifoDeliveryTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,34 @@ void publish_duplicateMessage_isDeduplicatedAtTopicLevel() {
164164
assertEquals(1, messages.size());
165165
assertTrue(messages.get(0).getBody().contains("first"));
166166
}
167+
168+
@Test
169+
void clearFifoDedupForSubscribedQueue_thenRepublishWithSameDedupId_deliversAgain() {
170+
RegionResolver regionResolver = new RegionResolver(REGION, ACCOUNT);
171+
SqsService purgeSqsService = SqsServiceFactory.createInMemoryWithFifoDedupPurgeAndSns(
172+
BASE_URL, regionResolver, snsService);
173+
sqsService.createQueue("manual-sns-dedup-queue.fifo", Map.of("FifoQueue", "true"), REGION);
174+
purgeSqsService.createQueue("manual-sns-dedup-queue.fifo", Map.of("FifoQueue", "true"), REGION);
175+
String queueArn = "arn:aws:sqs:" + REGION + ":" + ACCOUNT + ":manual-sns-dedup-queue.fifo";
176+
177+
snsService.createTopic("manual-sns-dedup-topic.fifo", Map.of("FifoTopic", "true"), null, REGION);
178+
String topicArn = "arn:aws:sns:" + REGION + ":" + ACCOUNT + ":manual-sns-dedup-topic.fifo";
179+
snsService.subscribe(topicArn, "sqs", queueArn, REGION, Map.of());
180+
181+
String queueUrl = BASE_URL + "/" + ACCOUNT + "/manual-sns-dedup-queue.fifo";
182+
183+
snsService.publish(topicArn, null, null, "before-clear", null, null, "group-1", "shared-dedup", REGION);
184+
List<Message> first = sqsService.receiveMessage(queueUrl, 10, 30, 0, REGION);
185+
assertEquals(1, first.size());
186+
for (Message m : first) {
187+
sqsService.deleteMessage(queueUrl, m.getReceiptHandle(), REGION);
188+
}
189+
190+
purgeSqsService.purgeQueue(queueUrl, REGION);
191+
192+
snsService.publish(topicArn, null, null, "after-clear", null, null, "group-1", "shared-dedup", REGION);
193+
List<Message> after = sqsService.receiveMessage(queueUrl, 10, 30, 0, REGION);
194+
assertEquals(1, after.size());
195+
assertTrue(after.getFirst().getBody().contains("after-clear"));
196+
}
167197
}

src/test/java/io/github/hectorvent/floci/services/sqs/SqsServiceFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.github.hectorvent.floci.core.common.RegionResolver;
44
import io.github.hectorvent.floci.core.storage.InMemoryStorage;
5+
import io.github.hectorvent.floci.services.sns.SnsService;
56

67
/**
78
* Test helper to create SqsService instances
@@ -12,4 +13,10 @@ public static SqsService createInMemory(String baseUrl, RegionResolver regionRes
1213
return new SqsService(new InMemoryStorage<>(), new InMemoryStorage<>(), new InMemoryStorage<>(),
1314
30, 262144, baseUrl, regionResolver);
1415
}
16+
17+
public static SqsService createInMemoryWithFifoDedupPurgeAndSns(String baseUrl, RegionResolver regionResolver,
18+
SnsService snsService) {
19+
return new SqsService(new InMemoryStorage<>(), new InMemoryStorage<>(), new InMemoryStorage<>(),
20+
30, 262144, baseUrl, regionResolver, true, snsService);
21+
}
1522
}

src/test/java/io/github/hectorvent/floci/services/sqs/SqsServiceTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.github.hectorvent.floci.core.common.AwsException;
44
import io.github.hectorvent.floci.core.common.RegionResolver;
55
import io.github.hectorvent.floci.core.storage.InMemoryStorage;
6+
import io.github.hectorvent.floci.services.sns.SnsService;
67
import io.github.hectorvent.floci.services.sqs.model.Message;
78
import io.github.hectorvent.floci.services.sqs.model.Queue;
89
import org.junit.jupiter.api.BeforeEach;
@@ -12,6 +13,8 @@
1213
import java.util.Map;
1314

1415
import static org.junit.jupiter.api.Assertions.*;
16+
import static org.mockito.Mockito.mock;
17+
import static org.mockito.Mockito.verify;
1518

1619
class SqsServiceTest {
1720

@@ -353,7 +356,7 @@ void fifoQueueIgnoresPerMessageDelaySeconds() {
353356
void purgeQueueClearsFifoDeduplicationCacheWhenEnabled() {
354357
final var service = new SqsService(
355358
new InMemoryStorage<>(), new InMemoryStorage<>(), new InMemoryStorage<>(),
356-
30, 262144, BASE_URL, new RegionResolver("us-east-1", "000000000000"), true);
359+
30, 262144, BASE_URL, new RegionResolver("us-east-1", "000000000000"), true, null);
357360

358361
final var queue = service.createQueue("dedup-clear.fifo", Map.of("ContentBasedDeduplication", "true"));
359362

@@ -406,7 +409,7 @@ void purgeQueueClearsDedupStoreWhenEnabled() {
406409
final var dedupStore = new InMemoryStorage<String, Map<String, Long>>();
407410
final var service = new SqsService(
408411
new InMemoryStorage<>(), new InMemoryStorage<>(), dedupStore,
409-
30, 262144, BASE_URL, new RegionResolver("us-east-1", "000000000000"), true);
412+
30, 262144, BASE_URL, new RegionResolver("us-east-1", "000000000000"), true, null);
410413

411414
final var queue = service.createQueue("dedup-store-clear.fifo",
412415
Map.of("ContentBasedDeduplication", "true"));
@@ -421,4 +424,16 @@ void purgeQueueClearsDedupStoreWhenEnabled() {
421424
assertTrue(dedupStore.keys().isEmpty(),
422425
"Dedup store must be empty after purge with clearFifoDeduplicationCacheOnPurge=true");
423426
}
427+
428+
@Test
429+
void purgeQueueWithClearFifoDelegatesToSnsForFifoDedupOnSubscribedTopics() {
430+
final var sns = mock(SnsService.class);
431+
final var service = new SqsService(
432+
new InMemoryStorage<>(), new InMemoryStorage<>(), new InMemoryStorage<>(),
433+
30, 262144, BASE_URL, new RegionResolver("us-east-1", "000000000000"), true, sns);
434+
final var queue = service.createQueue("sns-dedup-delegate.fifo", Map.of("FifoQueue", "true"));
435+
service.purgeQueue(queue.getQueueUrl());
436+
verify(sns).clearFifoDeduplicationCacheForSqsQueueSubscriptions(
437+
queue.getQueueUrl(), "us-east-1");
438+
}
424439
}

0 commit comments

Comments
 (0)