Sets and inter-process communication in Flower

Inter-process communication is an essential part of Flower. Processes often need to pass data to processes that are already running, wait for an event to occur in another process, or supply a process with a dataset that is too large to pass in one piece. Sets serve all of these purposes.

A set in Flower has a corresponding directory entry that denotes the root of the set (the root subset) and stores its metadata. Any set can be divided into subsets. A set contains messages, which are data-contract-serialized CLR objects of the set's message type. Messages can be iterated in various orders, so a set can be used as a stack as well as a queue. A process can be blocked while reading from an empty set, or while writing to a size-limited set that is full. This allows sets to be used as pipe-like, blocking producer-consumer collections.

There are two types of sets: shared and local. Shared sets must be created in the directory under '/Sets/Shared' by an administrator. These sets are not explicitly dedicated to any process. Local sets are defined in workflow classes, and each process has its own instance of them.

Strictly speaking, a set is just a definition, while messages are contained in subsets. Subsets are the child folders of sets and of other subsets. A set itself, however, is also a subset: the root. Suppose you have a set '/Sets/Shared/MySet'. You can add messages to it directly, but you can also specify any nested path as the message destination, and that path will be a subset. If you add messages to, say, the path '/Sets/Shared/MySet/Level1/Level2/Level3', the folders Level1, Level2 and Level3 are created automatically when the first message arrives and deleted when the last message is removed. So 'Level1/Level2/Level3' is a subset of '/Sets/Shared/MySet'. Capacity limits apply to subsets, not to the whole set. If the capacity is N, each subset may hold no more than N messages, but the number of subsets within a set (and their nesting depth) is unlimited.
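
The subset rules above can be illustrated with a small in-memory model. This is only a sketch written in the same JavaScript-like syntax as the administration scripts; it does not call Flower, and the SetModel class and its put and remove methods are hypothetical names introduced for illustration. Where a real process would block on a full subset, the model simply returns false.

```javascript
// Toy in-memory model of a set. NOT Flower's API: every name is hypothetical.
class SetModel {
  constructor(root, capacity = Infinity) {
    this.root = root;          // e.g. '/Sets/Shared/MySet'
    this.capacity = capacity;  // limit per subset, not per set
    this.subsets = new Map();  // subset path -> array of messages
  }

  // Returns true if the message was stored, false if the subset is full
  // (a real process would block at this point instead).
  put(subsetPath, message) {
    if (subsetPath !== this.root && !subsetPath.startsWith(this.root + '/')) {
      throw new Error('Path is outside the set: ' + subsetPath);
    }
    const messages = this.subsets.get(subsetPath) ?? [];
    if (messages.length >= this.capacity) return false;
    messages.push(message);
    this.subsets.set(subsetPath, messages); // subset appears with its first message
    return true;
  }

  // Removes the oldest message. A nested subset disappears together with
  // its last message; the root subset is kept.
  remove(subsetPath) {
    const messages = this.subsets.get(subsetPath);
    if (!messages) return undefined;
    const message = messages.shift();
    if (messages.length === 0 && subsetPath !== this.root) {
      this.subsets.delete(subsetPath);
    }
    return message;
  }
}

const set = new SetModel('/Sets/Shared/MySet', 2);
const deep = '/Sets/Shared/MySet/Level1/Level2/Level3';
const results = [
  set.put(deep, 'a'),
  set.put(deep, 'b'),
  set.put(deep, 'c'),                 // rejected: this subset is full
  set.put('/Sets/Shared/MySet', 'd'), // accepted: the root has its own limit
];
```

With a capacity of 2, the third put is rejected because the nested subset is full, while the root subset still accepts messages because the limit is counted per subset.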

Creating sets

To create a shared set, use the createOrUpdateSet utility function.

createOrUpdateSet({
    at: '/Sets/Shared/MySet', // Path of the set.
                              // (Nested folders under '/Sets/Shared'
                              // can act as namespaces.)
    messageType: 'MyTypeName, MyAssembly', // Assembly-qualified name
                                           // of the message type.
    capacity: 10 // Optional. Maximum number of messages
                 // in any subset of the set.
});


To define local sets, use Flower.Workflow.ISetsBuilder passed to the DefineLocalSets method of a workflow class.

public void DefineLocalSets(ISetsBuilder bld)
{
    bld
        .Set("MySet", typeof(MessageType), 10)
        .Set("Namespace/MyNestedSet", typeof(MessageType));
}

Each time a process of this workflow starts, these sets are created under '/Sets/Local/<pid>'. For example, if the pid is #123456, the sets from the example above become '/Sets/Local/#123456/MySet' and '/Sets/Local/#123456/Namespace/MyNestedSet'. Thus, knowing the pid of a process, other processes and applications can put messages into its local sets.
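
As a minimal sketch of the path rule just described, the following builds the directory path of a local set from a pid and a set name. The localSetPath helper is hypothetical and is not part of Flower; it only concatenates the parts in the way the example paths suggest.

```javascript
// Hypothetical helper, not part of Flower: composes the directory path
// of a local set from the owning process id and the set name.
function localSetPath(pid, setName) {
  return '/Sets/Local/' + pid + '/' + setName;
}

const paths = ['MySet', 'Namespace/MyNestedSet']
  .map(name => localSetPath('#123456', name));
```

A sender that knows the pid can compute such a path and use it as the destination when putting a message.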

Using sets from applications

Messages can be put, obtained and removed by applications via methods of Flower.Client.IFlowerClient.

To put a single message or multiple messages, use PutMessage or PutMessages respectively. When putting a message, you can specify its name. This name becomes the name of the corresponding directory entry. If the message name is not specified, the id of the message becomes its name.

To obtain a single message by its path or id, use the GetMessage method. In most cases, however, you will want to iterate over all or part of the messages of a subset. The GetMessages method loads the messages of a subset sorted by name, creation timestamp or last update timestamp, or unsorted.
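
The choice of sort order is what lets the same subset behave as a queue or as a stack. The sketch below shows the idea on plain objects; it does not call GetMessages, and the sortedBy function and the name and created fields are hypothetical stand-ins for a message name and a creation timestamp.

```javascript
// Plain objects stand in for messages; the field names are hypothetical.
const messages = [
  { name: 'c', created: 2 },
  { name: 'b', created: 1 },
  { name: 'a', created: 3 },
];

// Returns a sorted copy of items; direction is 'asc' or 'desc'.
function sortedBy(items, property, direction) {
  const sign = direction === 'desc' ? -1 : 1;
  return [...items].sort((x, y) => {
    if (x[property] < y[property]) return -sign;
    if (x[property] > y[property]) return sign;
    return 0;
  });
}

// Oldest first: the subset is consumed like a queue.
const queueOrder = sortedBy(messages, 'created', 'asc').map(m => m.name);
// Newest first: the subset is consumed like a stack.
const stackOrder = sortedBy(messages, 'created', 'desc').map(m => m.name);
// Alphabetical by message name.
const byName = sortedBy(messages, 'name', 'asc').map(m => m.name);
```

Ascending creation time yields first-in, first-out order, and descending creation time yields last-in, first-out order.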

The RemoveMessage method removes a message by its path or id.

Using sets from processes

In a workflow, you can put a message into a set by using the Put statement.

...
.Put
(
    _ => "/Sets/Shared/MySubset", // Path to a subset.
    _ => "MessageName",           // Name of the message (may be null).
    _ => new MessageType { ... }, // The message itself.
    _ => MessageFormat.BinXml,    // The format of the message.
    null
)
...

If the target subset has a limited capacity and is full, the statement blocks the process until a message is removed from the subset and space becomes available.

The ForEach statement iterates over the messages in a subset.

...
.ForEach<MessageType>
(
    _ => "/Sets/Shared/MySubset", // Path to a subset.
    _ => MessageProperty.Default, // Sort order: Default, Name, UpdateTimestamp
                                  // or CreationTimestamp.
                                  // The default sort order is the order close
                                  // to that in which messages were put
                                  // into the set. It should be used when
                                  // the order is not important.
    _ => SortOrder.Asc,           // The sort direction.
    _ => 100,                     // How many items to load per round
                                  // trip. Use a larger value for small
                                  // messages and a smaller one for
                                  // large messages.
    _ => false,                   // Whether to wait for messages if the
                                  // subset is empty.
    (messageName, message) =>     // The action invoked for each iterated
    {                             // message.
        ...                       // In most cases, this lambda copies
        ...                       // the message to a process variable.
        ...
    }
)
    ...
.End()
...

The ForEach statement opens a block that is executed for each message. The Break and Continue statements work in ForEach as they do in other loops. In addition, you can update or remove the current message in the directory by using the UpdateCurrent and RemoveCurrent statements.

ForEach can work in blocking or non-blocking mode. In non-blocking mode, ForEach ends when the subset is empty. In blocking mode, ForEach waits for messages to arrive, so it runs indefinitely unless you eventually invoke Break.

Using sets as queues

Flower provides statements for using sets as queues. The Enqueue statement works like Put, but does not let you specify a message name. The Dequeue statement waits for a message, loads it and removes it from the set.

...
.Dequeue<MessageType>
(
    _ => "/Sets/Shared/MySubset", // Path to a subset.
    message =>                    // The action invoked for the message.
    {
        ...
    }
)
...

Last edited Jun 28, 2013 at 7:52 AM by dbratus, version 4