Skip to content

Commit 11c7c48

Browse files
committed
Refactor
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 3eefe52 commit 11c7c48

16 files changed

Lines changed: 119 additions & 124 deletions

File tree

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>2.4.0-rc7</Version>
7+
<Version>2.4.0-rc8</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -104,43 +104,42 @@ protected override async Task OnStart()
104104
}
105105
}
106106

107-
protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
107+
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
108108
{
109109
AssertActive();
110110

111111
var dispatched = new List<T>(envelopes.Count);
112112
try
113113
{
114-
var messages = envelopes
114+
var messagesByPartition = envelopes
115115
.Where(x => x.Message != null)
116-
.Select(
117-
envelope =>
118-
{
119-
var messageType = envelope.Message?.GetType();
120-
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
116+
.Select(envelope =>
117+
{
118+
var messageType = envelope.Message?.GetType();
119+
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
121120

122-
_logger.LogDebug("Producing message {Message} of Type {MessageType} on Path {Path} with Size {MessageSize}", envelope.Message, messageType?.Name, path, messagePayload?.Length ?? 0);
121+
_logger.LogDebug("Producing message {Message} of Type {MessageType} on Path {Path} with Size {MessageSize}", envelope.Message, messageType?.Name, path, messagePayload?.Length ?? 0);
123122

124-
var ev = envelope.Message != null ? new EventData(messagePayload) : new EventData();
123+
var ev = envelope.Message != null ? new EventData(messagePayload) : new EventData();
125124

126-
if (envelope.Headers != null)
125+
if (envelope.Headers != null)
126+
{
127+
foreach (var header in envelope.Headers)
127128
{
128-
foreach (var header in envelope.Headers)
129-
{
130-
ev.Properties.Add(header.Key, header.Value);
131-
}
129+
ev.Properties.Add(header.Key, header.Value);
132130
}
131+
}
133132

134-
var partitionKey = messageType != null
135-
? GetPartitionKey(messageType, envelope.Message)
136-
: null;
133+
var partitionKey = messageType != null
134+
? GetPartitionKey(messageType, envelope.Message)
135+
: null;
137136

138-
return (Envelope: envelope, Message: ev, PartitionKey: partitionKey);
139-
})
140-
.GroupBy(x => x.PartitionKey);
137+
return (Envelope: envelope, Message: ev, PartitionKey: partitionKey);
138+
})
139+
.GroupBy(x => x.PartitionKey);
141140

142141
var producer = _producerByPath[path];
143-
foreach (var partition in messages)
142+
foreach (var partition in messagesByPartition)
144143
{
145144
EventDataBatch batch = null;
146145
try
@@ -184,7 +183,7 @@ protected override async Task OnStart()
184183
batch = null;
185184
}
186185

187-
return (dispatched, null);
186+
return new(dispatched, null);
188187
}
189188
finally
190189
{
@@ -194,10 +193,10 @@ protected override async Task OnStart()
194193
}
195194
catch (Exception ex)
196195
{
197-
return (dispatched, ex);
196+
return new(dispatched, ex);
198197
}
199198

200-
return (dispatched, null);
199+
return new(dispatched, null);
201200
}
202201

203202
#endregion

src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,14 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
116116
}
117117
}
118118

119-
protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
119+
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
120120
{
121-
async Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch, CancellationToken cancellationToken)
122-
{
123-
await Retry.WithDelay(
121+
Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch, CancellationToken cancellationToken) =>
122+
Retry.WithDelay(
124123
async cancellationToken =>
125124
{
126125
await senderClient.SendMessagesAsync(batch, cancellationToken).ConfigureAwait(false);
127-
_logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes);
126+
_logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {Path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes);
128127
},
129128
(exception, attempt) =>
130129
{
@@ -135,18 +134,16 @@ await Retry.WithDelay(
135134
_logger.LogWarning("Service bus throttled. Backing off (Attempt: {Attempt}).", attempt);
136135
return true;
137136
}
138-
139137
return false;
140138
},
141-
TimeSpan.FromSeconds(2),
142-
TimeSpan.FromSeconds(1),
139+
delay: TimeSpan.FromSeconds(2),
140+
jitter: TimeSpan.FromSeconds(1),
143141
cancellationToken);
144-
}
145142

146143
AssertActive();
147144

148-
var messages = envelopes.Select(
149-
envelope =>
145+
var messages = envelopes
146+
.Select(envelope =>
150147
{
151148
var messageType = envelope.Message?.GetType();
152149
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
@@ -190,12 +187,12 @@ await Retry.WithDelay(
190187
await senderClient.SendMessageAsync(item.ServiceBusMessage, cancellationToken: cancellationToken).ConfigureAwait(false);
191188
_logger.LogDebug("Delivered item {Message} of type {MessageType} to {Path}", item.Envelope.Message, item.Envelope.MessageType?.Name, path);
192189

193-
return ([item.Envelope], null);
190+
return new([item.Envelope], null);
194191
}
195192
catch (Exception ex)
196193
{
197194
_logger.LogDebug(ex, "Producing message {Message} of type {MessageType} to path {Path} resulted in error {Error}", item.Envelope.Message, item.Envelope.MessageType?.Name, path, ex.Message);
198-
return ([], ex);
195+
return new([], ex);
199196
}
200197
}
201198

@@ -233,12 +230,12 @@ await Retry.WithDelay(
233230
batch = null;
234231
}
235232

236-
return (dispatched, null);
233+
return new(dispatched, null);
237234
}
238235
catch (Exception ex)
239236
{
240237
_logger.LogError(ex, "Producing message batch to path {Path} resulted in error {Error}", path, ex.Message);
241-
return (dispatched, ex);
238+
return new(dispatched, ex);
242239
}
243240
finally
244241
{

src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ protected override async ValueTask DisposeAsyncCore()
114114
}
115115
}
116116

117-
protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
117+
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
118118
{
119119
AssertActive();
120120

@@ -172,10 +172,10 @@ protected override async ValueTask DisposeAsyncCore()
172172
}
173173
catch (Exception ex)
174174
{
175-
return (dispatched, ex);
175+
return new(dispatched, ex);
176176
}
177177

178-
return (dispatched, null);
178+
return new(dispatched, null);
179179
}
180180

181181
protected byte[] GetMessageKey(ProducerSettings producerSettings, Type messageType, object message, string topic)

src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ private IMessageProcessorQueue CreateMessageProcessorQueue(IMessageProcessor<obj
123123
: new MessageProcessorQueue(messageProcessor, LoggerFactory.CreateLogger<MessageProcessorQueue>(), CancellationToken);
124124
}
125125

126-
protected override Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
127-
=> Task.FromResult<(IReadOnlyCollection<T> Dispatched, Exception Exception)>(([], null)); // Not used
126+
protected override Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
127+
=> Task.FromResult<ProduceToTransportBulkResult<T>>(new([], null)); // Not used
128128

129129
public override Task ProduceResponse(string requestId, object request, IReadOnlyDictionary<string, object> requestHeaders, object response, Exception responseException, IMessageTypeConsumerInvokerSettings consumerInvoker)
130130
=> Task.CompletedTask; // Not used to responses

src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
namespace SlimMessageBus.Host.Mqtt;
22

3-
using System.Security.Cryptography;
4-
53
using MQTTnet.Extensions.ManagedClient;
64

75
public class MqttMessageBus : MessageBusBase<MqttMessageBusSettings>
@@ -103,10 +101,10 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
103101
return Task.CompletedTask;
104102
}
105103

106-
protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
104+
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
107105
{
108-
var messages = envelopes.Select(
109-
envelope =>
106+
var messages = envelopes
107+
.Select(envelope =>
110108
{
111109
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
112110

@@ -156,10 +154,10 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
156154
}
157155
catch (Exception ex)
158156
{
159-
return (dispatched, ex);
157+
return new(dispatched, ex);
160158
}
161159
}
162160

163-
return (dispatched, null);
161+
return new(dispatched, null);
164162
}
165163
}

src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ internal async Task<int> SendMessages(IServiceProvider serviceProvider, IOutboxR
252252
outboxMessage =>
253253
{
254254
var message = bus.Serializer.Deserialize(outboxMessage.MessageType, outboxMessage.MessagePayload);
255-
return new EnvelopeWithId(outboxMessage.Id, message, outboxMessage.MessageType, outboxMessage.Headers ?? new Dictionary<string, object>());
255+
return new OutboxBulkMessage(outboxMessage.Id, message, outboxMessage.MessageType, outboxMessage.Headers ?? new Dictionary<string, object>());
256256
})
257257
.Batch(bulkProducer.MaxMessagesPerTransaction ?? defaultBatchSize);
258258

@@ -271,12 +271,12 @@ internal async Task<int> SendMessages(IServiceProvider serviceProvider, IOutboxR
271271
return count;
272272
}
273273

274-
internal async Task<(bool Success, int Published)> DispatchBatchAsync(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection<EnvelopeWithId> batch, string busName, string path, CancellationToken cancellationToken)
274+
internal async Task<(bool Success, int Published)> DispatchBatchAsync(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection<OutboxBulkMessage> batch, string busName, string path, CancellationToken cancellationToken)
275275
{
276276
_logger.LogDebug("Publishing batch of {MessageCount} messages to pathGroup {Path} on {BusName} bus", batch.Count, path, busName);
277277

278278
// TOOD: Enclose in a transaction
279-
var results = await producer.ProduceToTransport(batch, path, messageBusTarget, cancellationToken).ConfigureAwait(false);
279+
var results = await producer.ProduceToTransportBulk(batch, path, messageBusTarget, cancellationToken).ConfigureAwait(false);
280280
if (cancellationToken.IsCancellationRequested && results.Dispatched.Count == 0)
281281
{
282282
// if cancellation has been requested, only return if no messages were published
@@ -311,14 +311,14 @@ private static IMasterMessageBus GetBus(ICompositeMessageBus compositeMessageBus
311311
return null;
312312
}
313313

314-
public record EnvelopeWithId : Envelope
314+
public record OutboxBulkMessage : BulkMessageEnvelope
315315
{
316-
public EnvelopeWithId(Guid id, object Message, Type MessageType, IDictionary<string, object> Headers)
317-
: base(Message, MessageType, Headers)
316+
public Guid Id { get; }
317+
318+
public OutboxBulkMessage(Guid id, object message, Type messageType, IDictionary<string, object> headers)
319+
: base(message, messageType, headers)
318320
{
319321
Id = id;
320322
}
321-
322-
public Guid Id { get; }
323323
}
324324
}

src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ protected override async ValueTask DisposeAsyncCore()
128128
}
129129
}
130130

131-
protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
131+
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
132132
{
133133
await EnsureInitFinished();
134134

@@ -178,9 +178,9 @@ protected override async ValueTask DisposeAsyncCore()
178178
}
179179
catch (Exception ex)
180180
{
181-
return (dispatched, ex);
181+
return new(dispatched, ex);
182182
}
183183

184-
return (dispatched, null);
184+
return new(dispatched, null);
185185
}
186186
}

src/SlimMessageBus.Host.Redis/RedisMessageBus.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,13 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor<Me
147147

148148
#region Overrides of MessageBusBase
149149

150-
protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
150+
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
151151
{
152+
#if NETSTANDARD2_0
152153
if (envelopes is null) throw new ArgumentNullException(nameof(envelopes));
154+
#else
155+
ArgumentNullException.ThrowIfNull(envelopes);
156+
#endif
153157

154158
AssertActive();
155159

@@ -184,10 +188,10 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor<Me
184188
}
185189
catch (Exception ex)
186190
{
187-
return (dispatched, ex);
191+
return new(dispatched, ex);
188192
}
189193

190-
return (dispatched, null);
194+
return new(dispatched, null);
191195
}
192196

193197
#endregion

src/SlimMessageBus.Host.Sql/GlobalUsings.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
global using System.Data;
2-
3-
global using Microsoft.Data.SqlClient;
1+
global using Microsoft.Data.SqlClient;
2+
global using Microsoft.Extensions.DependencyInjection;
43
global using Microsoft.Extensions.DependencyInjection.Extensions;
54
global using Microsoft.Extensions.Logging;
65

0 commit comments

Comments
 (0)