Skip to content

Commit 710903c

Browse files
authored
Merge pull request #5 from mehyaa/rabbitmq-upgrade
RabbitMQ.Client package upgraded
2 parents 1a9adf4 + b46e9f6 commit 710903c

7 files changed

Lines changed: 112 additions & 86 deletions

File tree

src/Convey.MessageBrokers.RabbitMQ/src/Convey.MessageBrokers.RabbitMQ/Clients/RabbitMqClient.cs

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44
using System;
55
using System.Collections.Concurrent;
66
using System.Collections.Generic;
7+
using System.Threading;
8+
using System.Threading.Tasks;
79

810
namespace Convey.MessageBrokers.RabbitMQ.Clients;
911

1012
internal sealed class RabbitMqClient : IRabbitMqClient
1113
{
1214
private const string EmptyContext = "{}";
1315

14-
private readonly object _lockObject = new();
16+
private readonly SemaphoreSlim _semaphore = new(1, 1);
1517

1618
private readonly AppOptions _appOptions;
1719
private readonly IConnection _connection;
@@ -24,7 +26,7 @@ internal sealed class RabbitMqClient : IRabbitMqClient
2426
private readonly bool _persistMessages;
2527
private readonly int _maxChannels;
2628

27-
private readonly ConcurrentDictionary<int, IModel> _channels = new();
29+
private readonly ConcurrentDictionary<int, IChannel> _channels = new();
2830

2931
private int _channelsCount;
3032

@@ -48,21 +50,24 @@ public RabbitMqClient(
4850
_maxChannels = options.MaxProducerChannels <= 0 ? 1000 : options.MaxProducerChannels;
4951
}
5052

51-
public void Send(
53+
public async Task SendAsync(
5254
object message,
5355
IConvention convention,
5456
string messageId = null,
5557
string correlationId = null,
5658
string spanContext = null,
5759
object messageContext = null,
58-
IDictionary<string, object> headers = null)
60+
IDictionary<string, object> headers = null,
61+
CancellationToken cancellationToken = default)
5962
{
6063
var threadId = Environment.CurrentManagedThreadId;
6164

6265
if (!_channels.TryGetValue(threadId, out var channel))
6366
{
64-
lock (_lockObject)
67+
try
6568
{
69+
await _semaphore.WaitAsync(cancellationToken);
70+
6671
if (_channelsCount >= _maxChannels)
6772
{
6873
throw new InvalidOperationException(
@@ -71,7 +76,7 @@ public void Send(
7176
"Modify `MaxProducerChannels` setting to allow more channels.");
7277
}
7378

74-
channel = _connection.CreateModel();
79+
channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
7580
_channels.TryAdd(threadId, channel);
7681
_channelsCount++;
7782

@@ -84,6 +89,10 @@ public void Send(
8489
_maxChannels);
8590
}
8691
}
92+
finally
93+
{
94+
_semaphore.Release();
95+
}
8796
}
8897
else
8998
{
@@ -97,17 +106,18 @@ public void Send(
97106
}
98107
}
99108

100-
var properties = channel.CreateBasicProperties();
101-
102-
properties.AppId = _appOptions.Service;
103-
properties.ContentEncoding = _serializer.ContentEncoding;
104-
properties.ContentType = _serializer.ContentType;
105-
properties.Persistent = _persistMessages;
106-
properties.MessageId = string.IsNullOrWhiteSpace(messageId) ? Guid.NewGuid().ToString("N") : messageId;
107-
properties.CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? Guid.NewGuid().ToString("N") : correlationId;
108-
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
109-
properties.Type = convention.Type?.Name;
110-
properties.Headers = new Dictionary<string, object>();
109+
var properties = new BasicProperties
110+
{
111+
AppId = _appOptions.Service,
112+
ContentEncoding = _serializer.ContentEncoding,
113+
ContentType = _serializer.ContentType,
114+
Persistent = _persistMessages,
115+
MessageId = string.IsNullOrWhiteSpace(messageId) ? Guid.NewGuid().ToString("N") : messageId,
116+
CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? Guid.NewGuid().ToString("N") : correlationId,
117+
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
118+
Type = convention.Type?.Name,
119+
Headers = new Dictionary<string, object>()
120+
};
111121

112122
if (_contextEnabled)
113123
{
@@ -144,11 +154,22 @@ public void Send(
144154

145155
var body = _serializer.Serialize(message);
146156

147-
channel.BasicPublish(convention.Exchange, convention.RoutingKey, properties, body.ToArray());
157+
await channel.BasicPublishAsync(
158+
exchange: convention.Exchange,
159+
routingKey: convention.RoutingKey,
160+
mandatory: false,
161+
basicProperties: properties,
162+
body: body.ToArray(),
163+
cancellationToken: cancellationToken);
148164
}
149165

150166
private void IncludeMessageContext(object context, IBasicProperties properties)
151167
{
168+
if (properties?.Headers is null)
169+
{
170+
return;
171+
}
172+
152173
if (context is not null)
153174
{
154175
properties.Headers.Add(_contextProvider.HeaderName, _serializer.Serialize(context).ToArray());

src/Convey.MessageBrokers.RabbitMQ/src/Convey.MessageBrokers.RabbitMQ/Convey.MessageBrokers.RabbitMQ.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.1" />
1414
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.1" />
1515
<PackageReference Include="Polly" Version="8.5.2" />
16-
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
16+
<PackageReference Include="RabbitMQ.Client" Version="7.1.2" />
1717
</ItemGroup>
1818

1919
<ItemGroup>

src/Convey.MessageBrokers.RabbitMQ/src/Convey.MessageBrokers.RabbitMQ/Extensions.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ public static IConveyBuilder AddRabbitMq(
100100
SocketWriteTimeout = options.SocketWriteTimeout,
101101
RequestedChannelMax = options.RequestedChannelMax,
102102
RequestedFrameMax = options.RequestedFrameMax,
103-
DispatchConsumersAsync = true,
104103
ContinuationTimeout = options.ContinuationTimeout,
105104
HandshakeContinuationTimeout = options.HandshakeContinuationTimeout,
106105
NetworkRecoveryInterval = options.NetworkRecoveryInterval,
@@ -113,8 +112,8 @@ public static IConveyBuilder AddRabbitMq(
113112
connectionFactoryConfigurator?.Invoke(connectionFactory);
114113

115114
logger.LogDebug("Connecting to RabbitMQ: '{Hostnames}'...", string.Join(", ", options.HostNames));
116-
var consumerConnection = connectionFactory.CreateConnection(options.HostNames.ToList(), $"{options.ConnectionName}_consumer");
117-
var producerConnection = connectionFactory.CreateConnection(options.HostNames.ToList(), $"{options.ConnectionName}_producer");
115+
var consumerConnection = connectionFactory.CreateConnectionAsync(options.HostNames.ToList(), $"{options.ConnectionName}_consumer").GetAwaiter().GetResult();
116+
var producerConnection = connectionFactory.CreateConnectionAsync(options.HostNames.ToList(), $"{options.ConnectionName}_producer").GetAwaiter().GetResult();
118117
logger.LogDebug("Connected to RabbitMQ: '{Hostnames}'", string.Join(", ", options.HostNames));
119118

120119
builder.Services.AddSingleton(new ConsumerConnection(consumerConnection));
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
using System.Collections.Generic;
2+
using System.Threading;
3+
using System.Threading.Tasks;
24

35
namespace Convey.MessageBrokers.RabbitMQ;
46

57
public interface IRabbitMqClient
68
{
7-
void Send(
9+
Task SendAsync(
810
object message,
911
IConvention convention,
1012
string messageId = null,
1113
string correlationId = null,
1214
string spanContext = null,
1315
object messageContext = null,
14-
IDictionary<string, object> headers = null);
16+
IDictionary<string, object> headers = null,
17+
CancellationToken cancellationToken = default);
1518
}

src/Convey.MessageBrokers.RabbitMQ/src/Convey.MessageBrokers.RabbitMQ/Initializers/RabbitMqExchangeInitializer.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,37 +31,39 @@ public RabbitMqExchangeInitializer(
3131
_loggerEnabled = _options.Logger?.Enabled == true;
3232
}
3333

34-
public Task InitializeAsync(CancellationToken cancellationToken)
34+
public async Task InitializeAsync(CancellationToken cancellationToken)
3535
{
3636
var exchanges =
3737
AppDomain.CurrentDomain
3838
.GetAssemblies()
3939
.SelectMany(a => a.GetTypes())
4040
.Where(t => t.IsDefined(typeof(MessageAttribute), false))
41-
.Select(t => t.GetCustomAttribute<MessageAttribute>().Exchange)
41+
.Select(t => t.GetCustomAttribute<MessageAttribute>()?.Exchange)
4242
.Distinct()
4343
.ToArray();
4444

45-
using var channel = _connection.CreateModel();
45+
await using var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
4646

4747
if (_options.Exchange?.Declare == true)
4848
{
4949
Log(_options.Exchange.Name, _options.Exchange.Type);
5050

51-
channel.ExchangeDeclare(
51+
await channel.ExchangeDeclareAsync(
5252
_options.Exchange.Name,
5353
_options.Exchange.Type,
5454
_options.Exchange.Durable,
55-
_options.Exchange.AutoDelete);
55+
_options.Exchange.AutoDelete,
56+
cancellationToken: cancellationToken);
5657

5758
if (_options.DeadLetter?.Enabled is true &&
5859
_options.DeadLetter?.Declare is true)
5960
{
60-
channel.ExchangeDeclare(
61+
await channel.ExchangeDeclareAsync(
6162
$"{_options.DeadLetter.Prefix}{_options.Exchange.Name}{_options.DeadLetter.Suffix}",
6263
ExchangeType.Direct,
6364
_options.Exchange.Durable,
64-
_options.Exchange.AutoDelete);
65+
_options.Exchange.AutoDelete,
66+
cancellationToken: cancellationToken);
6567
}
6668
}
6769

@@ -74,12 +76,10 @@ public Task InitializeAsync(CancellationToken cancellationToken)
7476

7577
Log(exchange, DefaultType);
7678

77-
channel.ExchangeDeclare(exchange, DefaultType, true);
79+
await channel.ExchangeDeclareAsync(exchange, DefaultType, true, cancellationToken: cancellationToken);
7880
}
7981

80-
channel.Close();
81-
82-
return Task.CompletedTask;
82+
await channel.CloseAsync(cancellationToken: cancellationToken);
8383
}
8484

8585
private void Log(string exchange, string type)

0 commit comments

Comments
 (0)