I've been using a library called Rebus to handle all message communication between services that use Azure Service Bus as their transport layer. Rebus makes it super easy to implement a Command/Event architecture by exposing a simple pattern for sending messages on a bus and listening for messages on a bus.
Each service gets its own input and error queues, like a mailbox. Then you just deliver the messages to the right mailbox, and the service will handle them.
Rebus supports a variety of transport mechanisms for the messages. We rely heavily on Azure Service Bus, but Rebus makes the transport mechanism an implementation detail that you don't have to worry about in your daily development.
This post is just a short write-up of an issue that I've encountered a few times. Here goes:
Rebus processes messages via handler methods. For instance, this will handle the message CreateInfrastructureCommand:
public async Task Handle(CreateInfrastructureCommand msg) {
    await _infrastructureService.CreateWebsite();
    await _infrastructureService.CreateDatabase();
    await _infrastructureService.CreateHostnames();
    await _infrastructureService.ConfigureCdn();
}
The above is something one of our handlers could be doing. Then we decided that we wanted some feedback between each step, so that, for instance, a SignalR service or the like could listen and show some progress. The code would then look like this:
public async Task Handle(CreateInfrastructureCommand msg) {
    await _infrastructureService.CreateWebsite();
    await _bus.Publish(new ReportProgressEvent("WebsiteCreated"));
    await _infrastructureService.CreateDatabase();
    await _bus.Publish(new ReportProgressEvent("DatabaseCreated"));
    await _infrastructureService.CreateHostnames();
    await _bus.Publish(new ReportProgressEvent("HostnamesCreated"));
    await _infrastructureService.ConfigureCdn();
    await _bus.Publish(new ReportProgressEvent("CdnConfigured"));
}
Nice, right? Well, here comes the problem. Rebus handles everything that happens within a Handle method in the same scope, kind of like a transaction: it won't put new messages (the published ones) on the bus until it knows the current handler has finished without errors.
This makes good sense, because otherwise messages might interfere with each other and create some really weird concurrency issues and other errors.
The result of that behavior is that the ReportProgressEvents aren't sent when their line of code executes; they are sent once the Handle method is done. This means that all four _bus.Publish calls hit the bus at essentially the same time, which ruins the idea of using the messages to follow progress.
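To make the behavior concrete, here is a small self-contained model of it. It does not use Rebus at all: FakeTransaction, FakeBus and Demo are hypothetical names invented for this sketch, and it only mimics the idea that published messages are collected while a handler runs and released once it completes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a transaction: collects outgoing messages until Commit().
public class FakeTransaction
{
    private readonly List<string> _pending = new List<string>();

    public void Enlist(string message) => _pending.Add(message);

    // Moves every collected message onto the "wire" in one go.
    public void Commit(List<string> wire)
    {
        wire.AddRange(_pending);
        _pending.Clear();
    }
}

// Hypothetical stand-in for a bus; this is not the Rebus IBus.
public class FakeBus
{
    public List<string> Wire { get; } = new List<string>();  // messages actually sent
    public FakeTransaction Current { get; set; }             // null means "no transaction"

    public void Publish(string message)
    {
        if (Current != null) Current.Enlist(message);  // deferred until commit
        else Wire.Add(message);                        // sent right away
    }
}

public static class Demo
{
    // Returns the number of sent messages after the first publish and after commit.
    public static (int afterFirstStep, int afterCommit) Run()
    {
        var bus = new FakeBus { Current = new FakeTransaction() };

        bus.Publish("WebsiteCreated");
        int afterFirstStep = bus.Wire.Count;  // nothing has been sent yet

        bus.Publish("DatabaseCreated");
        bus.Current.Commit(bus.Wire);         // the "handler" finished without errors

        return (afterFirstStep, bus.Wire.Count);
    }

    public static void Main()
    {
        var (afterFirstStep, afterCommit) = Run();
        Console.WriteLine($"{afterFirstStep} {afterCommit}");  // prints "0 2"
    }
}
```

In this model, setting Current to null before publishing makes the message go out immediately, which is the effect we are after for progress events.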
There weren't any clear suggestions for a fix on the Rebus wiki, but looking through StackOverflow and Rebus's issues, a solution could be found.
We have to break out of the current transaction if we want the messages to be sent immediately. The following is taken directly from Rebus's issue tracker (and the author's comments) at https://github.com/rebus-org/Rebus/issues/779, so I can't take any credit:
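using System;
using Rebus.Transport;

public class SuppressRebusTransactionContext : IDisposable
{
    // Field initializers run before the constructor body, so this captures the handler's context.
    readonly ITransactionContext transactionContext = AmbientTransactionContext.Current;
    // Clear the ambient context so sends inside the scope are not enlisted in it.
    public SuppressRebusTransactionContext() => AmbientTransactionContext.SetCurrent(null);
    // Restore the handler's context when the scope ends.
    public void Dispose() => AmbientTransactionContext.SetCurrent(transactionContext);
}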
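The save, clear, restore pattern in that class can be tried in isolation. The sketch below is a model only: Ambient, SuppressAmbient and ScopeDemo are hypothetical names, and the ambient value is assumed to live in an AsyncLocal. How Rebus stores its ambient context internally is not covered here.

```csharp
using System;
using System.Threading;

// Hypothetical ambient holder; AsyncLocal lets the value follow the async flow.
public static class Ambient
{
    private static readonly AsyncLocal<string> _current = new AsyncLocal<string>();

    public static string Current
    {
        get => _current.Value;
        set => _current.Value = value;
    }
}

// Same shape as SuppressRebusTransactionContext: remember, clear, restore on Dispose.
public sealed class SuppressAmbient : IDisposable
{
    // The field initializer runs before the constructor body, so it sees the old value.
    private readonly string _saved = Ambient.Current;

    public SuppressAmbient() => Ambient.Current = null;

    public void Dispose() => Ambient.Current = _saved;
}

public static class ScopeDemo
{
    // Returns what the ambient value looked like inside and after the scope.
    public static (string inside, string after) Run()
    {
        Ambient.Current = "handler-transaction";

        string inside;
        using (new SuppressAmbient())
        {
            inside = Ambient.Current ?? "none";  // cleared while the scope is open
        }

        return (inside, Ambient.Current);        // restored once the scope is disposed
    }

    public static void Main()
    {
        var (inside, after) = Run();
        Console.WriteLine($"{inside} / {after}");  // prints "none / handler-transaction"
    }
}
```

The class from the issue applies the same idea to AmbientTransactionContext, so anything sent inside the scope does not see the handler's transaction.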
SuppressRebusTransactionContext can then be used by wrapping the _bus.Publish call in a using statement. For the initial code example, it ends up like this:
public async Task Handle(CreateInfrastructureCommand msg) {
    await _infrastructureService.CreateWebsite();
    await SendProgress("WebsiteCreated");
    await _infrastructureService.CreateDatabase();
    await SendProgress("DatabaseCreated");
    await _infrastructureService.CreateHostnames();
    await SendProgress("HostnamesCreated");
    await _infrastructureService.ConfigureCdn();
    await SendProgress("CdnConfigured");
}
private async Task SendProgress(string message) {
    // this being the trick
    using (new SuppressRebusTransactionContext()) {
        await _bus.Publish(new ReportProgressEvent(message));
    }
}
With the above in place, the messages will be sent when we expect them to be sent, and we can set up subscribers that can follow the progress.
A note on the implementation: use with care! There is a reason for the design and how Rebus handles the messages. Messages sent this way are no longer held back if the handler fails afterwards, so a retry may publish them again. And if the messages are processed by the same service you are in, this would be a bad idea. In our case we are publishing the event to a topic, which a few notification services will pick up, for instance SignalR, which in turn notifies a UI.