Skip to content

Commit 006fa25

Browse files
committed
Upgrade AWS SQS to v4
Signed-off-by: Tomasz Maruszak <[email protected]>
1 parent 750f1d2 commit 006fa25

File tree

9 files changed

+36
-25
lines changed

9 files changed

+36
-25
lines changed

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>3.3.3</Version>
7+
<Version>3.3.4</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host.AmazonSQS/ClientFactory/AbstractTemporaryCredentialsSqsClientProvider.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,16 @@ public async Task EnsureClientAuthenticated()
6363

6464
var assumeRoleResponse = await _stsClient.AssumeRoleAsync(assumeRoleRequest);
6565

66+
var expiration = assumeRoleResponse.Credentials.Expiration
67+
?? throw new InvalidOperationException("STS AssumeRole response did not include an Expiration. This should never happen against real AWS — check test/mocked responses.");
68+
6669
var temporaryCredentials = new SessionAWSCredentials(
6770
assumeRoleResponse.Credentials.AccessKeyId,
6871
assumeRoleResponse.Credentials.SecretAccessKey,
6972
assumeRoleResponse.Credentials.SessionToken
7073
);
7174

72-
var clientCredentialsExpiry = assumeRoleResponse.Credentials.Expiration.AddMinutes(-5); // Renew 5 mins before expiry
75+
var clientCredentialsExpiry = expiration.AddMinutes(-3); // Renew 3 mins before expiry
7376

7477
var client = CreateClient(temporaryCredentials, _clientConfig);
7578
return (client, clientCredentialsExpiry);

src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ private async Task<bool> DeleteMessageBatchByUrl(string queueUrl, IReadOnlyColle
8888
var deleteResponse = await _clientProvider.Client.DeleteMessageBatchAsync(deleteRequest, CancellationToken);
8989

9090
// ToDo: capture failed messages
91-
return deleteResponse.Failed.Count > 0;
91+
return deleteResponse.Failed != null && deleteResponse.Failed.Count > 0;
9292
}
9393

9494
protected override Task OnStart()
@@ -117,22 +117,24 @@ protected async Task Run()
117117
try
118118
{
119119
var messages = await ReceiveMessagesByUrl(queueUrl).ConfigureAwait(false);
120-
foreach (var message in messages)
120+
if (messages != null)
121121
{
122-
Logger.LogDebug("Received message on Queue: {Queue}, MessageId: {MessageId}, Payload: {MessagePayload}", Path, message.MessageId, message.Body);
123-
124-
GetPayloadAndHeadersFromMessage(message, out var messagePayload, out var messageHeaders);
125-
126-
var r = await MessageProcessor.ProcessMessage(new(message, messagePayload), messageHeaders, cancellationToken: CancellationToken).ConfigureAwait(false);
127-
if (r.Exception != null)
122+
foreach (var message in messages)
128123
{
129-
Logger.LogError(r.Exception, "Message processing error - Queue: {Queue}, MessageId: {MessageId}", Path, message.MessageId);
130-
// ToDo: DLQ handling
131-
break;
124+
Logger.LogDebug("Received message on Queue: {Queue}, MessageId: {MessageId}, Payload: {MessagePayload}", Path, message.MessageId, message.Body);
125+
126+
GetPayloadAndHeadersFromMessage(message, out var messagePayload, out var messageHeaders);
127+
128+
var r = await MessageProcessor.ProcessMessage(new(message, messagePayload), messageHeaders, cancellationToken: CancellationToken).ConfigureAwait(false);
129+
if (r.Exception != null)
130+
{
131+
Logger.LogError(r.Exception, "Message processing error - Queue: {Queue}, MessageId: {MessageId}", Path, message.MessageId);
132+
// ToDo: DLQ handling
133+
break;
134+
}
135+
messagesToDelete.Add(message);
132136
}
133-
messagesToDelete.Add(message);
134137
}
135-
136138
if (messagesToDelete.Count > 0)
137139
{
138140
await DeleteMessageBatchByUrl(queueUrl, messagesToDelete).ConfigureAwait(false);

src/SlimMessageBus.Host.AmazonSQS/Services/SqsTopologyCache.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ public async Task<SqsPathMeta> LookupTopic(string path, CancellationToken cancel
120120
{
121121
// proceed to create the queue
122122
}
123+
catch (ArgumentNullException)
124+
{
125+
// ToDo: See issue in v4 SDK: https://github.com/awsdocs/aws-doc-sdk-examples/issues/7615
126+
}
123127
return null;
124128
}
125-
}
129+
}

src/SlimMessageBus.Host.AmazonSQS/Services/SqsTopologyService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ private async Task EnsureSubscription(string topic,
225225

226226
var subscriptions = await _clientProviderSns.Client.ListSubscriptionsByTopicAsync(topicMeta.Arn, cancellationToken);
227227

228-
var subscription = subscriptions.Subscriptions.FirstOrDefault(x => x.Endpoint == queueMeta.Arn && x.Protocol == "sqs");
228+
var subscription = subscriptions.Subscriptions?.FirstOrDefault(x => x.Endpoint == queueMeta.Arn && x.Protocol == "sqs");
229229
if (subscription != null)
230230
{
231231
return; // it exists

src/SlimMessageBus.Host.AmazonSQS/SlimMessageBus.Host.AmazonSQS.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
</PropertyGroup>
1212

1313
<ItemGroup>
14-
<PackageReference Include="AWSSDK.SecurityToken" Version="3.7.401.82" />
15-
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.7.400.133" />
16-
<PackageReference Include="AWSSDK.SQS" Version="3.7.400.133" />
14+
<PackageReference Include="AWSSDK.SecurityToken" Version="4.0.2.6" />
15+
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="4.0.1.6" />
16+
<PackageReference Include="AWSSDK.SQS" Version="4.0.1.8" />
1717
</ItemGroup>
1818

1919
<ItemGroup>

src/Tests/SlimMessageBus.Host.AmazonSQS.Test/ClientFactory/SqsCacheServiceTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public async Task When_GetMetaWithPreloadOrException_Then_PerformsLookupAndCache
139139

140140
private static GetQueueAttributesResponse CreateQueueAttributesResponse(string queueArn) => new()
141141
{
142-
Attributes = { [SQSConstants.ATTRIBUTE_QUEUE_ARN] = queueArn }
142+
Attributes = new Dictionary<string, string> { [SQSConstants.ATTRIBUTE_QUEUE_ARN] = queueArn }
143143
};
144144

145145
[Fact]

src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusIt.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,20 @@ void AdditionalSqsSetup(SqsMessageBusSettings cfg)
4141
cfg.TopologyProvisioning.CreateQueueOptions = opts =>
4242
{
4343
// Tag the queue with the creation date
44+
opts.Tags ??= [];
4445
opts.Tags.Add(CreatedDateTag, today);
4546
};
4647
cfg.TopologyProvisioning.CreateTopicOptions = opts =>
4748
{
4849
// Tag the queue with the creation date
50+
opts.Tags ??= [];
4951
opts.Tags.Add(new() { Key = CreatedDateTag, Value = today });
5052
};
5153
cfg.TopologyProvisioning.OnProvisionTopology = async (clientSqs, clientSns, provision, cancellationToken) =>
5254
{
5355
// Remove all older test queues (SQS does not support queue auto deletion)
5456
var r = await clientSqs.ListQueuesAsync(QueueNamePrefix, cancellationToken);
55-
foreach (var queueUrl in r.QueueUrls)
57+
foreach (var queueUrl in r.QueueUrls ?? [])
5658
{
5759
try
5860
{
@@ -71,7 +73,7 @@ void AdditionalSqsSetup(SqsMessageBusSettings cfg)
7173

7274
// Remove all older test topics
7375
var topicsResponse = await clientSns.ListTopicsAsync(cancellationToken);
74-
foreach (var topic in topicsResponse.Topics)
76+
foreach (var topic in topicsResponse.Topics ?? [])
7577
{
7678
var tagsResponse = await clientSns.ListTagsForResourceAsync(new() { ResourceArn = topic.TopicArn }, cancellationToken);
7779
var createdDateTagValue = tagsResponse.Tags.FirstOrDefault(x => x.Key == CreatedDateTag)?.Value;

src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsTopologyServiceTest.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public async Task When_EnsureQueue_Then_QueueIsCreated_Given_QueueDoesNotExistAn
150150

151151
var getQueueAttributesResponse = new GetQueueAttributesResponse
152152
{
153-
Attributes = { { QueueAttributeName.QueueArn, "arn:aws:sqs:us-east-1:123456789012:test-queue" } }
153+
Attributes = new Dictionary<string, string> { { QueueAttributeName.QueueArn, "arn:aws:sqs:us-east-1:123456789012:test-queue" } }
154154
};
155155
_sqsClientMock.Setup(x => x.GetQueueAttributesAsync(createQueueResponse.QueueUrl, It.IsAny<List<string>>(), It.IsAny<CancellationToken>()))
156156
.ReturnsAsync(getQueueAttributesResponse);
@@ -367,7 +367,7 @@ public async Task When_DoProvisionTopology_Then_CreatesAllEntities_Given_Produce
367367

368368
var getQueueAttributesResponse = new GetQueueAttributesResponse
369369
{
370-
Attributes = { { QueueAttributeName.QueueArn, "arn:aws:sqs:us-east-1:123456789012:test-queue" } }
370+
Attributes = new Dictionary<string, string> { { QueueAttributeName.QueueArn, "arn:aws:sqs:us-east-1:123456789012:test-queue" } }
371371
};
372372
_sqsClientMock.Setup(x => x.GetQueueAttributesAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>()))
373373
.ReturnsAsync(getQueueAttributesResponse);

0 commit comments

Comments
 (0)