Changeset 408

Show
Ignore:
Timestamp:
01/20/03 20:04:19 (6 years ago)
Author:
sholloway
Message:

*** empty log message ***

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/RBTelepathy/RBTelepathy/Packet/Elements.py

    r406 r408  
    5959        self.OnUpdateContent() 
    6060        return BaseObjectifiedXML._toXML(self, *args, **kw) 
    61  
    62 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    63  
    64 class Error(PacketElementBase): 
    65     pass 
    6661 
    6762#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
  • trunk/RBTelepathy/RBTelepathy/Packet/StreamElements.py

    r402 r408  
    114114        return self.StreamContent + self.terminator 
    115115 
     116#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     117 
     118class StreamFormatFactory(object): 
     119    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     120    #~ Constants / Variables / Etc.  
     121    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     122 
     123    StreamFormatTable = { 
     124        'terminated': TerminatedStream, 
     125        'terminator': TerminatedStream, 
     126        'term': TerminatedStream, 
     127 
     128        'length': LengthStream, 
     129        'len': LengthStream, 
     130        None: LengthStream, 
     131        } 
     132 
     133    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     134    #~ Public Methods  
     135    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     136 
     137    def __call__(self, owner, parent, node, attributes, namespacemap): 
     138        format = attributes.get('format') 
     139        return self.StreamFormatTable[format] 
     140 
  • trunk/RBTelepathy/RBTelepathy/Packet/__init__.py

    r406 r408  
    3535#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    3636 
     37RBMessagingNamespace = 'http://namespaces.runeblade.com/RBMessaging' 
    3738StandardNamespaceSynonyms = { 
    38     'RBMessaging': 'http://namespaces.runeblade.com/RBMessaging'
    39     None: 'http://namespaces.runeblade.com/RBMessaging'
     39    'RBMessaging': RBMessagingNamespace
     40    None: RBMessagingNamespace
    4041    } 
    4142 
     
    4445#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    4546 
    46 class StreamFormatFactory(object): 
    47     #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    48     #~ Constants / Variables / Etc.  
    49     #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    50  
    51     StreamFormatTable = { 
    52         'length': StreamElements.LengthStream, 
    53         'len': StreamElements.LengthStream, 
    54  
    55         'terminated': StreamElements.TerminatedStream, 
    56         'terminator': StreamElements.TerminatedStream, 
    57         'term': StreamElements.TerminatedStream, 
    58  
    59         None: StreamElements.LengthStream, 
    60         } 
    61  
    62     #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    63     #~ Public Methods  
    64     #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    65  
    66     def __call__(self, owner, parent, node, attributes, namespacemap): 
    67         format = attributes.get('format') 
    68         return self.StreamFormatTable[format] 
    69  
    70 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    71  
    72 class MessageErrorFactory(object): 
     47class ChildELementErrorFactory(object): 
    7348    def __call__(self, owner, parent, node, attributes, namespacemap): 
    7449        raise KeyError, '"%s" is not a valid subchild of "%s"' % (node[1], parent.__node__) 
     
    7651#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    7752 
    78 class StandardMessage(MessageElements.Message, XMLClassBuilder.XMLClassBuilderBaseMixin): 
     53class RootMessageErrorFactory(object): 
     54    def __call__(self, owner, parent, node, attributes, namespacemap): 
     55        raise KeyError, '"%s" is not a valid topl level element' % (node[1], ) 
     56 
     57#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     58 
     59class RootMessage(MessageElements.Message, XMLClassBuilder.XMLClassBuilderBaseMixin): 
     60    NamespaceSynonyms = StandardNamespaceSynonyms 
     61    DefaultNamespace = NamespaceSynonyms[None] 
    7962    ElementFactories = XMLClassBuilder.ElementFactorySet({ 
    80             ('http://namespaces.runeblade.com/RBMessaging', 'stream'): StreamFormatFactory(), 
    81             ('http://namespaces.runeblade.com/RBMessaging', 'to'): EF.Static(MessageElements.URIAddressElement), 
    82             ('http://namespaces.runeblade.com/RBMessaging', 'from'): EF.Static(MessageElements.URIAddressElement), 
    83             ('http://namespaces.runeblade.com/RBMessaging', ): MessageErrorFactory(), 
     63            (RBMessagingNamespace, 'stream'): StreamElements.StreamFormatFactory(), 
     64            (RBMessagingNamespace, 'to'): EF.Static(MessageElements.URIAddressElement), 
     65            (RBMessagingNamespace, 'from'): EF.Static(MessageElements.URIAddressElement), 
     66            (RBMessagingNamespace, ): ChildELementErrorFactory(), 
    8467            None: EF.Static(ObjectifiedXML), 
    8568        }) 
    86  
    87     NamespaceSynonyms = StandardNamespaceSynonyms 
    88     DefaultNamespace = NamespaceSynonyms[None] 
    8969 
    9070    def _xmlChildFactory(self, owner, parent, node, attributes, namespacemap): 
     
    9272 
    9373#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    94 #~ Definitions  
    95 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    9674 
    9775class StandardStreamPacketBuilder(Builder.StreamPacketBuilder): 
     76    NamespaceSynonyms = StandardNamespaceSynonyms 
     77    DefaultNamespace = NamespaceSynonyms[None] 
    9878    ElementFactories = XMLClassBuilder.ElementFactorySet({ 
    99             ('http://namespaces.runeblade.com/RBMessaging', 'message'): EF.Static(StandardMessage), 
    100             ('http://namespaces.runeblade.com/RBMessaging', 'error'): EF.Static(MessageElements.Error), 
     79            (RBMessagingNamespace, 'message'): EF.Static(RootMessage), 
     80            (RBMessagingNamespace, 'error'): EF.Static(RootMessage), 
     81            (RBMessagingNamespace, ): RootMessageErrorFactory(), 
     82            None: EF.Static(RootMessage), 
    10183        }) 
    10284 
    103     NamespaceSynonyms = StandardNamespaceSynonyms 
    104     DefaultNamespace = NamespaceSynonyms[None] 
    105  
  • trunk/RBTelepathy/RBTelepathy/Stream/Protocol.py

    r406 r408  
    2424#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    2525 
     26from RBMessaging import ErrorTypes 
    2627from RBMessaging.Packet import StandardStreamPacketBuilder 
    2728from xml.sax.saxutils import escape as xmlescape 
     
    4041    delimiter = '\x1F' # ASCII unit seperator 
    4142    data = None 
     43    stream = None 
    4244    currentpacket = None 
    4345    packetbuilder = None 
    44     stream = None 
    4546    log = logging.getLogger('.'.join((__name__, 'StreamProtocol'))) 
    4647 
     
    5152    def __init__(self, PacketBuilderClass=StandardStreamPacketBuilder): 
    5253        self.packetbuilder = PacketBuilderClass() 
    53         self.log.debug('Created') 
    5454 
    55     def __del__(self): 
    56         self.log.debug('Destroyed') 
    57  
    58     def SendPacket(self, packet): 
     55    def OnPacket(self, packet): 
     56        """Called from Connection to convert a packet to stream data via this protocol. 
     57        Packet is required to implement GetStreamHeader and GetStreamData for this method to work. 
     58        Optionally, packet can be the raw header string.""" 
    5959        if isinstance(packet, (str, unicode)): 
    6060            streamdata = packet + self.delimiter 
    6161        else: 
    6262            streamdata = ''.join((packet.GetStreamHeader(), self.delimiter, packet.GetStreamData())) 
    63         self.log.debug('Sent %d bytes', len(streamdata)) 
    6463        self.stream.write(streamdata) 
    6564 
    6665    def OnRecvStreamData(self, streamdata): 
    67         self.log.debug('Received %d bytes', len(streamdata)) 
     66        """Called from Stream object to introduce more raw stream bytes into the system""" 
    6867        if self.data:  
    6968            self.data += streamdata 
     
    7170            self.data = streamdata 
    7271 
    73         while self.data: 
     72        while self.ProcessData(): 
     73            # TODO: We need a way to exit out of this loop in case we get jammed with data 
    7474            # While there is still data to be processed 
    75             if self.currentpacket is None: 
    76                 try:  
    77                     # Try to seperate the information packet from the data packet 
    78                     info, self.data = self.data.split(self.delimiter, 1) 
    79                 except ValueError:  
    80                     # We don't have a complete info packet yet.  
    81                     # Break out of the while self.data loop 
    82                     break  
    83                 else: 
    84                     # We now have our information packet in string form 
    85                     try: 
    86                         # Build the packet from the string 
    87                         self.currentpacket = self.packetbuilder.BuildPacket(info) 
    88                         if self.currentpacket is None: 
    89                             # We expected a packet, so not getting one is an error 
    90                             self.StreamError('Incomplete XML header') 
    91                         else: 
    92                             # Good... send the rest of the data as potential stream data 
    93                             self.data = self.currentpacket.OnStreamData(self.data, self._OnPacketComplete) 
    94                     except Exception, e: 
    95                         # Something happened, report to the client 
    96                         self.StreamError('%s: %s' % (e.__class__.__name__, str(e))) 
    97                         self.log.error(e) 
    98                         #raise 
     75            pass 
     76 
     77    def ProcessData(self): 
     78        """Returns True if the loop needs to process more data, False otherwise.""" 
     79 
     80        if self.currentpacket is None: 
     81            try:  
     82                # Try to seperate the information packet from the data packet 
     83                info, self.data = self.data.split(self.delimiter, 1) 
     84            except ValueError:  
     85                # We don't have a complete info packet yet.  
     86                return False 
    9987            else: 
    100                 self.data = self.currentpacket.OnStreamData(self.data, self._OnPacketComplete) 
     88                # We now have our information packet in string form 
     89                self._BuildHeader(info) 
     90                # Good... send the rest of the data as potential stream data 
     91                self._BuildData() 
     92        else: 
     93            # We're partial through the existing data 
     94            self._BuildData() 
    10195 
    102     def StreamError(self, error='Unknown Error', errortype='stream', shutdown=True, namespace=None): 
    103         if namespace is None: 
    104             try: 
     96        return self.data and True or False 
     97 
     98    def StreamError(self, error, forceshutdown=False): 
     99        if isinstance(error, ErrorTypes.RBMessagingError): 
     100            self.log.log(error.logtype, error.message, exc_info=error.logtraceback) 
     101 
     102            if error.report: 
    105103                namespace = self.packetbuilder.DefaultNamespace 
    106             except AttributeError: 
    107                 namespace = StandardStreamPacketBuilder.DefaultNamespace 
    108         namespace = 'xmlns=%s' % (xmlquoteattr(namespace),
     104                errortext = xmlescape(error.message) 
     105                errortype = 'type=%s' % (xmlquoteattr(error.errortype),) 
     106                self.stream.write('''<error xmlns='%s' %s>%s</error>%s''' % (namespace, errortype, errortext, self.delimiter)
    109107 
    110         if errortype is not None: 
    111             errortype = 'type=%s' % (xmlquoteattr(errortype),) 
    112         else: errortype = '' 
     108            if error.shutdown or forceshutdown: 
     109                self.stream.shutdown() 
     110        else: 
     111            self.log.critical(str(error), exc_info=1) 
     112            namespace = self.packetbuilder.DefaultNamespace 
     113            errortext = xmlescape(str(error)) 
     114            errortype = 'type="StreamProtocolError"' 
     115            self.stream.write('''<error xmlns='%s' %s>%s</error>%s''' % (namespace, errortype, errortext, self.delimiter)) 
     116            if forceshutdown: 
     117                self.stream.shutdown() 
    113118 
    114         error = xmlescape(error) 
    115  
    116         self.log.info('Stream error from client connection.  Shutting down.') 
    117         self.stream.write('''<error %s %s>%s</error>%s''' % (namespace, errortype, error, self.delimiter)) 
    118  
    119         if shutdown: 
    120             self.stream.shutdown() 
    121  
    122     def OnPacketComplete(self, packet): 
     119    def OnStreamPacket(self, packet): 
     120        """Called when a packet has completed building. 
     121        Override or reassign to capture message.""" 
    123122        pass 
    124123 
     
    127126    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    128127 
    129     def _OnPacketComplete(self, packet): 
     128    def _BuildHeader(self, info): 
     129        try: 
     130            # Build the packet from the string 
     131            self.currentpacket = self.packetbuilder.BuildPacket(info) 
     132            if self.currentpacket is None: 
     133                # We expected a packet, so not getting one is an error 
     134                raise StreamProtocolError, 'Incomplete XML header' 
     135        except ErrorTypes.RBMessagingError, rbError: 
     136            if self.StreamError(rbError) is False: 
     137                raise 
     138        except Exception, error: 
     139            if self.StreamError(error, True) is False: 
     140                raise 
     141 
     142    def _BuildData(self): 
     143        try: 
     144            self.data = self.currentpacket.OnStreamData(self.data, self._OnStreamPacket) 
     145        except ErrorTypes.RBMessagingError, rbError: 
     146            if self.StreamError(rbError) is False: 
     147                raise 
     148        except Exception, error: 
     149            if self.StreamError(error, True) is False: 
     150                raise 
     151 
     152    def _OnStreamPacket(self, packet): 
    130153        del self.currentpacket 
    131154        try: 
    132             self.OnPacketComplete(packet) 
    133         except Exception, e: 
    134             # Something happened, report to the client 
    135             self.StreamError('%s: %s' % (e.__class__.__name__, str(e)), 'warning', shutdown=False) 
    136             self.log.error(e) 
    137             #raise 
     155            self.OnStreamPacket(packet) 
     156        except ErrorTypes.RBMessagingError, rbError: 
     157            if self.StreamError(rbError) is False: 
     158                raise 
     159        except Exception, error: 
     160            if self.StreamError(error, True) is False: 
     161                raise 
    138162 
  • trunk/RBTelepathy/RBTelepathy/Stream/SocketAdaptor.py

    r407 r408  
    101101        if not data: 
    102102            self.isshutdown = True 
    103             self.log.info('Connection closed by remote connection') 
     103            self.log.debug('Connection closed by remote connection') 
    104104        self.protocol.OnRecvStreamData(data) 
    105105 
     
    134134 
    135135    def _SetSocketError(self, exc): 
    136         self.log.error(exc
     136        self.log.debug(exc, exc_info=1
    137137        self.isshutdown = True 
    138138        del self._sendData 
  • trunk/RBTelepathy/test/test_streamProtocol.py

    r402 r408  
    4343        self.protocol = Protocol.StreamProtocol() 
    4444        self.protocol.stream = self 
    45         self.protocol.OnPacketComplete = self.OnPacketComplete 
     45        self.protocol.OnStreamPacket = self.OnStreamPacket 
    4646 
    4747    def tearDown(self): 
    4848        del self.protocol.stream  
    49         del self.protocol.OnPacketComplete  
     49        del self.protocol.OnStreamPacket  
    5050        del self.protocol 
    5151        try: del self.isshutdown 
     
    7171 
    7272    packet = None 
    73     def OnPacketComplete(self, packet): 
     73    def OnStreamPacket(self, packet): 
    7474        self.packet = packet 
    7575