Changeset 408
- Timestamp:
- 01/20/03 20:04:19 (6 years ago)
- Files:
-
- trunk/RBTelepathy/RBTelepathy/Connection.py (added)
- trunk/RBTelepathy/RBTelepathy/ErrorTypes.py (added)
- trunk/RBTelepathy/RBTelepathy/Packet/Elements.py (modified) (1 diff)
- trunk/RBTelepathy/RBTelepathy/Packet/StreamElements.py (modified) (1 diff)
- trunk/RBTelepathy/RBTelepathy/Packet/__init__.py (modified) (4 diffs)
- trunk/RBTelepathy/RBTelepathy/Stream/Protocol.py (modified) (5 diffs)
- trunk/RBTelepathy/RBTelepathy/Stream/SocketAdaptor.py (modified) (2 diffs)
- trunk/RBTelepathy/test/all.py (added)
- trunk/RBTelepathy/test/test_streamProtocol.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/RBTelepathy/RBTelepathy/Packet/Elements.py
r406 r408 59 59 self.OnUpdateContent() 60 60 return BaseObjectifiedXML._toXML(self, *args, **kw) 61 62 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~63 64 class Error(PacketElementBase):65 pass66 61 67 62 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ trunk/RBTelepathy/RBTelepathy/Packet/StreamElements.py
r402 r408 114 114 return self.StreamContent + self.terminator 115 115 116 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 117 118 class StreamFormatFactory(object): 119 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 120 #~ Constants / Variables / Etc. 121 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 122 123 StreamFormatTable = { 124 'terminated': TerminatedStream, 125 'terminator': TerminatedStream, 126 'term': TerminatedStream, 127 128 'length': LengthStream, 129 'len': LengthStream, 130 None: LengthStream, 131 } 132 133 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 134 #~ Public Methods 135 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 136 137 def __call__(self, owner, parent, node, attributes, namespacemap): 138 format = attributes.get('format') 139 return self.StreamFormatTable[format] 140 trunk/RBTelepathy/RBTelepathy/Packet/__init__.py
r406 r408 35 35 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 36 36 37 RBMessagingNamespace = 'http://namespaces.runeblade.com/RBMessaging' 37 38 StandardNamespaceSynonyms = { 38 'RBMessaging': 'http://namespaces.runeblade.com/RBMessaging',39 None: 'http://namespaces.runeblade.com/RBMessaging',39 'RBMessaging': RBMessagingNamespace, 40 None: RBMessagingNamespace, 40 41 } 41 42 … … 44 45 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 45 46 46 class StreamFormatFactory(object): 47 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 48 #~ Constants / Variables / Etc. 49 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 50 51 StreamFormatTable = { 52 'length': StreamElements.LengthStream, 53 'len': StreamElements.LengthStream, 54 55 'terminated': StreamElements.TerminatedStream, 56 'terminator': StreamElements.TerminatedStream, 57 'term': StreamElements.TerminatedStream, 58 59 None: StreamElements.LengthStream, 60 } 61 62 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 63 #~ Public Methods 64 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 65 66 def __call__(self, owner, parent, node, attributes, namespacemap): 67 format = attributes.get('format') 68 return self.StreamFormatTable[format] 69 70 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 71 72 class MessageErrorFactory(object): 47 class ChildELementErrorFactory(object): 73 48 def __call__(self, owner, parent, node, attributes, namespacemap): 74 49 raise KeyError, '"%s" is not a valid subchild of "%s"' % (node[1], parent.__node__) … … 76 51 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 77 52 78 class StandardMessage(MessageElements.Message, XMLClassBuilder.XMLClassBuilderBaseMixin): 53 class RootMessageErrorFactory(object): 54 def __call__(self, owner, parent, node, attributes, namespacemap): 55 raise KeyError, '"%s" is not a valid topl level element' % (node[1], ) 56 57 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 58 59 class RootMessage(MessageElements.Message, XMLClassBuilder.XMLClassBuilderBaseMixin): 60 NamespaceSynonyms = StandardNamespaceSynonyms 61 DefaultNamespace = NamespaceSynonyms[None] 79 62 ElementFactories = XMLClassBuilder.ElementFactorySet({ 80 ( 'http://namespaces.runeblade.com/RBMessaging', 'stream'):StreamFormatFactory(),81 ( 'http://namespaces.runeblade.com/RBMessaging', 'to'): EF.Static(MessageElements.URIAddressElement),82 ( 'http://namespaces.runeblade.com/RBMessaging', 'from'): EF.Static(MessageElements.URIAddressElement),83 ( 'http://namespaces.runeblade.com/RBMessaging', ): MessageErrorFactory(),63 (RBMessagingNamespace, 'stream'): StreamElements.StreamFormatFactory(), 64 (RBMessagingNamespace, 'to'): EF.Static(MessageElements.URIAddressElement), 65 (RBMessagingNamespace, 'from'): EF.Static(MessageElements.URIAddressElement), 66 (RBMessagingNamespace, ): ChildELementErrorFactory(), 84 67 None: EF.Static(ObjectifiedXML), 85 68 }) 86 87 NamespaceSynonyms = StandardNamespaceSynonyms88 DefaultNamespace = NamespaceSynonyms[None]89 69 90 70 def _xmlChildFactory(self, owner, parent, node, attributes, namespacemap): … … 92 72 93 73 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 94 #~ Definitions95 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~96 74 97 75 class StandardStreamPacketBuilder(Builder.StreamPacketBuilder): 76 NamespaceSynonyms = StandardNamespaceSynonyms 77 DefaultNamespace = NamespaceSynonyms[None] 98 78 ElementFactories = XMLClassBuilder.ElementFactorySet({ 99 ('http://namespaces.runeblade.com/RBMessaging', 'message'): EF.Static(StandardMessage), 100 ('http://namespaces.runeblade.com/RBMessaging', 'error'): EF.Static(MessageElements.Error), 79 (RBMessagingNamespace, 'message'): EF.Static(RootMessage), 80 (RBMessagingNamespace, 'error'): EF.Static(RootMessage), 81 (RBMessagingNamespace, ): RootMessageErrorFactory(), 82 None: EF.Static(RootMessage), 101 83 }) 102 84 103 NamespaceSynonyms = StandardNamespaceSynonyms104 DefaultNamespace = NamespaceSynonyms[None]105 trunk/RBTelepathy/RBTelepathy/Stream/Protocol.py
r406 r408 24 24 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 25 25 26 from RBMessaging import ErrorTypes 26 27 from RBMessaging.Packet import StandardStreamPacketBuilder 27 28 from xml.sax.saxutils import escape as xmlescape … … 40 41 delimiter = '\x1F' # ASCII unit seperator 41 42 data = None 43 stream = None 42 44 currentpacket = None 43 45 packetbuilder = None 44 stream = None45 46 log = logging.getLogger('.'.join((__name__, 'StreamProtocol'))) 46 47 … … 51 52 def __init__(self, PacketBuilderClass=StandardStreamPacketBuilder): 52 53 self.packetbuilder = PacketBuilderClass() 53 self.log.debug('Created')54 54 55 def __del__(self):56 self.log.debug('Destroyed')57 58 def SendPacket(self, packet):55 def OnPacket(self, packet): 56 """Called from Connection to convert a packet to stream data via this protocol. 57 Packet is required to implement GetStreamHeader and GetStreamData for this method to work. 58 Optionally, packet can be the raw header string.""" 59 59 if isinstance(packet, (str, unicode)): 60 60 streamdata = packet + self.delimiter 61 61 else: 62 62 streamdata = ''.join((packet.GetStreamHeader(), self.delimiter, packet.GetStreamData())) 63 self.log.debug('Sent %d bytes', len(streamdata))64 63 self.stream.write(streamdata) 65 64 66 65 def OnRecvStreamData(self, streamdata): 67 self.log.debug('Received %d bytes', len(streamdata))66 """Called from Stream object to introduce more raw stream bytes into the system""" 68 67 if self.data: 69 68 self.data += streamdata … … 71 70 self.data = streamdata 72 71 73 while self.data: 72 while self.ProcessData(): 73 # TODO: We need a way to exit out of this loop in case we get jammed with data 74 74 # While there is still data to be processed 75 if self.currentpacket is None: 76 try: 77 # Try to seperate the information packet from the data packet 78 info, self.data = self.data.split(self.delimiter, 1) 79 except ValueError: 80 # We don't have a complete info packet yet. 81 # Break out of the while self.data loop 82 break 83 else: 84 # We now have our information packet in string form 85 try: 86 # Build the packet from the string 87 self.currentpacket = self.packetbuilder.BuildPacket(info) 88 if self.currentpacket is None: 89 # We expected a packet, so not getting one is an error 90 self.StreamError('Incomplete XML header') 91 else: 92 # Good... send the rest of the data as potential stream data 93 self.data = self.currentpacket.OnStreamData(self.data, self._OnPacketComplete) 94 except Exception, e: 95 # Something happened, report to the client 96 self.StreamError('%s: %s' % (e.__class__.__name__, str(e))) 97 self.log.error(e) 98 #raise 75 pass 76 77 def ProcessData(self): 78 """Returns True if the loop needs to process more data, False otherwise.""" 79 80 if self.currentpacket is None: 81 try: 82 # Try to seperate the information packet from the data packet 83 info, self.data = self.data.split(self.delimiter, 1) 84 except ValueError: 85 # We don't have a complete info packet yet. 86 return False 99 87 else: 100 self.data = self.currentpacket.OnStreamData(self.data, self._OnPacketComplete) 88 # We now have our information packet in string form 89 self._BuildHeader(info) 90 # Good... send the rest of the data as potential stream data 91 self._BuildData() 92 else: 93 # We're partial through the existing data 94 self._BuildData() 101 95 102 def StreamError(self, error='Unknown Error', errortype='stream', shutdown=True, namespace=None): 103 if namespace is None: 104 try: 96 return self.data and True or False 97 98 def StreamError(self, error, forceshutdown=False): 99 if isinstance(error, ErrorTypes.RBMessagingError): 100 self.log.log(error.logtype, error.message, exc_info=error.logtraceback) 101 102 if error.report: 105 103 namespace = self.packetbuilder.DefaultNamespace 106 except AttributeError:107 namespace = StandardStreamPacketBuilder.DefaultNamespace108 namespace = 'xmlns=%s' % (xmlquoteattr(namespace),)104 errortext = xmlescape(error.message) 105 errortype = 'type=%s' % (xmlquoteattr(error.errortype),) 106 self.stream.write('''<error xmlns='%s' %s>%s</error>%s''' % (namespace, errortype, errortext, self.delimiter)) 109 107 110 if errortype is not None: 111 errortype = 'type=%s' % (xmlquoteattr(errortype),) 112 else: errortype = '' 108 if error.shutdown or forceshutdown: 109 self.stream.shutdown() 110 else: 111 self.log.critical(str(error), exc_info=1) 112 namespace = self.packetbuilder.DefaultNamespace 113 errortext = xmlescape(str(error)) 114 errortype = 'type="StreamProtocolError"' 115 self.stream.write('''<error xmlns='%s' %s>%s</error>%s''' % (namespace, errortype, errortext, self.delimiter)) 116 if forceshutdown: 117 self.stream.shutdown() 113 118 114 error = xmlescape(error) 115 116 self.log.info('Stream error from client connection. Shutting down.') 117 self.stream.write('''<error %s %s>%s</error>%s''' % (namespace, errortype, error, self.delimiter)) 118 119 if shutdown: 120 self.stream.shutdown() 121 122 def OnPacketComplete(self, packet): 119 def OnStreamPacket(self, packet): 120 """Called when a packet has completed building. 121 Override or reassign to capture message.""" 123 122 pass 124 123 … … 127 126 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 128 127 129 def _OnPacketComplete(self, packet): 128 def _BuildHeader(self, info): 129 try: 130 # Build the packet from the string 131 self.currentpacket = self.packetbuilder.BuildPacket(info) 132 if self.currentpacket is None: 133 # We expected a packet, so not getting one is an error 134 raise StreamProtocolError, 'Incomplete XML header' 135 except ErrorTypes.RBMessagingError, rbError: 136 if self.StreamError(rbError) is False: 137 raise 138 except Exception, error: 139 if self.StreamError(error, True) is False: 140 raise 141 142 def _BuildData(self): 143 try: 144 self.data = self.currentpacket.OnStreamData(self.data, self._OnStreamPacket) 145 except ErrorTypes.RBMessagingError, rbError: 146 if self.StreamError(rbError) is False: 147 raise 148 except Exception, error: 149 if self.StreamError(error, True) is False: 150 raise 151 152 def _OnStreamPacket(self, packet): 130 153 del self.currentpacket 131 154 try: 132 self.OnPacketComplete(packet) 133 except Exception, e: 134 # Something happened, report to the client 135 self.StreamError('%s: %s' % (e.__class__.__name__, str(e)), 'warning', shutdown=False) 136 self.log.error(e) 137 #raise 155 self.OnStreamPacket(packet) 156 except ErrorTypes.RBMessagingError, rbError: 157 if self.StreamError(rbError) is False: 158 raise 159 except Exception, error: 160 if self.StreamError(error, True) is False: 161 raise 138 162 trunk/RBTelepathy/RBTelepathy/Stream/SocketAdaptor.py
r407 r408 101 101 if not data: 102 102 self.isshutdown = True 103 self.log. info('Connection closed by remote connection')103 self.log.debug('Connection closed by remote connection') 104 104 self.protocol.OnRecvStreamData(data) 105 105 … … 134 134 135 135 def _SetSocketError(self, exc): 136 self.log. error(exc)136 self.log.debug(exc, exc_info=1) 137 137 self.isshutdown = True 138 138 del self._sendData trunk/RBTelepathy/test/test_streamProtocol.py
r402 r408 43 43 self.protocol = Protocol.StreamProtocol() 44 44 self.protocol.stream = self 45 self.protocol.On PacketComplete = self.OnPacketComplete45 self.protocol.OnStreamPacket = self.OnStreamPacket 46 46 47 47 def tearDown(self): 48 48 del self.protocol.stream 49 del self.protocol.On PacketComplete49 del self.protocol.OnStreamPacket 50 50 del self.protocol 51 51 try: del self.isshutdown … … 71 71 72 72 packet = None 73 def On PacketComplete(self, packet):73 def OnStreamPacket(self, packet): 74 74 self.packet = packet 75 75
