Running Rebus in a .Net Worker Service

In this article we are going to show how to run a Rebus service bus inside a .Net Worker.

Introduction to Rebus #

Rebus is an open-source, distributed, asynchronous and durable .Net Service Bus. It is both a very powerful way to build applications and microservices and a real pleasure to develop with. It is similar to other .Net Service Buses such as NServiceBus, MassTransit or EasyNetQ.

Rebus has a focus on simplicity and is very easy to embed into any part of your application (with the possible exception of mobile apps and embedded systems). If you are not already familiar with it, here are some links to help you get started:

You might also find my previous article useful, Long-running business processes in C# with Rebus on Azure. There is an equivalent article using F# here.

Introduction to .Net worker services #

.Net Workers are a way to host a background service in .Net. Typically, you would use them for building long-running processes like Windows Services or Linux daemons. You can also run .Net workers in console apps, desktop apps, and ASP.Net background tasks. Here are some links for further reading:

Source code #

The source code for this article can be found on GitHub here:

https://github.com/seankearon/RebusBackgroundService

Creating the solution #

# Create the solution folder
mkdir RebusBackgroundService
cd RebusBackgroundService

# Create the solution
dotnet new solution --name RebusBackgroundService

# Create the worker project and add it to the solution
dotnet new worker --name RebusWorkerService
dotnet sln RebusBackgroundService.sln add ./RebusWorkerService/RebusWorkerService.csproj

At this point you will have a basic .Net Worker. Inside Program.cs you will see that it constructs a host and registers a worker service:

using RebusWorkerService;

var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<Worker>();

var host = builder.Build();
host.Run();

There is a Worker class derived from BackgroundService that is registered into the host and which writes to the log every second.

namespace RebusWorkerService;

public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;

    public Worker(ILogger<Worker> logger)
    {
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            if (_logger.IsEnabled(LogLevel.Information))
            {
                _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
            }
            await Task.Delay(1000, stoppingToken);
        }
    }
}

You can run the project now, and it starts as a console app. As the default logger is the console, you will see output similar to this:

Basic worker logging to the console

So, for our setup we need to include a couple more projects:

  • A simple console app that we can use to test our Rebus service.
  • A shared library to hold our Rebus message types.

To do that, run the following from the solution folder:

# Create the shared message types project and add it to the solution
dotnet new classlib --name Messages
dotnet sln RebusBackgroundService.sln add ./Messages/Messages.csproj

# Create the CLI project and add it to the solution
dotnet new console --name Cli
dotnet sln RebusBackgroundService.sln add ./Cli/Cli.csproj

After that, your solution should look like this:

Initial solution structure

Setting up the Rebus bus #

We're focusing on hosting Rebus in a .Net Worker, so we're going to keep our Rebus implementation nice and simple. We're going to:

  • Process a message that just has some simple text for content.
  • When the message is processed by Rebus, it will just write the message's text to the console.

To make it easy to see the messages as they are processed by Rebus, we are going to:

  • Use the file system as the transport mechanism for our messages.
  • Use an audit queue so that we can easily inspect the processed messages.

(If you want to go deeper into setting up Rebus, try this article.)

So, let's get this set up so we can see Rebus doing something in our Worker Service!

Define the message type #

First, we define our simple message class that will carry the text content.

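Remove Class1.cs from the Messages project and add a new C# file named SimpleText.cs with the following content. Both the RebusWorkerService and Cli projects use this type, so each of them needs a project reference to the Messages project (for example, via dotnet add reference).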

namespace Messages;

public class SimpleText
{
    public string Message { get; set; }
}

Define the Rebus bus #

The plan here is to create a class to hold our Rebus IBus instance and have a single instance of this running inside our BackgroundService.

Add a reference to Rebus' ServiceProvider package #

We will add a reference to the Rebus.ServiceProvider NuGet package to our Worker Service project. To do that, run the following from the folder of the RebusWorkerService project:

dotnet add package Rebus.ServiceProvider

This brings in everything we need from Rebus for this simple example. For production instances you will likely use other components, such as Rebus' Azure Service Bus transport.

Create a Rebus bus in our Worker Service #

We are going to create and run an instance of Rebus IBus directly inside our Worker service. This follows the approach suggested by Mogens here.

To do that, we first create an IServiceCollection instance in our Worker service:

private readonly IServiceCollection _services = new ServiceCollection();

Then, we use the helpers from Rebus.ServiceProvider to configure Rebus, which we will do directly inside the Worker service's constructor:

public Worker()
{
    _services.AddLogging(logging => logging.AddConsole());

    _services.AddRebus(
        rebus => rebus
           .Logging  (l => l.Console())
           .Transport(t => t.UseFileSystem("c:/rebus", "MainQueue"))
           .Options  (t => t.RetryStrategy(errorQueueName: "ErrorQueue"))
           .Options  (t => t.EnableMessageAuditing(auditQueue: "AuditQueue"))
    );

    _services.AutoRegisterHandlersFromAssemblyOf<Worker>();
}

To keep things simple, we hard-code the file system location we want Rebus' FileSystemTransport to use when processing messages. We route failed messages to an error queue, and we also add message auditing just so that we can see all the messages we process when running this small example. (You would probably not have auditing turned on in production.)
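The hard-coded c:/rebus path only suits Windows, so one option is to resolve the base directory at start-up instead. The following is a minimal sketch and not part of the original sample: the REBUS_BASE_DIR environment variable name and the ResolveBaseDirectory helper are hypothetical, and falling back to a folder under the system temp directory is an assumption.

```csharp
using System;
using System.IO;

// Hypothetical helper: picks the folder that would be passed to UseFileSystem(...).
static string ResolveBaseDirectory()
{
    // REBUS_BASE_DIR is an assumed variable name; Rebus does not read it itself.
    var configured = Environment.GetEnvironmentVariable("REBUS_BASE_DIR");

    // Fall back to a "rebus" folder under the platform's temp directory.
    return string.IsNullOrWhiteSpace(configured)
        ? Path.Combine(Path.GetTempPath(), "rebus")
        : configured;
}

var baseDirectory = ResolveBaseDirectory();
Console.WriteLine($"Rebus queues would live under: {baseDirectory}");
```

Whatever value is used, the worker and any client sending to it would need to agree on the same folder.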

The last thing we need to do is to create and start a Rebus IBus instance like so:

await using var provider = _services.BuildServiceProvider();
provider.StartRebus();

Putting that all together makes our Worker class look like this:

using Rebus.Auditing.Messages;
using Rebus.Config;
using Rebus.Retry.Simple;
using Rebus.Transport.FileSystem;

namespace RebusWorkerService;

public class Worker : BackgroundService
{
    private readonly IServiceCollection _services = new ServiceCollection();

    public Worker()
    {
        _services.AddLogging(logging => logging.AddConsole());

        _services.AddRebus(
            rebus => rebus
               .Logging  (l => l.Console())
               .Transport(t => t.UseFileSystem("c:/rebus", "MainQueue"))
               .Options  (t => t.RetryStrategy(errorQueueName: "ErrorQueue"))
               .Options  (t => t.EnableMessageAuditing(auditQueue: "AuditQueue"))
        );

        _services.AutoRegisterHandlersFromAssemblyOf<Worker>();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await using var provider = _services.BuildServiceProvider();
        provider.StartRebus();

        while (!stoppingToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
        }
    }
}
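The loop in ExecuteAsync only exists to keep the method (and therefore the service provider and the bus) alive until the host shuts down. An alternative, sketched here outside the Worker so that it runs on its own, is to wait on the stopping token directly. WaitUntilStoppedAsync is a hypothetical helper, and the 200 ms timeout merely simulates the host signalling shutdown.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the waiting part of ExecuteAsync.
static async Task WaitUntilStoppedAsync(CancellationToken stoppingToken)
{
    try
    {
        // Waits indefinitely; completes only when the token is cancelled.
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }
    catch (OperationCanceledException)
    {
        // Expected on shutdown: swallow it so disposal can proceed normally.
    }
}

// Simulate the host requesting shutdown after 200 ms.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
await WaitUntilStoppedAsync(cts.Token);
Console.WriteLine("Stopped cleanly");
```

Inside the Worker, the same await would follow provider.StartRebus(), so the await using declaration disposes the provider once the token is cancelled.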

Handle the messages #

To give Rebus something to do, let's add a simple handler for our SimpleText messages. The handler will just create a short delay, then log the message contents to the console.

Add a file named SimpleTextHandler.cs to the RebusWorkerService project with the following contents:

using Messages;
using Rebus.Handlers;

namespace RebusWorkerService;

public class SimpleTextHandler: IHandleMessages<SimpleText>
{
    public async Task Handle(SimpleText m)
    {
        await Task.Delay(10000); // Pretend we're doing some work!
        Console.WriteLine(m.Message);
    }
}

Remember that we already told Rebus to find and register all handlers from this project using the following line:

public Worker()
{
    ...snip...
    _services.AutoRegisterHandlersFromAssemblyOf<Worker>();
}

So, we now have a working bus - job done!

Build the CLI to send our messages #

Now, we just need a way to send messages to Rebus. Let's update the Cli project we added earlier to build a quick and dirty console app to send our messages.

Let's add the Rebus.ServiceProvider NuGet package to the project by running the following from the folder of the Cli project:

dotnet add package Rebus.ServiceProvider

Add the following code to the Program.cs file:

using Messages;
using Microsoft.Extensions.DependencyInjection;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Retry.Simple;
using Rebus.Routing.TypeBased;
using Rebus.Transport.FileSystem;

var services = new ServiceCollection();

services.AddRebus(
    rebus => rebus
       .Logging  (l => l.Console())
       .Routing(r => r.TypeBased().Map<SimpleText>("MainQueue"))
       .Transport(t => t.UseFileSystemAsOneWayClient("c:/rebus"))
       .Options  (t => t.RetryStrategy(errorQueueName: "ErrorQueue"))
);

// Dispose the provider (and with it the bus) when the program exits.
await using var provider = services.BuildServiceProvider();
provider.StartRebus();
var bus = provider.GetRequiredService<IBus>();

Console.WriteLine("Enter the message text (type 'Q' to quit):");

while (true)
{
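    var input = Console.ReadLine();

    // Stop on 'Q', or when the input stream is closed (ReadLine returns null).
    if (input is null || input.ToUpper() == "Q")
    {
        break;
    }

    // Await the send so that any failure surfaces here instead of being lost.
    await bus.Send(new SimpleText{Message = input});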
}

Console.WriteLine("Goodbye!");

Send messages to Rebus #

Now we can run the Cli project, type a message, and press Enter to send the message to our Rebus message queue. As we're using the FileSystemTransport, this saves a file into the MainQueue folder, which is named after our queue.

Message in a folder

The following line in our configuration was responsible for the routing:

.Routing(r => r.TypeBased().Map<SimpleText>("MainQueue"))

If you look into the message, you'll see the metadata that Rebus has wrapped around the message sent by our code.

Message content

Our message content has been encoded into Body:

Message body deserialised
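To inspect a Body value by hand, it can be decoded in a few lines. This is a minimal sketch that assumes the Body is Base64-encoded UTF-8 JSON; the payload below is a hypothetical stand-in built in code rather than a value copied from a real message file, and the real JSON may carry extra fields.

```csharp
using System;
using System.Text;

// Hypothetical payload standing in for the JSON that Rebus would serialise.
var json = "{\"Message\":\"This is the message we want to send\"}";

// Simulate what ends up in the Body field (assumed: Base64 of UTF-8 bytes).
var body = Convert.ToBase64String(Encoding.UTF8.GetBytes(json));

// Decoding reverses the two steps: Base64 to bytes, then bytes to text.
var decoded = Encoding.UTF8.GetString(Convert.FromBase64String(body));
Console.WriteLine(decoded);
```

Replacing the body variable with the value copied from a message file should print the serialised message, provided the assumption about the encoding holds.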

Process messages in our Worker Service #

Start the RebusWorkerService project and the message will be picked up and processed by our background worker, resulting in the message text being written to the console output.

I had entered "This is the message we want to send" when I created the message in the CLI, which we can see below:

Message output in the console

As expected, Rebus has removed the message it processed, and we can see the audit queue contains what we have processed.

Message removed after processing

Message in the audit queue