Changeset 413

Show
Ignore:
Timestamp:
01/22/03 19:32:13 (6 years ago)
Author:
sholloway
Message:

*** empty log message ***

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/RBMessaging/RBMessaging/Model.py

    r412 r413  
    2424#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    2525 
     26import Connection 
     27from RBFoundation import SmartSelect 
     28 
    2629#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    2730#~ Definitions  
    2831#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    2932 
     33class MessagingModel(object): 
     34    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     35    #~ Public Methods  
     36    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    3037 
     38    def __init__(self): 
     39        self.Connections = {} 
     40        self.Routers = {} 
     41 
     42    def Process(self, timeout): 
     43        return self.SmartSelectCollection.Process(timeout) 
     44 
     45    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     46    #~ Properties 
     47    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     48 
     49    _SmartSelectCollection = None 
     50    def _GetSmartSelectCollection(self): 
     51        if self._SmartSelectCollection is None: 
     52            self._SmartSelectCollection = SmartSelect.SmartSelectList() 
     53        return self._SmartSelectCollection 
     54    def _SetSmartSelectCollection(self, value): 
     55        self._SmartSelectCollection = value 
     56    def _DelSmartSelectCollection(self): 
     57        del self._SmartSelectCollection 
     58    SmartSelectCollection = property(_GetSmartSelectCollection, None, _DelSmartSelectCollection) 
     59 
  • trunk/RBTelepathy/RBTelepathy/Connection.py

    r412 r413  
    4949 
    5050    def SendPacket(self, *args, **kw): 
    51         self.protocol.SendPacket(*args, **kw) 
     51        raise NotImplementedError 
    5252 
    5353    def OnRoutedPacket(self, packet, *args, **kw): 
     
    7979        return packethandler.OnStreamPacket(self, packet, *args, **kw) 
    8080 
    81 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     81    def OnStreamShutdown(self, how): 
     82        self.log.debug('Stream shutdown by "%s"', how) 
    8283 
    83 class ServerConnection(Connection): 
    84     # TODO: 
    85     #     Load Packet.ErrorHandler 
    86     #     Load Packet.AuthenticationHandler for Server Connection 
    87     #  
    88     # TODO On Authenticated: 
    89     #     Load Packet.ConfigureHandler for Server Connection 
    90     #     Load Packet.MessageHandler for Server Connection 
    91     #  
    92     # This process should be controllable by an xml loader 
    93  
    94     def OnAuthenticated(self, succeeded): 
    95         if succeeded: 
    96             self.log.info('Successful authentication from client') 
    97         else: 
    98             self.log.warn('Failed authentication from client') 
    99  
    100 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    101  
    102 class ClientConnection(Connection): 
    103     # TODO: 
    104     #     Load Packet.ErrorHandler 
    105     #     Load Packet.AuthenticationHandler for Client Connection 
    106     #  
    107     # This process should be controllable by an xml loader 
    108  
    109     def OnAuthenticated(self, succeeded): 
    110         if succeeded: 
    111             self.log.info('Successfully authenticated to server') 
    112         else: 
    113             self.log.warn('Failed authentication to server') 
    114  
  • trunk/RBTelepathy/RBTelepathy/Packet/Builder.py

    r412 r413  
    2727from RBFoundation.XMLClassBuilder import XMLClassBuilder  
    2828from RBFoundation.XMLClassBuilder import ElementFactory as EF 
     29from RBMessaging import ErrorTypes 
    2930 
    3031#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     
    5758 
    5859class StreamPacketBuilder(XMLClassBuilder): 
     60    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     61    #~ Constants / Variables / Etc.  
     62    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     63 
    5964    NamespaceSynonyms = RBNamespaceSynonyms 
    6065    DefaultNamespace = RBNamespaceSynonyms[None] 
    61     BuildPacket = XMLClassBuilder.Parse 
     66 
     67    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     68    #~ Public Methods  
     69    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     70 
     71    def BuildPacket(self, *args, **kw): 
     72        try: 
     73            return self.Parse(*args, **kw) 
     74        except ErrorTypes.RBMessagingError, e: 
     75             # Pass these along... 
     76            raise 
     77        except StandardError, e: 
     78            # Change these to PacketErrors 
     79            raise ErrorTypes.PacketError(str(e)) 
     80 
     81    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     82    #~ Protected Methods  
     83    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     84 
     85    def _xmlChildFactory(self, owner, parent, node, attributes, namespacemap): 
     86        try: 
     87            XMLClassBuilder._xmlChildFactory(self, owner, parent, node, attributes, namespacemap) 
     88        except KeyError, e: 
     89            # Well, we don't seem to have a packet for that... Error!  =) 
     90            raise ErrorTypes.PacketHandlerError(str(e)) 
    6291 
    6392    def _GetOwner(self): 
  • trunk/RBTelepathy/RBTelepathy/Stream/Protocol.py

    r412 r413  
    109109 
    110110            if error.shutdown or forceshutdown: 
    111                 self.stream.shutdown(
     111                self.stream.shutdown('local'
    112112        else: 
    113113            self.log.critical(str(error), exc_info=1) 
     
    117117            self.stream.write('''<error xmlns='%s' %s>%s</error>%s''' % (namespace, errortype, errortext, self.delimiter)) 
    118118            if forceshutdown: 
    119                 self.stream.shutdown(
     119                self.stream.shutdown('local'
    120120 
    121121    def OnStreamPacket(self, packet): 
    122122        """Called when a packet has completed building. 
    123123        Override or reassign to capture message.""" 
     124        pass 
     125 
     126    def OnStreamShutdown(self, how): 
    124127        pass 
    125128 
     
    134137            if self.currentpacket is None: 
    135138                # We expected a packet, so not getting one is an error 
    136                 raise StreamProtocolError, 'Incomplete XML header' 
     139                raise ErrorTypes.StreamProtocolError, 'Incomplete XML header' 
    137140        except ErrorTypes.RBMessagingError, rbError: 
    138141            if self.StreamError(rbError) is False: 
     
    143146 
    144147    def _BuildData(self): 
    145         try: 
    146             self.data = self.currentpacket.OnStreamData(self.data, self._OnStreamPacket) 
    147         except ErrorTypes.RBMessagingError, rbError: 
    148             if self.StreamError(rbError) is False: 
    149                 raise 
    150         except Exception, error: 
    151             if self.StreamError(error, True) is False: 
    152                 raise 
     148        if self.currentpacket is not None:  
     149            try: 
     150                self.data = self.currentpacket.OnStreamData(self.data, self._OnStreamPacket) 
     151            except ErrorTypes.RBMessagingError, rbError: 
     152                if self.StreamError(rbError) is False: 
     153                    raise 
     154            except Exception, error: 
     155                if self.StreamError(error, True) is False: 
     156                    raise 
    153157 
    154158    def _OnStreamPacket(self, packet): 
     
    163167                raise 
    164168 
     169    def _OnStreamShutdown(self, how): 
     170        return self.OnStreamShutdown(how) 
     171 
  • trunk/RBTelepathy/RBTelepathy/Stream/SocketAdaptor.py

    r412 r413  
    6868    write = send 
    6969 
    70     def shutdown(self, how=0): 
     70    def shutdown(self, how='local', allowread=False, allowwrite=True): 
    7171        try: 
    72             self._socket.shutdown(how) 
    73             self.isshutdown = True 
     72            if not self.isshutdown: 
     73                self.OnShutdown(how) 
     74                self.log.info('Connection closed by "%s" on %s', how, self._socket.getpeername()) 
     75            self.isshutdown |= (allowread << 1) | (allowwrite << 0) 
     76            if self.isshutdown > 0: 
     77                self._socket.shutdown(self.isshutdown-1) 
    7478        except socket.error, exc: 
    75             if not self._SetSocketError(exc): 
    76                 raise 
     79            self.log.exception("Socket error on shutdown") 
     80 
     81    def OnRecvStreamData(self, data): 
     82        """Called when data is received from the socket. 
     83        Override or reassign to capture message.""" 
     84        pass 
     85 
     86    def OnShutdown(self, how): 
     87        """Called when shutdown of the socket has occured. 
     88        Override or reassign to capture message. 
     89        how will be one of ['local', 'remote', 'error']""" 
     90        pass 
    7791 
    7892    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     
    100114        data = self._SocketRecv(*args, **kw) 
    101115        if not data: 
    102             self.isshutdown = True 
    103             self.log.debug('Connection closed by remote connection'
    104         self.protocol.OnRecvStreamData(data) 
     116            # The remote side shutdown the connection 
     117            self.shutdown('remote', False, True
     118        self.OnRecvStreamData(data) 
    105119 
    106120    def _NeedsWrite(self):  
     
    110124        nSent = self._SocketSend(self._sendData, *args, **kw) 
    111125        self._sendData = self._sendData[nSent:] 
     126        if self.isshutdown and not self._sendData: 
     127            # Ok, we're shutdown, but were still sending data 
     128            # Now that we are done, shutdown the connection. 
     129            self.shutdown('local', True, False) 
    112130 
    113131    def _NeedsError(self):  
     
    135153    def _SetSocketError(self, exc): 
    136154        self.log.debug(exc, exc_info=1) 
    137         self.isshutdown = True 
    138155        del self._sendData 
    139156        del exc 
     157        self.shutdown('error', False, False) 
    140158        # We've logged it and taken care of the stream,  
    141159        # therefore don't propigate.  If you want to do  
  • trunk/RBTelepathy/RBTelepathy/Stream/SocketServer.py

    r398 r413  
    2424#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
    2525 
    26 import weakref 
    2726from RBFoundation import SmartSelect 
    28 from RBFoundation.AOSubjectObserver.StandardSubjects import Subject 
    2927 
    3028#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     
    3937    def __init__(self, socket): 
    4038        self.serversocket = socket 
    41         self.OnNewConnection = Subject() 
     39 
     40    def OnNewConnection(self, server, socket, info): 
     41        """Called when a new socket is received from the server socket. 
     42        Override or reassign to capture message.""" 
     43        pass 
    4244 
    4345    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
     
    5254    def _ProcessRead(self, *args, **kw):  
    5355        socket, info = self.serversocket.accept() 
    54         self.OnNewConnection.Update(self, socket, info) 
     56        self.OnNewConnection(self, socket, info) 
    5557 
    5658    def _NeedsWrite(self):  
     
    8183        socket.shutdown(0) 
    8284 
    83     server.OnNewConnection.Add(OnNewConnection) 
     85    server.OnNewConnection = OnNewConnection 
    8486 
    8587    try: