SmellyQueue (Durable Queue)

This project came out of a quick spike I did for a personal project around durable binary queuing that is firewall-friendly and xcopy-deployable. Although I built it for my own purposes, it can easily be reused, so you are welcome to this early version of the code if you're interested.

Implementing this is pretty easy... Here is a quick walk-through:

To make use of one or more queues you will need to perform the following:

  • Build the main libraries
  • Configure Client and Server Queues  (sample provided in solution)
  • Create message structure to be queued  (sample provided in solution)
  • Implement Server and Enqueue Message  (sample provided in solution)
  • Implement Client and Listen for messages  (sample provided in solution)

Build the main libraries

1.  Download the source code (SmellyQueues.zip (325.73 kb)) and compile the solution

2.  In the consuming application, add references to "Smelser.Storage.dll", "Smelser.Queues.dll", and "Smelser.Container.dll"

Configure Client and Server Queues

3.  In the consuming application config file, register the configuration sections for the queue, storage manager, log4net, and the IoC Container  (sample provided in solution)

<configSections>
  <section name="smelser.queues"
           type="Smelser.Queues.Configuration.QueuesConfigSection, Smelser.Queues" />
  <section name="smelser.container"
           type="Smelser.Container.Configuration.ContainerConfigSection, Smelser.Container" />
  <section name="smelser.storage"
           type="Smelser.Storage.Configuration.StorageConfigSection, Smelser.Storage" />
  <section name="log4net"
           type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
</configSections>
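For orientation, here is a rough sketch of how the pieces from this walk-through might sit together in a single app.config. The one hard rule comes from .NET itself: configSections has to be the first child of configuration. The placeholder comments and the order of the remaining elements are only illustrative; fill them in from the steps that follow.

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <!-- .NET requires configSections to be the first child of configuration -->
  <configSections>
    <!-- the four section registrations from step 3 go here -->
  </configSections>

  <!-- Element names must match the name attributes registered above -->
  <smelser.container>
    <!-- component registrations (step 4) -->
  </smelser.container>
  <smelser.queues>
    <!-- queue managers and queues (step 5) -->
  </smelser.queues>
  <!-- Only needed when a durable queue is used (step 6) -->
  <smelser.storage maxStoreSizeInBytes="3000000000" />
  <log4net>
    <!-- appenders and loggers; not covered in this walk-through -->
  </log4net>
</configuration>
```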

4.  Register your custom concrete implementations of queue and storage components or use the defaults (sample provided in solution)

<smelser.container>
  <components resolver="Smelser.Container.DependencyResolver, Smelser.Container">
    <!-- Queue components -->
    <component contract="Smelser.Queues.IQueue, Smelser.Queues"
               implementation="Smelser.Queues.Queue, Smelser.Queues" />
    <component contract="Smelser.Queues.Transports.Receiver.IMessageReceiver, Smelser.Queues"
               implementation="Smelser.Queues.Transports.Receiver.MessageReceiver, Smelser.Queues" />
    <component contract="Smelser.Queues.Transports.Sender.IMessageSender, Smelser.Queues"
               implementation="Smelser.Queues.Transports.Sender.MessageSender, Smelser.Queues" />
    <!-- Storage components -->
    <component contract="Smelser.Storage.IStore, Smelser.Storage"
               implementation="Smelser.Storage.Store, Smelser.Storage" />
    <component contract="Smelser.Storage.IStoredItem, Smelser.Storage"
               implementation="Smelser.Storage.StoredItem, Smelser.Storage" />
    <component contract="Smelser.Storage.IIndex, Smelser.Storage"
               implementation="Smelser.Storage.Index, Smelser.Storage" />
    <component contract="Smelser.Storage.IItemIndex, Smelser.Storage"
               implementation="Smelser.Storage.ItemIndex, Smelser.Storage" />
  </components>
</smelser.container>
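If you do swap in your own implementation, the registration presumably keeps the contract and changes only the implementation attribute. In this sketch, MyApp.Storage.CustomStore and the MyApp.Storage assembly are hypothetical names that are not part of the library; the type would need to implement Smelser.Storage.IStore.

```xml
<!-- Would replace the default Store registration inside the components element -->
<!-- MyApp.Storage.CustomStore is a hypothetical type that implements IStore -->
<component contract="Smelser.Storage.IStore, Smelser.Storage"
           implementation="MyApp.Storage.CustomStore, MyApp.Storage" />
```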

5.  Configure the queue parameters. The configuration needs to be the same for both the sender and receiver with the exception of the sender attribute (sample provided in solution)

<smelser.queues>
  <!-- Defines the default provider to use if unspecified -->
  <queuemanagers defaultProvider="manager">
    <providers>
      <clear />
      <add name="manager"
           type="Smelser.Queues.QueueProvider, Smelser.Queues"
           defaultqueuename="Foo" />
    </providers>
  </queuemanagers>
  <!-- Defines all configured queues
    - routable (reserved)
    - includequeuedtime (reserved)
    - includesenttime (reserved)
  -->

  <queues routable="false" includequeuedtime="false" includesenttime="false">
    <!-- A queue instance
      - name (unique name of the queue)
      - durable (slower, provides guaranteed delivery of messages)
      - sender (indicates whether the endpoint is a sender or a receiver)
      - maxsize (max size of the queue backlog in bytes before it is stopped)
    -->

    <queue name="Foo" durable="false" sender="false" maxsize="10000000" retryintervalms="10"
           maxretryattempts="5" retryfailureaction="FailQueue">
      <!-- The IP details of the queue -->
      <endpoint address="127.0.0.1" port="10107" />
    </queue>
    <queue name="Bar" durable="false" sender="false" maxsize="10000000" retryintervalms="10"
           maxretryattempts="5" retryfailureaction="FailQueue">
      <endpoint address="127.0.0.1" port="10108" />
    </queue>
    <queue name="Async" durable="false" sender="false" maxsize="10000000" retryintervalms="10"
           maxretryattempts="5" retryfailureaction="FailQueue">
      <endpoint address="127.0.0.1" port="10109" />
    </queue>
  </queues>
</smelser.queues>
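Since the sender's configuration differs only in the sender attribute, the sending application's entry for the Foo queue might look like the sketch below. It assumes that sender="true" is the value that marks an endpoint as the sender; the sample configuration in the solution is the authority here.

```xml
<!-- Sender-side counterpart of the "Foo" queue above.
     Assumption: sender="true" marks this endpoint as the sender. -->
<queue name="Foo" durable="false" sender="true" maxsize="10000000" retryintervalms="10"
       maxretryattempts="5" retryfailureaction="FailQueue">
  <!-- Same address and port as the receiver's entry -->
  <endpoint address="127.0.0.1" port="10107" />
</queue>
```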

6.  If you want to use the durable store (slower), then you will need to set the max store size

<smelser.storage maxStoreSizeInBytes="3000000000" />
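Putting step 6 together with the durable attribute from step 5, a durable setup might look like the sketch below. It assumes that durable="true" is what switches a queue over to the store, which the comments in the queue configuration suggest but do not state outright. The store size is the sample value, roughly 3 GB.

```xml
<!-- Inside the queues element of smelser.queues.
     Assumption: durable="true" opts this queue into the durable store. -->
<queue name="Foo" durable="true" sender="false" maxsize="10000000" retryintervalms="10"
       maxretryattempts="5" retryfailureaction="FailQueue">
  <endpoint address="127.0.0.1" port="10107" />
</queue>

<!-- Top-level section that caps the size of the durable store -->
<smelser.storage maxStoreSizeInBytes="3000000000" />
```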

7.  Import the namespaces into the consuming class file  (sample provided in solution)

using Smelser.Queues;
using Smelser.Queues.Configuration;
using Smelser.Queues.Messages;

Create message structure to be queued

8.  Create a message to transport. This can be done by implementing the Smelser.Queues.Messages.IMessage interface. The message type must be understood by both the sender and the receiver, so create your messages in a common library and deploy it with both parties  (sample provided in solution)

using Smelser.Queues.Messages;

[Serializable]
public sealed class AMessage : IMessage
{
    public bool Foo { get; set; }
    public int Id { get; set; }
}

/* Alternatively, wrap your own type in the generic Message<T> wrapper */
/* so you don't have to create a message class of your own. The wrapper */
/* is also sent using an overload of the extension method that names a queue */
IMessage messageWrapper = new Message<int> { Value = theValue };

Implement Server and Enqueue Message

9.  On the sender side, create a message and send it using the Send extension method, specifying the target queue name. Sending a message automatically starts the queue on the sender side. Several overloads are available  (sample provided in solution)

/* Send an implementation of the IMessage interface */
IMessage aMessage = new AMessage { Foo = true, Id = theValue };
aMessage.Send("TheQueueName");

Implement Client and Listen for messages

10.  On the receiver side, start the listener for a queue, specifying a delegate to process received messages. The queue names can be read from the configuration section  (sample provided in solution)

private static readonly QueuesConfigSection _configuration = QueuesConfigSection.Current;

foreach (IQueueProvider provider in Queues.Providers)
{
    foreach (QueueConfigElement queueConfig in _configuration.Queues)
    {
        provider.ListenForMessages(queueConfig.Name, MessageArrival);
    }
}

11.  In the delegate, match a message to a type, cast, and use as needed  (sample provided in solution)

private static void MessageArrival(IQueue queue, IMessage message)
{
    if (message is AMessage)
    {
        var msg = (AMessage)message;
        Console.WriteLine(string.Format("Received message on queue {0} with the Id \t {1}", queue.Name, msg.Id));
    }
}
 

That's it... Check out the sample applications for more options.