Changeset 4114
- Timestamp:
- 10/09/07 14:40:12 (1 year ago)
- Files:
-
- trunk/nuxleus/Scripts/llup-pub.sh (modified) (1 diff)
- trunk/nuxleus/src/Nuxleus.Messaging/MessageQueueService.cs (modified) (5 diffs)
- trunk/nuxleus/src/Nuxleus.Messaging/QS/BuckerServerService.cs (modified) (9 diffs)
- trunk/nuxleus/src/Nuxleus.PubSub/Bucker/message.cs (modified) (3 diffs)
- trunk/nuxleus/src/nux.build (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/nuxleus/Scripts/llup-pub.sh
r4107 r4114 42 42 echo "Starting the llup publisher server on port $PORT against queue server at $QUEUE_SERVER and monitoring $2" 43 43 # add -v to the line below to enable logging to the console 44 llup-queue-publisher.py -s $QUEUE_SERVER -q $2 -i $PID_FILE -f 5 .0 &44 llup-queue-publisher.py -s $QUEUE_SERVER -q $2 -i $PID_FILE -f 50.0 & 45 45 ;; 46 46 trunk/nuxleus/src/Nuxleus.Messaging/MessageQueueService.cs
r4112 r4114 19 19 namespace Nuxleus.Messaging { 20 20 public delegate void MessageEventHandler(ISocketConnection sender, IMessage m); 21 public delegate void QueueEventHandler( objectsender);22 public delegate void QueueFailureEventHandler( objectsender, Exception ex);21 public delegate void QueueEventHandler(ISocketConnection sender); 22 public delegate void QueueFailureEventHandler(ISocketConnection sender, Exception ex); 23 23 24 24 public class MessageQueueService : BaseSocketService { … … 48 48 connection.BeginReceive(); 49 49 if(Connected != null) { 50 Connected(this );50 Connected(this.Connection); 51 51 } 52 52 ConnectEvent.Set(); … … 55 55 public override void OnSent(MessageEventArgs e) { 56 56 if(Sent != null) { 57 Sent( this);57 Sent(e.Connection); 58 58 } 59 59 SentEvent.Set(); … … 73 73 public override void OnDisconnected(ConnectionEventArgs e) { 74 74 if(Disconnected != null) { 75 Disconnected(this );75 Disconnected(this.Connection); 76 76 } 77 77 DisconnectEvent.Set(); … … 80 80 public override void OnException(ExceptionEventArgs e) { 81 81 if(Failure != null) { 82 Failure(this , e.Exception);82 Failure(this.Connection, e.Exception); 83 83 } 84 84 ExceptionEvent.Set(); trunk/nuxleus/src/Nuxleus.Messaging/QS/BuckerServerService.cs
r4112 r4114 1 1 // 2 // BuckerServerService.cs: Bucker queue service implementation 3 // See: http://trac.defuze.org/wiki/bucker 4 // 5 // Author: 6 // Sylvain Hellegouarch (sh@3rdandurban.com) 7 // 8 // Copyright (C) 2007, Sylvain Hellegouarch 9 // 2 10 using System; 3 11 using System.Collections; 12 using System.Collections.Generic; 4 13 using Memcached.ClientLibrary; 5 14 using Nuxleus.Bucker; … … 10 19 public class BuckerServerService { 11 20 private static DateTime origin = new DateTime(1970, 1, 1, 0, 0, 0, 0).ToUniversalTime(); 21 private static char[] comma = {','}; 12 22 13 23 private MessageQueueService service = null; 14 24 private MemcachedClient mc = null; 15 25 private string rootQueueId = null; 26 private IList<IntPtr> clientsToDisconnect = new List<IntPtr>(); 16 27 17 28 public BuckerServerService(string[] servers, string rootQueueId) { 18 29 this.rootQueueId = rootQueueId; 19 30 20 SockIOPool pool = SockIOPool.GetInstance( );31 SockIOPool pool = SockIOPool.GetInstance("bucker-queue-server"); 21 32 pool.SetServers(servers); 22 33 pool.InitConnections = 3; … … 37 48 } 38 49 50 public void Close() { 51 SockIOPool pool = SockIOPool.GetInstance("bucker-queue-server"); 52 pool.Shutdown(); 53 } 54 39 55 public MessageQueueService Service { 40 56 get { return service; } … … 42 58 service = value; 43 59 service.Received += new MessageEventHandler(this.MessageReceived); 44 service.Connected += new QueueEventHandler(this.ClientConnected); 60 service.Sent += new QueueEventHandler(this.MessageSent); 61 //service.Connected += new QueueEventHandler(this.ClientConnected); 45 62 service.Failure += new QueueFailureEventHandler(this.FailureRaised); 46 63 } 47 64 } 48 65 49 private void FailureRaised( objectsender, Exception ex) {66 private void FailureRaised(ISocketConnection sender, Exception ex) { 50 67 Console.WriteLine(ex.ToString()); 51 } 52 53 private void ClientConnected(object sender) { 54 Console.WriteLine("connected"); 68 69 // since the client generated an exceptipon we will disconnect it 70 // as soon as the error message below is sent 71 clientsToDisconnect.Add(sender.SocketHandle); 72 73 // we warn the client something happened 74 Nuxleus.Bucker.Message m = new Nuxleus.Bucker.Message(); 75 m.Type = "error"; 76 m.Op = null; 77 m.Error = new Nuxleus.Bucker.Error(); 78 m.Error.Type = "internal-server-error"; 79 m.Error.Code = 500; 80 sender.BeginSend(Nuxleus.Bucker.Message.Serialize(m)); 81 } 82 83 private void ClientConnected(ISocketConnection sender) { 84 // nothing 85 } 86 87 private void MessageSent(ISocketConnection sender) { 88 // In case the last message sent to the client was to warn it an error occured 89 // we should have its handle in there and just close the connection 90 if(clientsToDisconnect.Contains(sender.SocketHandle)) { 91 clientsToDisconnect.Remove(sender.SocketHandle); 92 sender.BeginDisconnect(); 93 } 55 94 } 56 95 … … 179 218 queues = queues.Replace(qid, ""); 180 219 queues = queues.Replace(",,", ""); 181 char[] comma = {','};182 220 queues = queues.Trim(comma); 183 221 mc.Set(rootQueueId, queues); … … 294 332 keys = keys.Replace(m.MessageId, ""); 295 333 keys = keys.Replace(",,", ""); 296 char[] comma = {','};297 334 keys = keys.Trim(comma); 298 335 mc.Set(m.QueueId, keys); … … 338 375 unread = unread.Replace(m.MessageId, ""); 339 376 unread = unread.Replace(",,", ""); 340 char[] comma = {','};341 377 unread = unread.Trim(comma); 342 378 mc.Set(unreadKey, unread); … … 379 415 380 416 private void MessageReceived(ISocketConnection sender, IMessage message) { 417 if(clientsToDisconnect.Contains(sender.SocketHandle)) { 418 // if the client has been scheduled to be closed we don't process any of its 419 // incoming data 420 return; 421 } 422 381 423 Nuxleus.Bucker.Message m = Nuxleus.Bucker.Message.Parse(message.InnerMessage); 382 424 Nuxleus.Bucker.Message responseToSend = null; 383 425 426 Console.WriteLine(m.ToString()); 427 384 428 switch(m.Op.Type) { 429 case OperationType.GetMessage: 430 responseToSend = HandleGetMessageRequest(m); 431 break; 432 case OperationType.ListMessages: 433 responseToSend = HandleListMessagesRequest(m); 434 break; 435 case OperationType.PushMessage: 436 responseToSend = HandlePushMessageRequest(m); 437 break; 438 case OperationType.DeleteMessage: 439 responseToSend = HandleDeleteMessageRequest(m); 440 break; 385 441 case OperationType.NewQueue: 386 442 responseToSend = HandleNewQueueRequest(m); … … 392 448 responseToSend = HandleListQueuesRequest(m); 393 449 break; 394 case OperationType.ListMessages: 395 responseToSend = HandleListMessagesRequest(m); 396 break; 397 case OperationType.PushMessage: 398 responseToSend = HandlePushMessageRequest(m); 399 break; 400 case OperationType.DeleteMessage: 401 responseToSend = HandleDeleteMessageRequest(m); 402 break; 403 case OperationType.GetMessage: 404 responseToSend = HandleGetMessageRequest(m); 450 default: 451 responseToSend = new Nuxleus.Bucker.Message(); 452 responseToSend.Type = "error"; 453 responseToSend.Op = null; 454 responseToSend.Error = new Nuxleus.Bucker.Error(); 455 responseToSend.Error.Type = "operation-not-allowed"; 456 responseToSend.Error.Code = 405; 405 457 break; 406 458 } trunk/nuxleus/src/Nuxleus.PubSub/Bucker/message.cs
r4107 r4114 18 18 { 19 19 public enum OperationType { 20 Unknown, 20 21 ListQueues, 21 22 NewQueue, … … 79 80 } 80 81 81 OperationType type ;82 OperationType type = OperationType.Unknown; 82 83 [XmlIgnoreAttribute()] 83 84 public OperationType Type { … … 85 86 set { type = value; } 86 87 } 88 89 [XmlAnyElement] 90 public XmlElement[] ForeignElements; 87 91 88 92 [XmlElement ("new-queue")] trunk/nuxleus/src/nux.build
r4112 r4114 279 279 </target> 280 280 <target name="Nuxleus.Messaging" depends="init Nuxleus.PubSub Nuxleus.Vendor"> 281 <csc target="library" output="${build.dir}/Nuxleus.Messaging.dll" debug="true">281 <csc target="library" output="${build.dir}/Nuxleus.Messaging.dll"> 282 282 <sources> 283 283 <include name="${source.dir}/Nuxleus.Messaging/**/*.cs"/>
