Changeset 442
- Timestamp:
- 02/13/03 13:21:03 (6 years ago)
- Files:
-
- trunk/RBTelepathy/RBTelepathy/Connection.py (modified) (5 diffs)
- trunk/RBTelepathy/RBTelepathy/Packet/AuthenticationHandler.py (modified) (8 diffs)
- trunk/RBTelepathy/RBTelepathy/Packet/Elements.py (modified) (3 diffs)
- trunk/RBTelepathy/RBTelepathy/Packet/ErrorHandler.py (modified) (3 diffs)
- trunk/RBTelepathy/RBTelepathy/Packet/MessageHandler.py (modified) (7 diffs)
- trunk/RBTelepathy/RBTelepathy/Packet/StreamElements.py (modified) (1 diff)
- trunk/RBTelepathy/RBTelepathy/Packet/URIAddress.py (modified) (1 diff)
- trunk/RBTelepathy/RBTelepathy/Routing/KeylistRouterBase.py (added)
- trunk/RBTelepathy/RBTelepathy/Routing/RouterBase.py (modified) (3 diffs)
- trunk/RBTelepathy/RBTelepathy/Routing/SimpleRouter.py (modified) (3 diffs)
- trunk/RBTelepathy/RBTelepathy/SocketConnections.py (modified) (7 diffs)
- trunk/RBTelepathy/RBTelepathy/Stream/Protocol.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/RBTelepathy/RBTelepathy/Connection.py
r437 r442 25 25 26 26 import logging 27 import weakref 28 27 29 import ErrorTypes 28 30 … … 32 34 33 35 class Connection(object): 36 """ 37 A Connection is the intermediatary between packet handlers and the 38 protocol. OnStreamPacket distributes inbound packets based on packet type 39 and namespace to the registered packet handler. Packet handlers 40 registrations are managed with the LoadHandler and UnloadHandler methods. 41 In this way, new features and functionality can be added within the 42 framework by simply extending the packet handler table. 43 44 The Connection is also the central repository for sharing state information 45 between packet handlers. So, common information like login id can be found 46 here, or a few steps from here. 47 48 Related Packages: 49 Packet 50 Stream 51 """ 52 34 53 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 35 54 #~ Constants / Variables / Etc. … … 48 67 self.RoutedPacketHandlers = self.RoutedPacketHandlers.copy() 49 68 69 #~ Connection Operations ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 70 50 71 def LoadDefaultHandlers(self): 51 72 pass 73 74 def Shutdown(self, *args, **kw): 75 raise NotImplementedError 52 76 53 77 def SendPacket(self, *args, **kw): 54 78 raise NotImplementedError 55 79 80 #~ Handler Management ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 81 82 def LoadHandler(self, HandlerFactory, *args, **kw): 83 try: 84 connection = weakref.proxy(self) 85 handler = HandlerFactory(connection, *args, **kw) 86 self.protocol.packetbuilder.AddElementFactories(handler.ElementFactories) 87 for key in handler.StreamPacketHandlers: 88 self.StreamPacketHandlers[key] = handler 89 for key in handler.RoutedPacketHandlers: 90 self.RoutedPacketHandlers[key] = handler 91 except ErrorTypes.RBMessagingError, e: 92 raise 93 except: 94 self.log.critical('Unexpected error loading handler %r', HandlerFactory, exc_info=1) 95 raise 96 97 def UnloadHandler(self, handler): 98 try: 99 self.protocol.packetbuilder.RemoveElementFactories(handler.ElementFactories) 100 for key in handler.StreamPacketHandlers: 101 try: del self.StreamPacketHandlers[key] 102 except KeyError: pass 103 for key in handler.RoutedPacketHandlers: 104 try: del self.RoutedPacketHandlers[key] 105 except KeyError: pass 106 except ErrorTypes.RBMessagingError, e: 107 raise 108 except: 109 self.log.critical('Unexpected error unloading handler %r', handler, exc_info=1) 110 raise 111 112 #~ Event Methods ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 113 56 114 def OnRoutedPacket(self, packet, *args, **kw): 57 115 self.log.debug('Received Routed packet (%s, %s)', packet.namespace, packet.node) 58 #print '>>>', packet.toxml()59 116 60 117 # Lookup our packet handler … … 67 124 raise ErrorTypes.RoutedHandlerError("No RoutedPacketHandler for (%s, %s)" % (packet.namespace, packet.node)) 68 125 69 return packethandler.OnRoutedPacket( self,packet, *args, **kw)126 return packethandler.OnRoutedPacket(packet, *args, **kw) 70 127 71 128 def OnStreamPacket(self, packet, *args, **kw): 72 129 self.log.debug('Received Stream packet (%s, %s)', packet.namespace, packet.node) 73 #print '>>>', packet.toxml()74 130 75 131 # Lookup our packet handler … … 82 138 raise ErrorTypes.PacketHandlerError("No StreamPacketHandler for (%s, %s)" % (packet.namespace, packet.node)) 83 139 84 return packethandler.OnStreamPacket( self,packet, *args, **kw)140 return packethandler.OnStreamPacket(packet, *args, **kw) 85 141 86 142 def OnStreamShutdown(self, how): trunk/RBTelepathy/RBTelepathy/Packet/AuthenticationHandler.py
r437 r442 53 53 ElementFactories = {(RBMessagingNamespace, 'authentication'): EF.Static(AuthenticationElement)} 54 54 StreamPacketHandlers = ElementFactories.keys() 55 RoutedPacketHandlers = ()55 RoutedPacketHandlers = {} 56 56 57 57 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 60 60 61 61 def __init__(self, connection): 62 self.connection = connection 62 63 self._ValidTypes = {} 63 64 64 def OnRoutedPacket(self, connection,packet, *args, **kw):65 def OnRoutedPacket(self, packet, *args, **kw): 65 66 raise NotImplementedError, 'Authetication does not accept RoutedPackets yet' 66 67 67 def OnStreamPacket(self, connection,packet, *args, **kw):68 def OnStreamPacket(self, packet, *args, **kw): 68 69 self.log.debug('Received auth type=%r packet', packet.attrs.get('type', )) 69 70 try: … … 74 75 try: 75 76 OnAuthType = self._ValidTypes[AuthType] 76 return OnAuthType(self, connection,packet, *args, **kw)77 return OnAuthType(self, packet, *args, **kw) 77 78 except KeyError: 78 79 raise ErrorTypes.AuthenticationError('Invalid or out of sequence type "%s"' % (AuthType,)) … … 86 87 self._ValidTypes[AuthType] = getattr(klass, '_type_'+AuthType) 87 88 88 def _send_packet(self, connection,AuthType, XMLBody=None):89 def _send_packet(self, AuthType, XMLBody=None): 89 90 xml = "<authentication xmlns='%s' type='%s'" % (RBMessagingNamespace, AuthType) 90 91 if XMLBody: 91 92 xml += ">" + XMLBody + "</authentication>" 92 93 else: xml += "/>" 93 connection.SendPacket(xml)94 self.connection.SendPacket(xml) 94 95 95 96 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 115 116 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 116 117 117 def _type_query(self, connection,packet, *args, **kw):118 self._reply_options( connection,packet, *args, **kw)118 def _type_query(self, packet, *args, **kw): 119 self._reply_options(packet, *args, **kw) 119 120 120 def _reply_options(self, connection,packet, *args, **kw):121 self._send_packet( connection,'options', '<simple/>')121 def _reply_options(self, packet, *args, **kw): 122 self._send_packet('options', '<simple/>') 122 123 123 def _type_select(self, connection,packet, *args, **kw):124 self._reply_challenge( connection,packet, *args, **kw)124 def _type_select(self, packet, *args, **kw): 125 self._reply_challenge(packet, *args, **kw) 125 126 126 def _reply_challenge(self, connection,packet, *args, **kw):127 def _reply_challenge(self, packet, *args, **kw): 127 128 self._EnableType('response') 128 self._send_packet( connection,'challenge', '<simple addr=""/>')129 self._send_packet('challenge', '<simple addr=""/>') 129 130 130 def _type_response(self, connection,packet, *args, **kw):131 def _type_response(self, packet, *args, **kw): 131 132 try: 132 133 simple = packet['simple',][0] … … 135 136 136 137 if simple.addr.authority: 137 connection.OnAuthenticated(True, simple.addr)138 self._reply_success( connection,packet, *args, **kw)138 self.connection.OnAuthenticated(True, simple.addr) 139 self._reply_success(packet, *args, **kw) 139 140 else: 140 connection.OnAuthenticated(False, simple.addr)141 self._reply_failure( connection,packet, *args, **kw)141 self.connection.OnAuthenticated(False, simple.addr) 142 self._reply_failure(packet, *args, **kw) 142 143 143 def _reply_success(self, connection,packet, *args, **kw):144 self._send_packet( connection,'success')144 def _reply_success(self, packet, *args, **kw): 145 self._send_packet('success') 145 146 146 def _reply_failure(self, connection,packet, *args, **kw):147 self._send_packet( connection,'failure')147 def _reply_failure(self, packet, *args, **kw): 148 self._send_packet('failure') 148 149 raise ErrorTypes.AuthenticationError('Invalid response to challenge') 149 150 … … 161 162 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 162 163 163 def AuthorizeAs(self, connection,loginaddr):164 def AuthorizeAs(self, loginaddr): 164 165 self.loginaddr = URIAddress.URIAddress(loginaddr) 165 self._reply_query( connection,None)166 self._reply_query(None) 166 167 167 168 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 169 170 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 170 171 171 def _reply_query(self, connection,packet, *args, **kw):172 def _reply_query(self, packet, *args, **kw): 172 173 self._EnableType('options') 173 self._send_packet( connection,'query')174 self._send_packet('query') 174 175 175 def _type_options(self, connection,packet, *args, **kw):176 self._reply_select( connection,packet, *args, **kw)176 def _type_options(self, packet, *args, **kw): 177 self._reply_select(packet, *args, **kw) 177 178 178 def _reply_select(self, connection,packet, *args, **kw):179 def _reply_select(self, packet, *args, **kw): 179 180 self._EnableType('challenge') 180 self._send_packet( connection,'select', '<simple/>')181 self._send_packet('select', '<simple/>') 181 182 182 def _type_challenge(self, connection,packet, *args, **kw):183 self._reply_response( connection,packet, *args, **kw)183 def _type_challenge(self, packet, *args, **kw): 184 self._reply_response(packet, *args, **kw) 184 185 185 def _reply_response(self, connection,packet, *args, **kw):186 def _reply_response(self, packet, *args, **kw): 186 187 self._EnableType('success') 187 188 self._EnableType('failure') 188 self._send_packet( connection,'response', '<simple addr=%s/>' % (xmlquoteattr(str(self.loginaddr)),))189 self._send_packet('response', '<simple addr=%s/>' % (xmlquoteattr(str(self.loginaddr)),)) 189 190 190 def _type_success(self, connection,packet, *args, **kw):191 connection.OnAuthenticated(True, self.loginaddr)191 def _type_success(self, packet, *args, **kw): 192 self.connection.OnAuthenticated(True, self.loginaddr) 192 193 193 def _type_failure(self, connection,packet, *args, **kw):194 connection.OnAuthenticated(False, self.loginaddr)194 def _type_failure(self, packet, *args, **kw): 195 self.connection.OnAuthenticated(False, self.loginaddr) 195 196 trunk/RBTelepathy/RBTelepathy/Packet/Elements.py
r437 r442 87 87 class StreamRootElement(RootElementBase): 88 88 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 89 #~ Constants / Variables / Etc. 90 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 91 92 __slots__ = ['datastreams', '_OnStreamCurrent'] 93 94 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 89 95 #~ OnStreamData adaptor 90 96 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 119 125 120 126 class RouteableRootElement(RootElementBase): 127 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 128 #~ Constants / Variables / Etc. 129 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 130 131 __slots__ = [] 132 133 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 134 #~ Public Methods 135 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 136 121 137 def GetAddresses(self, nodename='to'): 122 138 existing = self.iternodes(nodename, RBMessagingNamespace) … … 155 171 156 172 class URIAddressElement(RootElementBase): 173 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 174 #~ Constants / Variables / Etc. 175 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 176 177 __slots__ = ['_addr'] 178 179 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 180 #~ Public Methods 181 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 182 157 183 def OnUpdateContent(self): 158 184 RootElementBase.OnUpdateContent(self) 159 self.attrs['addr'] = str(self.addr) 185 try: self.attrs['addr'] = str(self._addr) 186 except AttributeError: pass 160 187 161 188 #~ addr property ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 162 189 163 190 def _get_addr(self): 164 if self._addr is None: 191 try: 192 return self._addr 193 except AttributeError: 165 194 self._addr = URIAddress.URIAddress(self.attrs.get('addr', '')) 166 return self._addr195 return self._addr 167 196 def _set_addr(self, value): self._addr = value 168 197 def _del_addr(self, value): del self._addr 169 198 addr = property(_get_addr, _set_addr, _del_addr) 170 _addr = None171 199 trunk/RBTelepathy/RBTelepathy/Packet/ErrorHandler.py
r433 r442 50 50 ElementFactories = {(RBMessagingNamespace, 'error'): EF.Static(ErrorElement)} 51 51 StreamPacketHandlers = ElementFactories.keys() 52 RoutedPacketHandlers = ()52 RoutedPacketHandlers = {} 53 53 54 54 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 57 57 58 58 def __init__(self, connection): 59 pass59 self.connection = connection 60 60 61 61 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 72 72 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 73 73 74 def OnStreamPacket(self, connection,packet, *args, **kw):74 def OnStreamPacket(self, packet, *args, **kw): 75 75 self.log.warn('%s: %s', packet.attrs.get('type', 'UnknownError'), str(packet)) 76 76 trunk/RBTelepathy/RBTelepathy/Packet/MessageHandler.py
r437 r442 58 58 ElementFactories = {(RBMessagingNamespace, 'message'): EF.Static(MessageElement)} 59 59 StreamPacketHandlers = ElementFactories.keys() 60 RoutedPacketHandlers = ElementFactories.keys()60 RoutedPacketHandlers = {} 61 61 62 62 log = logging.getLogger('HostMessageHandler') … … 67 67 68 68 def __init__(self, connection): 69 self.connection = connection 69 70 self.router = connection.model().GetRouter((RBMessagingNamespace, 'message')) 70 71 if self.router: … … 73 74 self.router.AuthorityRoutes[key] = [connection] 74 75 75 def OnRoutedPacket(self, connection,packet, addresses):76 def OnRoutedPacket(self, packet, addresses): 76 77 # Copy the root level packet, so we can adjust it's addresses 77 78 #packet = copy.copy(packet) … … 79 80 packet.SetAddresses(addresses) 80 81 self.log.debug('Sending routed packet to protocol') 81 connection.SendPacket(packet)82 self.connection.SendPacket(packet) 82 83 83 def OnStreamPacket(self, connection,packet):84 def OnStreamPacket(self, packet): 84 85 # TODO: Flush out 85 86 #fromlist = packet.GetAddresses('from') … … 106 107 ElementFactories = {(RBMessagingNamespace, 'message'): EF.Static(MessageElement)} 107 108 StreamPacketHandlers = ElementFactories.keys() 108 RoutedPacketHandlers = ElementFactories.keys()109 RoutedPacketHandlers = {} 109 110 110 111 log = logging.getLogger('HostMessageHandler') … … 115 116 116 117 def __init__(self, connection): 118 self.connection = connection 117 119 router = connection.model().Routers.get((RBMessagingNamespace, 'message'), None) 118 120 119 def OnRoutedPacket(self, connection,packet, addresses):121 def OnRoutedPacket(self, packet, addresses): 120 122 print 'OnRoutedPacket:' 121 123 print packet._toXML() … … 125 127 print 126 128 127 def OnStreamPacket(self, connection,packet):129 def OnStreamPacket(self, packet): 128 130 print 'OnStreamPacket:' 129 131 print packet._toXML() trunk/RBTelepathy/RBTelepathy/Packet/StreamElements.py
r433 r442 32 32 class StreamBase(PacketElementBase): 33 33 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 34 #~ Constants / Variables / Etc.35 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~36 37 StreamContent = ''38 39 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~40 34 #~ Methods 41 35 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 42 36 43 37 def _xmlInitStarted(self): 38 self.StreamContent = '' 44 39 self._owner.datastreams.append(self) 45 40 trunk/RBTelepathy/RBTelepathy/Packet/URIAddress.py
r417 r442 31 31 32 32 class URIAddress(URIAuthorityDefault): 33 """URIAddress provides methods for manipulating URIs. 34 35 Can handle proto:account@network/path?query#fragment 36 In addition to the functionality present in URIAuthroityDefault, 37 URIAddress splits out two more properties: account and network. 38 Also important to know is that if the following is encountered:: 39 40 proto:testing/path?query#fragment 41 42 "testing" is the *account* name, and the network is assumed to be the 43 same as the current connection. To correct this, use a fully qualified 44 name:: 45 46 proto:account@testing/path?query#fragment 47 48 The reason for this is so I can simply use an account name when I know I'm 49 on the same network. (Or should be. ;) 50 """ 51 33 52 def _getAuthority(self): 34 53 if self.network is not None: trunk/RBTelepathy/RBTelepathy/Routing/RouterBase.py
r437 r442 31 31 32 32 class RouterBase(object): 33 def RoutePacket(self, packet, addresslist): 34 """Template method that governs generalized packet routing. 35 Handles multiple addresses to multiple destinations with the least number of commands. 36 Returns a list of callable objects, that represent the commands to complete the routing action.""" 33 """ 34 Routers are responsible for directing traffic between packet handlers, and 35 thereby, between Connections. There are many ways to route these packets, 36 and possibly just as many different interfaces needed to do so. This 37 interface takes a fairly simple and extensible approach to allowing for 38 these different implementations, while still providing some structure. 39 """ 37 40 38 destinations = {} 39 # Find routing destinations for all addresses 40 for address in addresslist: 41 # Record common destinations 42 destinationroutes = self.FindRouteDestinations(address) or () 43 for dst in destinationroutes: 44 destinations.setdefault(dst, []).append(address) 45 if not destinationroutes: 46 dst = self.DefaultDestination(address) 47 destinations.setdefault(dst, []).append(address) 41 def RoutePacket(self, *args, **kw): 42 """Synonym for OnRoutedPacket""" 43 return self.OnRoutedPacket(*args, **kw) 48 44 49 # Now send the packet to the destination, including all the addresses 50 commandresults = [self.CommandRouteToDestination(dst, packet, dstaddresslist) 51 for dst, dstaddresslist in destinations.iteritems() if dst] 52 return commandresults 45 def OnRoutedPacket(self, packet, *args, **kw): 46 """Entry point for generalized packet routing. 53 47 54 # Make OnRoutedPacket equivlant to RoutePacket so we can chain routers 55 OnRoutedPacket = RoutePacket 56 # Alternative, if other code is needed 57 ##def OnRoutedPacket(self, packet, addresslist): 58 ## self.RoutePacket(packet, addresslist) 59 60 def DefaultDestination(self, address): 61 """Returns the default destination for an otherwise unrouteable address. 62 Can return None, raise an error, or return a valid destination.""" 63 raise ErrorTypes.RoutingError("No route found for '%s'" % (address,)) 64 65 def FindRouteDestinations(self, address): 66 """Finds destination route(s) for address. 67 If multiple are routes are provided, the packet will be sent to all returned.""" 68 return [] 69 70 def CommandRouteToDestination(self, destination, packet, addresslist): 71 """Returns a callable object that carries out the routing of packet to destination with the addresslist.""" 72 if destination is None: 73 return ErrorRoutingCommand(destination, packet, addresslist) 74 else: 75 return DefaultRoutingCommand(destination, packet, addresslist) 48 Returns a list of callable objects, that represent the commands to 49 complete the routing action. This allows more control to the caller 50 and more informative exceptions to be raised.""" 51 raise NotImplementedError 76 52 77 53 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 79 55 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 80 56 81 class RoutingException(KeyError):82 pass83 84 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~85 86 57 class RoutingCommandBase(object): 87 def __init__(self, destination, packet, addresslist):58 def __init__(self, destination, packet, keylist): 88 59 pass 89 60 90 61 def __call__(self, *args, **kw): 91 """S eeCommit()"""62 """Synonym for Commit()""" 92 63 return self.Commit(*args, **kw) 93 64 94 65 def Errors(self): 95 66 """Simple method to query for errors""" 96 return []67 return () 97 68 98 69 def Commit(self): … … 101 72 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 102 73 103 class ErrorRoutingCommand(RoutingCommandBase, RoutingException):104 def __init__(self, destination, packet, addresslist):105 self. addresslist = addresslist74 class ErrorRoutingCommand(RoutingCommandBase, ErrorTypes.RoutingError): 75 def __init__(self, destination, packet, keylist): 76 self.keylist = keylist 106 77 107 78 def Errors(self): 108 return ['No route found for "%s"' % (each,) for each in self. addresslist]79 return ['No route found for "%s"' % (each,) for each in self.keylist] 109 80 110 81 def Commit(self): 111 raise self, "No routes found for %s" % (self.addresslist,)82 raise self, 'No routes found for %s' % (self.keylist,) 112 83 113 84 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 114 85 115 86 class DefaultRoutingCommand(RoutingCommandBase): 116 def __init__(self, destination, packet, addresslist):87 def __init__(self, destination, packet, keylist=()): 117 88 self.destination = destination 118 89 self.packet = packet 119 self. addresslist = addresslist90 self.keylist = keylist 120 91 121 92 def Commit(self): 122 self.destination.OnRoutedPacket(self.packet, self.addresslist)93 return self.destination.OnRoutedPacket(self.packet, self.keylist) 123 94 trunk/RBTelepathy/RBTelepathy/Routing/SimpleRouter.py
r437 r442 24 24 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 25 25 26 from RouterBase import RouterBase26 from KeylistRouterBase import KeylistRouterBase 27 27 from RBMessaging import ErrorTypes 28 28 … … 31 31 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 32 32 33 class AuthorityDictRouter(RouterBase): 33 class AuthorityDictRouter(KeylistRouterBase): 34 """WARNING: In an extremely simple prototype stage!""" 35 34 36 def __init__(self): 35 37 self.AuthorityRoutes = {} … … 39 41 If multiple are routes are provided, the packet will be sent to all returned.""" 40 42 result = self.AuthorityRoutes.get(address.authority) 41 return result 43 if isinstance(result, (list, tuple, dict)): 44 return result 45 else: return (result,) 42 46 trunk/RBTelepathy/RBTelepathy/SocketConnections.py
r419 r442 52 52 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 53 53 54 class Protocol BaseConnection(Connection.Connection):54 class ProtocolConnection(Connection.Connection): 55 55 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 56 56 #~ Constants / Variables / Etc. … … 89 89 return self 90 90 BuildFromSocket = classmethod(BuildFromSocket) 91 92 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~93 #~ Public Methods94 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~95 96 def LoadHandler(self, HandlerFactory, *args, **kw):97 try:98 handler = HandlerFactory(self, *args, **kw)99 self.protocol.packetbuilder.AddElementFactories(handler.ElementFactories)100 for key in handler.StreamPacketHandlers:101 self.StreamPacketHandlers[key] = handler102 for key in handler.RoutedPacketHandlers:103 self.RoutedPacketHandlers[key] = handler104 except ErrorTypes.RBMessagingError, e:105 raise106 except:107 self.log.critical('Unexpected error loading handler %s', HandlerFactory, exc_info=1)108 raise109 110 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~111 112 class ClientConnection(ProtocolBaseConnection):113 """TODO:114 Load Packet.ErrorHandler115 Load Packet.AuthenticationHandler for Client Connection116 117 This process should be controllable by an xml loader118 """119 120 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~121 #~ Constants / Variables / Etc.122 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~123 124 log = logging.getLogger('ClientConnection')125 126 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~127 #~ Public Class Methods128 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~129 91 130 92 def BuildFromInfo(klass, model, hostname=DefaultHost, port=DefaultPort): … … 134 96 return klass.BuildFromSocket(model, clientsocket) 135 97 BuildFromInfo = classmethod(BuildFromInfo) 136 98 137 99 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 138 100 #~ Public Methods … … 142 104 self.protocol.SendPacket(*args, **kw) 143 105 106 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 107 108 class ClientConnection(ProtocolConnection): 109 """Created at the request of an in-process client, client connections represent outbound requests to external services. 110 111 These External services include other routers.""" 112 113 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 114 #~ Constants / Variables / Etc. 115 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 116 117 log = logging.getLogger('ClientConnection') 118 119 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 120 #~ Public Methods 121 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 122 144 123 def LoadDefaultHandlers(self): 145 Protocol BaseConnection.LoadDefaultHandlers(self)124 ProtocolConnection.LoadDefaultHandlers(self) 146 125 self.protocol.packetbuilder = StandardStreamPacketBuilder() 147 126 self.LoadHandler(ErrorHandler.LogErrorHandler) … … 160 139 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 161 140 162 class HostConnection(ProtocolBaseConnection): 163 """TODO: 164 Load Packet.ErrorHandler 165 Load Packet.AuthenticationHandler for Host Connection 166 167 TODO On Authenticated: 168 Load Packet.ConfigureHandler for Host Connection 169 Load Packet.MessageHandler for Host Connection 170 171 This process should be controllable by an xml loader 172 """ 141 class HostConnection(ProtocolConnection): 142 """Created in response to inbound requests, host connections handle inbound traffic *from* a client connection.""" 173 143 174 144 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 186 156 187 157 def LoadDefaultHandlers(self): 188 Protocol BaseConnection.LoadDefaultHandlers(self)158 ProtocolConnection.LoadDefaultHandlers(self) 189 159 self.protocol.packetbuilder = StandardStreamPacketBuilder() 190 160 self.LoadHandler(ErrorHandler.LogErrorHandler) … … 204 174 205 175 class ServerConnection(Connection.Connection): 176 """Creates HostConnection setups from incomming requests of the server socket.""" 177 206 178 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 207 179 #~ Constants / Variables / Etc. trunk/RBTelepathy/RBTelepathy/Stream/Protocol.py
r437 r442 98 98 return self.data and True or False 99 99 100 def Shutdown(self): 101 self.log.info('Shutting down stream protocol') 102 self.stream.shutdown('local') 103 100 104 def StreamError(self, error, forceshutdown=False): 101 105 if isinstance(error, ErrorTypes.RBMessagingError): … … 110 114 111 115 if error.shutdown or forceshutdown: 112 self. stream.shutdown('local')116 self.Shutdown() 113 117 del self.data 114 118 else: … … 119 123 self.stream.write('''<error xmlns='%s' %s>%s</error>%s''' % (namespace, errortype, errortext, self.delimiter)) 120 124 if forceshutdown: 121 self. stream.shutdown('local')125 self.Shutdown() 122 126 del self.data 123 127
