Message Sequence

In many scenarios, it is considered good practice to have an event-driven architecture where message streams of subsequent publish and subscribe moves the business transactions forward. However, there are scenarios where this is not an option. One example is handling web requestes, where the caller synchronously waits for a response.

Alternative to RPC

Consider a user login scenario that is handled with UserLoginRequest and UserLoginResponse.

// normal rpc response
client.RespondAsync<UserLoginRequest, UserLoginResponse>(async (request, context) =>
{
    var result = await Authenticate();
    return new UserLoginResponse {Token = result};
});

// normal rpc request
var respons = await client.RequestAsync<UserLoginRequest, UserLoginResponse>();

There are a few drawbacks of using this pattern. The way RPC is implemented with a private response queue, alternativly a direct-rpc queue, makes the calls private between the requester and responder. This is where the MessageSequence extension can be useful.

// normal subscribe
client.SubscribeAsync<UserLoginRequest>(async (msg, context) =>
{
    var result = await Authenticate();
    await client.PublishAsync(new UserLoginResponse { Token = result}, context.GlobalMessageId);
});

// equivalent message sequence
var sequence = _client.ExecuteSequence(c => c
    .PublishAsync<UserLoginRequest>()
    .Complete<UserLoginResponse>()
);

The return object is a MessageSequence<TComplete> where <TComplete> is the generic type of the .Complete<TComplete> call. The sequence has a Task<TComplete> that completes as the UserLoginResponse is published. The major difference is that the message sequence rely on the message context’s GlobalRequestId to match the response to the request, rather than having a private response queue or correlation id. The recieving end of the UserLoginRequest looks like this

One of the benifits is that the message sequence “response” is actually a publish that is published on the exchange according to the registered INamingConvention. That means that any other subscribers of the LoginResponse can act upon the message.

Multi-message sequence

The MessageSequence extension provides methods to act upon multiple events.

var chain = _client.ExecuteSequence(c => c
    .PublishAsync<UserLoginAttempted>()
    .When<UserGeograficPosition>((msg, ctx) => ActOnGeograficPosition(msg.Position))
    .When<UserContactDetail>((msg, ctx) => ActOnContactDetails(msg.Details))
    .Complete<UserLoggoedIn>()
);

Optional messages in chain

The When call has an optional parameter that can be used to mark a step in the sequence as optional, meaning that if a message that corresponds to a step later in the sequence is recieved, it skips that step.

var chain = _client.ExecuteSequence(c => c
    .PublishAsync<UserLoginAttempted>()
    .When<UserPasswordIsWeak>(
        (msg, ctx) => PromptChangePassword(),
        (cfg) => cfg.IsOptional())
    .Complete<UserLoggoedIn>()
);

Abort sequence premature

The optional parameter for the When also have a method to indicate that if the messagee is recieved, it aborts the execution of the sequence. All handlers that are marked as aborting execution is by default optional.

var chain = _client.ExecuteSequence(c => c
    .PublishAsync<UserLoginAttempted>()
    .When<UserLoginFailed>(
        (msg, ctx) => PromptChangePassword(),
        (cfg) => cfg.AbortsExecution())
    .Complete<UserLoggoedIn>()
);