netmq
netmq copied to clipboard
Executing disk I/O operations breaks dealer-socket
Environment
NetMQ Version: 4.0.1.6
Operating System: Win11 21H2 22000.434
.NET Version: .NET6
Expected behaviour
While a disk I/O operation is executed, I still expect the dealer-socket to be able to send its message to the router-socket.
Actual behaviour
While a disk I/O operation is executed, the dealer-socket cannot send a message to the router-socket. At first I thought it was a threading issue and converted all the code to synchronous code, but to no avail.
Steps to reproduce the behaviour
I will abbreviate code samples for readability
Broker-code (router-socket)
/// <summary>
/// Creates the broker: stores the storage provider and options, takes the global logger and
/// builds a set of <see cref="QueueDefinition"/> objects from the provider's configured queues.
/// </summary>
/// <param name="storageProvider">Provider whose <c>ConfiguredQueues</c> are turned into queue definitions.</param>
/// <param name="options">Broker options; the same instance is passed to every queue definition.</param>
public MessageBroker(IStorageProvider storageProvider, MessageBrokerOptions options) {
    _options = options;
    _storageProvider = storageProvider;
    _logger = Log.Logger;

    // Reported symptom: with this statement present the dealer-sockets stop sending;
    // without it everything runs as expected.
    var definitions = storageProvider.ConfiguredQueues.Select(queue => new QueueDefinition(queue, options));
    _queueDefinitions = new HashSet<QueueDefinition>(definitions);
}
/// <summary>
/// Creates the router-socket on first use, binds or connects it to the configured endpoint and
/// starts a poller whose handler enqueues every received message into the queue named by the
/// sending dealer's identity. Messages for an unknown queue are logged and dropped.
/// When the socket already exists only the final log line is written.
/// </summary>
/// <param name="cancellationToken">Accepted for API compatibility; not observed by this method.</param>
public void Connect(CancellationToken cancellationToken = default) {
    if (_routerSocket == null) {
        var endpoint = $"tcp://{_options.Host}:{_options.Port}";

        // NOTE(review): the socket binds when CheckIfPortInUse returns true. Confirm that the
        // helper's return value matches its name; otherwise these branches are inverted.
        var shouldBind = CheckIfPortInUse(_options.Port);
        _routerSocket = new RouterSocket();
        if (shouldBind) {
            _routerSocket.Options.RouterMandatory = true;
            _routerSocket.Bind(endpoint);
        } else {
            _routerSocket.Connect(endpoint);
        }

        var poller = new NetMQPoller { _routerSocket };
        _logger.Information($"{nameof(Connect)} begin Thread {Environment.CurrentManagedThreadId}");
        _routerSocket.ReceiveReady += (sender, eventArgs) => {
            var frames = eventArgs.Socket.ReceiveMultipartMessage();

            // A router-socket prepends the identity of the sending peer as the first frame.
            // The dealer sets its identity to the UTF-8 bytes of the queue name, so the queue
            // name is read from that frame. Socket.Options.Identity would be the router's own
            // identity, not the sender's.
            var queueName = frames.First.ConvertToString(Encoding.UTF8);
            var queueMessage = frames.Last.ConvertToString();
            _logger.Information($"Received for queue {queueName} message => {queueMessage}");

            var queueDefinition = _queueDefinitions.SingleOrDefault(x => x.Queue.QueueName.Equals(queueName));
            if (queueDefinition == null) {
                // This handler runs on the poller's thread; an exception thrown here cannot be
                // caught by the caller of Connect, so the problem is logged and the message dropped.
                _logger.Error(string.Format(Messages.QueueDefinition_Publish_QueueNotFound, queueName));
                return;
            }

            queueDefinition.Queue.Enqueue(JsonConvert.DeserializeObject<Message>(queueMessage));
        };

        // RunAsync starts the poller without blocking the calling thread.
        poller.RunAsync();
        _logger.Information($"{nameof(Connect)} after poller Thread {Environment.CurrentManagedThreadId}");
    }
    _logger.Information($"{nameof(Connect)} end Thread {Environment.CurrentManagedThreadId}");
}
Queue code (dealer-socket)
/// <summary>
/// Stores the queue and the broker options and creates the dealer-socket, using the UTF-8
/// bytes of the queue name as the socket identity.
/// </summary>
/// <param name="queue">Queue served by this definition; its name becomes the socket identity.</param>
/// <param name="messageBrokerOptions">Broker options kept for later use by this definition.</param>
public QueueDefinition(Queue queue, MessageBrokerOptions messageBrokerOptions) {
    _messageBrokerOptions = messageBrokerOptions;
    _queue = queue;

    _dealerSocket = new DealerSocket();
    var identity = Encoding.UTF8.GetBytes(queue.QueueName);
    _dealerSocket.Options.Identity = identity;
}
/// <summary>Gets the queue this definition was created for.</summary>
public Queue Queue => _queue;
/// <summary>
/// Starts the queue via <c>RunAsync</c> and then connects the dealer-socket to the broker's TCP endpoint.
/// </summary>
public void RunQueue() {
    _queue.RunAsync();

    var brokerEndpoint = $"tcp://{_messageBrokerOptions.Host}:{_messageBrokerOptions.Port}";
    _dealerSocket.Connect(brokerEndpoint);
}
/// <summary>
/// Wraps <paramref name="body"/> in a new message for this queue and publishes it.
/// </summary>
/// <typeparam name="T">Type of the payload.</typeparam>
/// <param name="body">Payload handed to the message factory.</param>
/// <param name="cancellationToken">Forwarded unchanged to the message-based overload.</param>
/// <returns>The result of the message-based <c>Publish</c> overload.</returns>
public bool Publish<T>(T body, CancellationToken cancellationToken = default) {
    // The local stays explicitly typed as Message so the non-generic overload is selected below.
    Message envelope = MessageFactory.CreateNewMessage(
        body,
        _queue.QueueName,
        _messageBrokerOptions.MessageExpiry,
        new JsonSerializerSettings());

    return Publish(envelope, cancellationToken);
}
/// <summary>
/// Serializes <paramref name="tMessage"/> to JSON and tries to send it through the dealer-socket,
/// retrying according to the configured retry count and delay while the send reports failure.
/// </summary>
/// <param name="tMessage">Message to serialize and send.</param>
/// <param name="cancellationToken">Accepted for API compatibility; not observed by this method.</param>
/// <returns><c>true</c> when a send attempt succeeded; <c>false</c> when every attempt failed.</returns>
internal bool Publish(Message tMessage, CancellationToken cancellationToken = default) {
    var retryPolicy = Policy
        .HandleResult(false)
        .WaitAndRetry(
            _messageBrokerOptions.RetryCount,
            retryAttempt => _messageBrokerOptions.RetryDelay,
            (outcome, timeSpan, retryCount, context) => {
                // Invoked before each retry, so the last retry has not been attempted yet
                // when retryCount equals the configured maximum. Only warn here.
                _logger.Warning($"{nameof(Publish)} failed for queue {Queue.QueueName}. Waiting {timeSpan} before next retry. Retry attempt {retryCount}");
            });

    var sent = retryPolicy.Execute(() => {
        try {
            _logger.Information($"{nameof(Publish)} start Thread {Environment.CurrentManagedThreadId}");
            var message = new NetMQMessage(3);
            byte[] messagePayload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(tMessage));
            message.Append(messagePayload);
            _logger.Information($"Sending NetMQ Message {message}");
            return _dealerSocket.TrySendMultipartMessage(message);
        } catch (Exception ex) {
            // Any exception counts as a failed attempt so the retry policy handles it.
            _logger.Error(ex, $"{nameof(Publish)} failed with {ex.Message}");
            return false;
        }
    });

    // Report the final failure only once all attempts are exhausted.
    if (!sent) {
        _logger.Error($"{nameof(Publish)} failed for queue {Queue.QueueName}.");
    }
    return sent;
}
StorageProvider
public class JsonStorageProvider : IStorageProvider
{
// Logger supplied through the constructor.
private readonly ILogger _logger;

// Base folder and full path of the queue configuration file.
private readonly string _storagePath;
private readonly string _queueStoragePath;

// Settings used when deserializing the queue configuration; null values are ignored.
private readonly JsonSerializerSettings _jsonSerializerSettings = new JsonSerializerSettings {
    NullValueHandling = NullValueHandling.Ignore
};

// Backing store for ConfiguredQueues; starts out empty.
private List<Queue> _configuredQueues = new List<Queue>();

/// <summary>Gets the list of configured queues; it can only be replaced from inside this class.</summary>
public List<Queue> ConfiguredQueues {
    get { return _configuredQueues; }
    private set { _configuredQueues = value; }
}
/// <summary>
/// Resolves the queue configuration path under the local application data folder, stores the
/// logger and loads the configured queues.
/// </summary>
/// <param name="logger">Logger kept by this provider.</param>
public JsonStorageProvider(ILogger logger) {
    _storagePath = Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData);
    // Separate path segments let Path.Combine insert the platform's directory separator
    // instead of a hard-coded backslash.
    _queueStoragePath = Path.Combine(_storagePath, "db", "queue.tmq");
    _logger = logger;
    InitStorage();
}
/// <summary>Loads the queue configuration and stores it as the configured queues.</summary>
private void InitStorage() => ConfiguredQueues = ReadQueueConfiguration();
/// <summary>
/// Reads the queue configuration file and deserializes it into a list of queues.
/// </summary>
/// <returns>The deserialized queues, or an empty list when deserialization yields <c>null</c>.</returns>
public List<Queue> ReadQueueConfiguration() {
    var json = ReadJsonFile(_queueStoragePath);
    var parsed = JsonConvert.DeserializeObject<List<Queue>>(json, _jsonSerializerSettings);
    if (parsed == null) {
        return new List<Queue>();
    }
    return parsed;
}
/// <summary>
/// Reads the whole file at <paramref name="filePath"/> and returns its text.
/// </summary>
/// <param name="filePath">Path of the file to read.</param>
/// <returns>The complete content of the file.</returns>
public string ReadJsonFile(string filePath) {
    // File.ReadAllText opens, reads and closes the file in one call, replacing the manual
    // StreamReader whose explicit Close() was redundant inside the using block.
    return File.ReadAllText(filePath);
}