Changeset 413
- Timestamp:
- 01/22/03 19:32:13 (6 years ago)
- Files:
-
- trunk/RBMessaging/RBMessaging/Model.py (modified) (1 diff)
- trunk/RBTelepathy/RBTelepathy/Connection.py (modified) (2 diffs)
- trunk/RBTelepathy/RBTelepathy/Packet/Builder.py (modified) (2 diffs)
- trunk/RBTelepathy/RBTelepathy/Stream/Protocol.py (modified) (5 diffs)
- trunk/RBTelepathy/RBTelepathy/Stream/SocketAdaptor.py (modified) (4 diffs)
- trunk/RBTelepathy/RBTelepathy/Stream/SocketServer.py (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/RBMessaging/RBMessaging/Model.py
r412 r413 24 24 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 25 25 26 import Connection 27 from RBFoundation import SmartSelect 28 26 29 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 27 30 #~ Definitions 28 31 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 29 32 33 class MessagingModel(object): 34 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 35 #~ Public Methods 36 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 30 37 38 def __init__(self): 39 self.Connections = {} 40 self.Routers = {} 41 42 def Process(self, timeout): 43 return self.SmartSelectCollection.Process(timeout) 44 45 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 46 #~ Properties 47 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 48 49 _SmartSelectCollection = None 50 def _GetSmartSelectCollection(self): 51 if self._SmartSelectCollection is None: 52 self._SmartSelectCollection = SmartSelect.SmartSelectList() 53 return self._SmartSelectCollection 54 def _SetSmartSelectCollection(self, value): 55 self._SmartSelectCollection = value 56 def _DelSmartSelectCollection(self): 57 del self._SmartSelectCollection 58 SmartSelectCollection = property(_GetSmartSelectCollection, None, _DelSmartSelectCollection) 59 trunk/RBTelepathy/RBTelepathy/Connection.py
r412 r413 49 49 50 50 def SendPacket(self, *args, **kw): 51 self.protocol.SendPacket(*args, **kw)51 raise NotImplementedError 52 52 53 53 def OnRoutedPacket(self, packet, *args, **kw): … … 79 79 return packethandler.OnStreamPacket(self, packet, *args, **kw) 80 80 81 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 81 def OnStreamShutdown(self, how): 82 self.log.debug('Stream shutdown by "%s"', how) 82 83 83 class ServerConnection(Connection):84 # TODO:85 # Load Packet.ErrorHandler86 # Load Packet.AuthenticationHandler for Server Connection87 #88 # TODO On Authenticated:89 # Load Packet.ConfigureHandler for Server Connection90 # Load Packet.MessageHandler for Server Connection91 #92 # This process should be controllable by an xml loader93 94 def OnAuthenticated(self, succeeded):95 if succeeded:96 self.log.info('Successful authentication from client')97 else:98 self.log.warn('Failed authentication from client')99 100 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~101 102 class ClientConnection(Connection):103 # TODO:104 # Load Packet.ErrorHandler105 # Load Packet.AuthenticationHandler for Client Connection106 #107 # This process should be controllable by an xml loader108 109 def OnAuthenticated(self, succeeded):110 if succeeded:111 self.log.info('Successfully authenticated to server')112 else:113 self.log.warn('Failed authentication to server')114 trunk/RBTelepathy/RBTelepathy/Packet/Builder.py
r412 r413 27 27 from RBFoundation.XMLClassBuilder import XMLClassBuilder 28 28 from RBFoundation.XMLClassBuilder import ElementFactory as EF 29 from RBMessaging import ErrorTypes 29 30 30 31 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 57 58 58 59 class StreamPacketBuilder(XMLClassBuilder): 60 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 61 #~ Constants / Variables / Etc. 62 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 63 59 64 NamespaceSynonyms = RBNamespaceSynonyms 60 65 DefaultNamespace = RBNamespaceSynonyms[None] 61 BuildPacket = XMLClassBuilder.Parse 66 67 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 68 #~ Public Methods 69 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 70 71 def BuildPacket(self, *args, **kw): 72 try: 73 return self.Parse(*args, **kw) 74 except ErrorTypes.RBMessagingError, e: 75 # Pass these along... 76 raise 77 except StandardError, e: 78 # Change these to PacketErrors 79 raise ErrorTypes.PacketError(str(e)) 80 81 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 82 #~ Protected Methods 83 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 84 85 def _xmlChildFactory(self, owner, parent, node, attributes, namespacemap): 86 try: 87 XMLClassBuilder._xmlChildFactory(self, owner, parent, node, attributes, namespacemap) 88 except KeyError, e: 89 # Well, we don't seem to have a packet for that... Error! =) 90 raise ErrorTypes.PacketHandlerError(str(e)) 62 91 63 92 def _GetOwner(self): trunk/RBTelepathy/RBTelepathy/Stream/Protocol.py
r412 r413 109 109 110 110 if error.shutdown or forceshutdown: 111 self.stream.shutdown( )111 self.stream.shutdown('local') 112 112 else: 113 113 self.log.critical(str(error), exc_info=1) … … 117 117 self.stream.write('''<error xmlns='%s' %s>%s</error>%s''' % (namespace, errortype, errortext, self.delimiter)) 118 118 if forceshutdown: 119 self.stream.shutdown( )119 self.stream.shutdown('local') 120 120 121 121 def OnStreamPacket(self, packet): 122 122 """Called when a packet has completed building. 123 123 Override or reassign to capture message.""" 124 pass 125 126 def OnStreamShutdown(self, how): 124 127 pass 125 128 … … 134 137 if self.currentpacket is None: 135 138 # We expected a packet, so not getting one is an error 136 raise StreamProtocolError, 'Incomplete XML header'139 raise ErrorTypes.StreamProtocolError, 'Incomplete XML header' 137 140 except ErrorTypes.RBMessagingError, rbError: 138 141 if self.StreamError(rbError) is False: … … 143 146 144 147 def _BuildData(self): 145 try: 146 self.data = self.currentpacket.OnStreamData(self.data, self._OnStreamPacket) 147 except ErrorTypes.RBMessagingError, rbError: 148 if self.StreamError(rbError) is False: 149 raise 150 except Exception, error: 151 if self.StreamError(error, True) is False: 152 raise 148 if self.currentpacket is not None: 149 try: 150 self.data = self.currentpacket.OnStreamData(self.data, self._OnStreamPacket) 151 except ErrorTypes.RBMessagingError, rbError: 152 if self.StreamError(rbError) is False: 153 raise 154 except Exception, error: 155 if self.StreamError(error, True) is False: 156 raise 153 157 154 158 def _OnStreamPacket(self, packet): … … 163 167 raise 164 168 169 def _OnStreamShutdown(self, how): 170 return self.OnStreamShutdown(how) 171 trunk/RBTelepathy/RBTelepathy/Stream/SocketAdaptor.py
r412 r413 68 68 write = send 69 69 70 def shutdown(self, how= 0):70 def shutdown(self, how='local', allowread=False, allowwrite=True): 71 71 try: 72 self._socket.shutdown(how) 73 self.isshutdown = True 72 if not self.isshutdown: 73 self.OnShutdown(how) 74 self.log.info('Connection closed by "%s" on %s', how, self._socket.getpeername()) 75 self.isshutdown |= (allowread << 1) | (allowwrite << 0) 76 if self.isshutdown > 0: 77 self._socket.shutdown(self.isshutdown-1) 74 78 except socket.error, exc: 75 if not self._SetSocketError(exc): 76 raise 79 self.log.exception("Socket error on shutdown") 80 81 def OnRecvStreamData(self, data): 82 """Called when data is received from the socket. 83 Override or reassign to capture message.""" 84 pass 85 86 def OnShutdown(self, how): 87 """Called when shutdown of the socket has occured. 88 Override or reassign to capture message. 89 how will be one of ['local', 'remote', 'error']""" 90 pass 77 91 78 92 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 100 114 data = self._SocketRecv(*args, **kw) 101 115 if not data: 102 self.isshutdown = True103 self. log.debug('Connection closed by remote connection')104 self. protocol.OnRecvStreamData(data)116 # The remote side shutdown the connection 117 self.shutdown('remote', False, True) 118 self.OnRecvStreamData(data) 105 119 106 120 def _NeedsWrite(self): … … 110 124 nSent = self._SocketSend(self._sendData, *args, **kw) 111 125 self._sendData = self._sendData[nSent:] 126 if self.isshutdown and not self._sendData: 127 # Ok, we're shutdown, but were still sending data 128 # Now that we are done, shutdown the connection. 129 self.shutdown('local', True, False) 112 130 113 131 def _NeedsError(self): … … 135 153 def _SetSocketError(self, exc): 136 154 self.log.debug(exc, exc_info=1) 137 self.isshutdown = True138 155 del self._sendData 139 156 del exc 157 self.shutdown('error', False, False) 140 158 # We've logged it and taken care of the stream, 141 159 # therefore don't propigate. If you want to do trunk/RBTelepathy/RBTelepathy/Stream/SocketServer.py
r398 r413 24 24 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 25 25 26 import weakref27 26 from RBFoundation import SmartSelect 28 from RBFoundation.AOSubjectObserver.StandardSubjects import Subject29 27 30 28 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 39 37 def __init__(self, socket): 40 38 self.serversocket = socket 41 self.OnNewConnection = Subject() 39 40 def OnNewConnection(self, server, socket, info): 41 """Called when a new socket is received from the server socket. 42 Override or reassign to capture message.""" 43 pass 42 44 43 45 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 52 54 def _ProcessRead(self, *args, **kw): 53 55 socket, info = self.serversocket.accept() 54 self.OnNewConnection .Update(self, socket, info)56 self.OnNewConnection(self, socket, info) 55 57 56 58 def _NeedsWrite(self): … … 81 83 socket.shutdown(0) 82 84 83 server.OnNewConnection .Add(OnNewConnection)85 server.OnNewConnection = OnNewConnection 84 86 85 87 try:
