Queue services, application callbacks and process-to-process communication

Although sets provide a simple, common mechanism of communication between Flower components, in large-scale scenarios they may become a bottleneck. Sets use the same storage as the directory, so the intensity of communication between processors and applications affects overall Flower performance. Queue services avoid this problem and make Flower more scalable.

The contract of a queue service is defined by the Flower.Services.IQueueService interface. Like any queue, it provides two main operations: Enqueue and Dequeue. However, Flower queues differ from traditional queues in that they synchronize their state with the Flower directory. The Dequeue operation optionally accepts a process pid. If the queue is empty, the queue service enlists the provided pid as a message waiter and switches the process to the waiting state (everything happens within a transaction scope, including the saving of the process state, so the queue, the directory and the process state are kept in sync). When a message arrives, the queue service switches one waiter (if any) to the pending state. This behavior makes it possible to use the Enqueue/Dequeue workflow statements over queue services.

A single instance of a queue service is logically not a single queue. An instance sorts messages by recipients identified by arbitrary strings unique within that instance.
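
The waiter mechanics and the per-recipient sorting described above can be modeled with a small in-memory sketch. This is not Flower's implementation and it does not use the IQueueService interface: the class ToyQueueService, its method signatures and the Woken list are hypothetical names invented for illustration. The transactional synchronization with the directory is left out, and the sketch assumes that a woken process simply retries its Dequeue.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy model of a queue service; not part of Flower.
public sealed class ToyQueueService
{
    // One logical queue of messages per recipient string.
    private readonly Dictionary<string, Queue<string>> _messages =
        new Dictionary<string, Queue<string>>();

    // Pids waiting for a message, per recipient string.
    private readonly Dictionary<string, Queue<string>> _waiters =
        new Dictionary<string, Queue<string>>();

    // Pids that the model would switch from the waiting to the pending state.
    public readonly List<string> Woken = new List<string>();

    public void Enqueue(string recipient, string message)
    {
        GetOrAdd(_messages, recipient).Enqueue(message);

        // A message arrived: wake at most one waiter of this recipient.
        Queue<string> waiters;
        if (_waiters.TryGetValue(recipient, out waiters) && waiters.Count > 0)
        {
            Woken.Add(waiters.Dequeue());
        }
    }

    // Returns true and the message if one is available. Otherwise registers
    // the pid (if provided) as a waiter and returns false.
    public bool TryDequeue(string recipient, string pid, out string message)
    {
        Queue<string> queue;
        if (_messages.TryGetValue(recipient, out queue) && queue.Count > 0)
        {
            message = queue.Dequeue();
            return true;
        }

        if (pid != null)
        {
            GetOrAdd(_waiters, recipient).Enqueue(pid);
        }

        message = null;
        return false;
    }

    private static Queue<string> GetOrAdd(
        Dictionary<string, Queue<string>> map, string key)
    {
        Queue<string> queue;
        if (!map.TryGetValue(key, out queue))
        {
            queue = new Queue<string>();
            map[key] = queue;
        }
        return queue;
    }
}
```

In this model, a TryDequeue on an empty recipient with a pid records the waiter, the next Enqueue for the same recipient moves that pid to Woken, and messages for other recipients never wake it.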

Communication via general queues

Queue services may be deployed on an arbitrary host and registered in Flower like any other service. The Flower client API provides a remote WCF proxy for a queue service: Flower.Client.QueueServiceClient.

To use a queue service from workflows, define its client in any service container. Then you can access the queue service using Enqueue/Dequeue as you do with sets.

Suppose you have a service container '/Services/MyContainer' and there is a queue service 'MyQueue' defined in it. Also, suppose in both sender and recipient processes you have a variable 'recipient' containing the recipient id. Then your Enqueue/Dequeue workflow statements will look like this:

.Enqueue(
    _ => "/Services/MyContainer/MyQueue/" + recipient,
    _ => msg,
    _ => MessageFormat.BinXml
)

.Dequeue<MessageType>(
    _ => "/Services/MyContainer/MyQueue/" + recipient,
    (_, msg) => { ... }
)

Default queues

A processor may have a default queue. The default queue is a queue service specified in the processor container configuration (where the processor's callback client is defined). Any queue service may be used as a default queue; however, installing a default queue locally on each processor host gives the maximum scalability that can be achieved with Flower. In this case, process-to-process communication under high load works without interaction with the directory.

To enable a default queue, set up its endpoint in the processor's configuration under the id 'queue':

<?xml version="1.0" encoding="utf-8" ?>
<objects xmlns="http://www.springframework.net">
    <object id="processor" type="Flower.Directory.Host.Default.ProcessorClient, Flower.Directory.Host.Default">
        ...
    </object>
    
    <object id="queue" type="Flower.Client.QueueClient, Flower.Client">
        <constructor-arg name="binding">
            <object type="System.ServiceModel.NetTcpBinding">
                <!-- Binding properties. -->
            </object>
        </constructor-arg>
        <constructor-arg name="remoteAddress">
            <object type="System.ServiceModel.EndpointAddress">
                <constructor-arg>net.tcp://host-name:10002/Flower/QueueService</constructor-arg>
            </object>
        </constructor-arg>
    </object>
</objects>


The default queue endpoint must always be set up as remote, even if the service is installed locally on the processor host, because this endpoint is used by all clients, both local and remote to the processor.

If all your processors have default queues, you can enqueue messages specifying only the destination process pid and dequeue without specifying the source.

.Enqueue(
    //This is the variable storing the pid of the recipient process.
    _ => recipientPid, 
    _ => msg,
    _ => MessageFormat.BinXml
)

//Waiting for a message in the default queue.
.Dequeue<MessageType>(
    (_, msg) => { ... }
)

In this case, Flower infers the processor from the pid, obtains its default queue endpoint and sends the message to the recipient via the queue service. The recipient waits for the message on its default queue.

Recipients within the queue service are identified by the pid and the full name of the message type, so if a process waits for a message of a different type than the one that was sent, the process simply will not receive the message. However, a process can wait for messages of different types simultaneously using the WaitAny construct.
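
To make the type-matching rule concrete, here is a minimal sketch of how a recipient key could be composed from a pid and a message type. The RecipientKey class and the "pid/TypeName" format are assumptions made for illustration only; the text above does not specify how Flower actually combines the two parts.

```csharp
using System;

// Hypothetical helper; the key format is an assumption, not Flower's.
public static class RecipientKey
{
    // Combines the pid with the full (namespace-qualified) name of T.
    public static string For<T>(string pid)
    {
        return pid + "/" + typeof(T).FullName;
    }
}
```

With such a key, a process waiting on RecipientKey.For<int>("p1") and a sender addressing RecipientKey.For<string>("p1") use different recipients, which is why a message of the wrong type is never delivered to the waiting process.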

Whether to use sets or queue services?

Sets and queues seem to duplicate each other's functionality, but there are significant differences between them. Sets are easy to set up (just a single line of JavaScript, or no setup at all for local sets). Queue services, however, are more scalable. If you are going to host a single application with few queues in a single Flower instance and you have only one vertically scalable database server, you don't need queue services. Use queue services if you need to scale out. Queue services are also a convenient way to communicate across multiple Flower instances: processes of one Flower instance can enqueue messages into the queues of another instance, which is impossible with sets.

Application callbacks

The Flower client can open a queue service endpoint that the client application can use to receive callbacks from processes.

To receive callbacks, set the EnableCallbacks property of the client settings to true.

var settings = new FlowerClientSettings
{
    EnableCallbacks = true
};

using (var flowerClient = new FlowerClient(directory, settings))
{
    ...
}

Then call the AddRecipient method of the Flower client to add a message handler.

flowerClient.AddRecipient<MessageType>(
    "RecipientId",
    msg => { ... } //Message handler.
);

A service endpoint of the type Flower.Client.CallbackQueue must be defined in the application's configuration file. A client connected to this endpoint must be registered in the directory as a service. Then you can enqueue to (but not dequeue from) the callback queue as you would with any other queue service.

If you don't need a recipient anymore, you can remove it by calling RemoveRecipient.

flowerClient.RemoveRecipient("RecipientId");

Two communication patterns are possible:
  • An application may register some permanent well-known recipients, which are hard-coded in the destination paths of the Enqueue statements.
  • An application may generate unique recipient ids per request and send these ids to the processes. The processes concatenate the ids with the path of the callback service. The application registers the recipient before starting the process (or sending a message) and unregisters it when the response message is received or the timeout expires (if a message arrives after the recipient has been removed, it is ignored).
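
The per-request pattern can be sketched as follows. To keep the example self-contained, it does not reference the Flower client directly: ICallbackRecipients is a hypothetical interface that mirrors the AddRecipient and RemoveRecipient calls shown earlier (the Action<T> handler type is an assumption), PerRequestCallback is an illustrative helper, and InMemoryRecipients is a stand-in used only to exercise it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical abstraction mirroring AddRecipient/RemoveRecipient.
public interface ICallbackRecipients
{
    void AddRecipient<T>(string recipientId, Action<T> handler);
    void RemoveRecipient(string recipientId);
}

public static class PerRequestCallback
{
    // Registers a unique recipient, passes its id to 'send' (which would
    // start the process or send it a message), and waits for one reply.
    // Returns false if the timeout expires first.
    public static bool TryRequest<T>(
        ICallbackRecipients client, Action<string> send,
        TimeSpan timeout, out T reply)
    {
        string recipientId = Guid.NewGuid().ToString("N");
        var pending = new TaskCompletionSource<T>();

        // Register before sending so that the reply cannot be missed.
        client.AddRecipient<T>(recipientId, msg => pending.TrySetResult(msg));
        try
        {
            send(recipientId);
            if (pending.Task.Wait(timeout))
            {
                reply = pending.Task.Result;
                return true;
            }
            reply = default(T);
            return false;
        }
        finally
        {
            // Unregister on reply and on timeout alike.
            client.RemoveRecipient(recipientId);
        }
    }
}

// In-memory stand-in for the callback endpoint; for illustration only.
public sealed class InMemoryRecipients : ICallbackRecipients
{
    private readonly Dictionary<string, Action<object>> _handlers =
        new Dictionary<string, Action<object>>();

    public void AddRecipient<T>(string recipientId, Action<T> handler)
    {
        lock (_handlers) _handlers[recipientId] = msg => handler((T)msg);
    }

    public void RemoveRecipient(string recipientId)
    {
        lock (_handlers) _handlers.Remove(recipientId);
    }

    // Returns false when the recipient is unknown; the message is dropped.
    public bool Deliver(string recipientId, object message)
    {
        Action<object> handler;
        lock (_handlers)
        {
            if (!_handlers.TryGetValue(recipientId, out handler)) return false;
        }
        handler(message);
        return true;
    }
}
```

Registering the recipient before calling send and removing it in a finally block follows the order described in the second pattern; a message delivered after removal is dropped by the stand-in, matching the "ignored" behavior.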

If the transaction flow is enabled for the callback queue endpoint, the message handler is executed within the transaction initiated by the processor.

Last edited Oct 26, 2013 at 11:10 AM by dbratus, version 1
