Changeset 4114

Show
Ignore:
Timestamp:
10/09/07 14:40:12 (1 year ago)
Author:
sylvain.hellegouarch
Message:

Cleaned up the queue server service and made it more resilient to faults

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/nuxleus/Scripts/llup-pub.sh

    r4107 r4114  
    4242  echo "Starting the llup publisher server on port $PORT against queue server at $QUEUE_SERVER and monitoring $2" 
    4343  # add -v to the line below to enable logging to the console 
    44    llup-queue-publisher.py -s $QUEUE_SERVER -q $2 -i $PID_FILE -f 5.0 & 
     44   llup-queue-publisher.py -s $QUEUE_SERVER -q $2 -i $PID_FILE -f 50.0 & 
    4545;; 
    4646 
  • trunk/nuxleus/src/Nuxleus.Messaging/MessageQueueService.cs

    r4112 r4114  
    1919namespace Nuxleus.Messaging { 
    2020  public delegate void MessageEventHandler(ISocketConnection sender, IMessage m); 
    21   public delegate void QueueEventHandler(object sender); 
    22   public delegate void QueueFailureEventHandler(object sender, Exception ex); 
     21  public delegate void QueueEventHandler(ISocketConnection sender); 
     22  public delegate void QueueFailureEventHandler(ISocketConnection sender, Exception ex); 
    2323 
    2424  public class MessageQueueService : BaseSocketService { 
     
    4848      connection.BeginReceive(); 
    4949      if(Connected != null) { 
    50         Connected(this); 
     50        Connected(this.Connection); 
    5151      } 
    5252      ConnectEvent.Set(); 
     
    5555    public override void OnSent(MessageEventArgs e) { 
    5656      if(Sent != null) { 
    57         Sent(this); 
     57        Sent(e.Connection); 
    5858      } 
    5959      SentEvent.Set(); 
     
    7373    public override void OnDisconnected(ConnectionEventArgs e) { 
    7474      if(Disconnected != null) { 
    75         Disconnected(this); 
     75        Disconnected(this.Connection); 
    7676      } 
    7777      DisconnectEvent.Set(); 
     
    8080    public override void OnException(ExceptionEventArgs e) { 
    8181      if(Failure != null) { 
    82         Failure(this, e.Exception); 
     82        Failure(this.Connection, e.Exception); 
    8383      } 
    8484      ExceptionEvent.Set(); 
  • trunk/nuxleus/src/Nuxleus.Messaging/QS/BuckerServerService.cs

    r4112 r4114  
    1  
     1// 
     2// BuckerServerService.cs: Bucker queue service implementation 
     3// See: http://trac.defuze.org/wiki/bucker 
     4// 
     5// Author: 
     6//   Sylvain Hellegouarch (sh@3rdandurban.com) 
     7// 
     8// Copyright (C) 2007, Sylvain Hellegouarch 
     9//  
    210using System; 
    311using System.Collections; 
     12using System.Collections.Generic; 
    413using Memcached.ClientLibrary; 
    514using Nuxleus.Bucker; 
     
    1019  public class BuckerServerService { 
    1120    private static DateTime origin = new DateTime(1970, 1, 1, 0, 0, 0, 0).ToUniversalTime(); 
     21    private static char[] comma = {','}; 
    1222 
    1323    private MessageQueueService service = null; 
    1424    private MemcachedClient mc = null; 
    1525    private string rootQueueId = null; 
     26    private IList<IntPtr> clientsToDisconnect = new List<IntPtr>(); 
    1627 
    1728    public BuckerServerService(string[] servers, string rootQueueId) { 
    1829      this.rootQueueId = rootQueueId; 
    1930 
    20       SockIOPool pool = SockIOPool.GetInstance(); 
     31      SockIOPool pool = SockIOPool.GetInstance("bucker-queue-server"); 
    2132      pool.SetServers(servers); 
    2233      pool.InitConnections = 3; 
     
    3748    } 
    3849 
     50    public void Close() { 
     51      SockIOPool pool = SockIOPool.GetInstance("bucker-queue-server"); 
     52      pool.Shutdown(); 
     53    } 
     54 
    3955    public MessageQueueService Service { 
    4056      get { return service; } 
     
    4258        service = value; 
    4359        service.Received += new MessageEventHandler(this.MessageReceived); 
    44         service.Connected += new QueueEventHandler(this.ClientConnected); 
     60        service.Sent += new QueueEventHandler(this.MessageSent); 
     61        //service.Connected += new QueueEventHandler(this.ClientConnected); 
    4562        service.Failure += new QueueFailureEventHandler(this.FailureRaised); 
    4663      } 
    4764    } 
    4865 
    49     private void FailureRaised(object sender, Exception ex) { 
     66    private void FailureRaised(ISocketConnection sender, Exception ex) { 
    5067      Console.WriteLine(ex.ToString()); 
    51     } 
    52  
    53     private void ClientConnected(object sender) { 
    54       Console.WriteLine("connected"); 
     68       
     69      // since the client generated an exceptipon we will disconnect it 
     70      // as soon as the error message below is sent 
     71      clientsToDisconnect.Add(sender.SocketHandle); 
     72 
     73      // we warn the client something happened 
     74      Nuxleus.Bucker.Message m = new Nuxleus.Bucker.Message(); 
     75      m.Type = "error"; 
     76      m.Op = null; 
     77      m.Error = new Nuxleus.Bucker.Error(); 
     78      m.Error.Type = "internal-server-error"; 
     79      m.Error.Code = 500; 
     80      sender.BeginSend(Nuxleus.Bucker.Message.Serialize(m)); 
     81    } 
     82 
     83    private void ClientConnected(ISocketConnection sender) { 
     84      // nothing 
     85    } 
     86 
     87    private void MessageSent(ISocketConnection sender) { 
     88      // In case the last message sent to the client was to warn it an error occured 
     89      // we should have its handle in there and just close the connection 
     90      if(clientsToDisconnect.Contains(sender.SocketHandle)) { 
     91        clientsToDisconnect.Remove(sender.SocketHandle); 
     92        sender.BeginDisconnect(); 
     93      } 
    5594    } 
    5695 
     
    179218        queues = queues.Replace(qid, ""); 
    180219        queues = queues.Replace(",,", ""); 
    181         char[] comma = {','}; 
    182220        queues = queues.Trim(comma); 
    183221        mc.Set(rootQueueId, queues); 
     
    294332        keys = keys.Replace(m.MessageId, ""); 
    295333        keys = keys.Replace(",,", ""); 
    296         char[] comma = {','}; 
    297334        keys = keys.Trim(comma); 
    298335        mc.Set(m.QueueId, keys); 
     
    338375            unread = unread.Replace(m.MessageId, ""); 
    339376            unread = unread.Replace(",,", ""); 
    340             char[] comma = {','}; 
    341377            unread = unread.Trim(comma); 
    342378            mc.Set(unreadKey, unread); 
     
    379415 
    380416    private void MessageReceived(ISocketConnection sender, IMessage message) { 
     417      if(clientsToDisconnect.Contains(sender.SocketHandle)) { 
     418        // if the client has been scheduled to be closed we don't process any of its 
     419        // incoming data 
     420        return; 
     421      } 
     422 
    381423      Nuxleus.Bucker.Message m = Nuxleus.Bucker.Message.Parse(message.InnerMessage); 
    382424      Nuxleus.Bucker.Message responseToSend = null; 
    383425 
     426      Console.WriteLine(m.ToString()); 
     427 
    384428      switch(m.Op.Type) { 
     429      case OperationType.GetMessage: 
     430        responseToSend = HandleGetMessageRequest(m); 
     431        break; 
     432      case OperationType.ListMessages: 
     433        responseToSend = HandleListMessagesRequest(m); 
     434        break; 
     435      case OperationType.PushMessage: 
     436        responseToSend = HandlePushMessageRequest(m); 
     437        break; 
     438      case OperationType.DeleteMessage: 
     439        responseToSend = HandleDeleteMessageRequest(m); 
     440        break; 
    385441      case OperationType.NewQueue: 
    386442        responseToSend = HandleNewQueueRequest(m); 
     
    392448        responseToSend = HandleListQueuesRequest(m); 
    393449        break; 
    394       case OperationType.ListMessages: 
    395         responseToSend = HandleListMessagesRequest(m); 
    396         break; 
    397       case OperationType.PushMessage: 
    398         responseToSend = HandlePushMessageRequest(m); 
    399         break; 
    400       case OperationType.DeleteMessage: 
    401         responseToSend = HandleDeleteMessageRequest(m); 
    402         break; 
    403       case OperationType.GetMessage: 
    404         responseToSend = HandleGetMessageRequest(m); 
     450      default: 
     451        responseToSend = new Nuxleus.Bucker.Message(); 
     452        responseToSend.Type = "error"; 
     453        responseToSend.Op = null; 
     454        responseToSend.Error = new Nuxleus.Bucker.Error(); 
     455        responseToSend.Error.Type = "operation-not-allowed"; 
     456        responseToSend.Error.Code = 405; 
    405457        break; 
    406458      } 
  • trunk/nuxleus/src/Nuxleus.PubSub/Bucker/message.cs

    r4107 r4114  
    1818{ 
    1919  public enum OperationType { 
     20    Unknown, 
    2021    ListQueues, 
    2122    NewQueue, 
     
    7980    } 
    8081 
    81     OperationType type
     82    OperationType type = OperationType.Unknown
    8283    [XmlIgnoreAttribute()] 
    8384    public OperationType Type { 
     
    8586      set { type = value; } 
    8687    } 
     88 
     89    [XmlAnyElement] 
     90    public XmlElement[] ForeignElements; 
    8791 
    8892    [XmlElement ("new-queue")] 
  • trunk/nuxleus/src/nux.build

    r4112 r4114  
    279279  </target> 
    280280  <target name="Nuxleus.Messaging" depends="init Nuxleus.PubSub Nuxleus.Vendor"> 
    281     <csc target="library" output="${build.dir}/Nuxleus.Messaging.dll" debug="true"
     281    <csc target="library" output="${build.dir}/Nuxleus.Messaging.dll"
    282282      <sources> 
    283283        <include name="${source.dir}/Nuxleus.Messaging/**/*.cs"/>