root/trunk/RBTelepathy/test/test_streamProtocol.py

Revision 455, 8.4 kB (checked in by sholloway, 6 years ago)

*** empty log message ***

Line 
1 #!/usr/bin/env python
2 ##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
3 ##~ License
4 ##~
5 ##- The RuneBlade Foundation library is intended to ease some
6 ##- aspects of writing intricate Jabber, XML, and User Interface (wxPython, etc.)
7 ##- applications, while providing the flexibility to modularly change the
8 ##- architecture. Enjoy.
9 ##~
10 ##~ Copyright (C) 2002  TechGame Networks, LLC.
11 ##~
12 ##~ This library is free software; you can redistribute it and/or
13 ##~ modify it under the terms of the BSD style License as found in the
14 ##~ LICENSE file included with this distribution.
15 ##~
16 ##~ TechGame Networks, LLC can be reached at:
17 ##~ 3578 E. Hartsel Drive #211
18 ##~ Colorado Springs, Colorado, USA, 80920
19 ##~
20 ##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
21
22 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
23 #~ Imports
24 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
25
26 # Path adjustment so the test can run out of the box
27 import sys; sys.path.append('..')
28 from test import test_support
29 import unittest
30
31 from RBTelepathy.Stream import Protocol
32
33 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
34 #~ Definitions
35 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
36
class BaseTestCase(unittest.TestCase):
    """Shared fixture that plugs the test case into a ``StreamProtocol``.

    ``setUp`` installs the test case itself as ``protocol.stream`` and
    replaces ``protocol.OnStreamPacket`` with the recorder defined here, so
    a test can inspect what reached it through three attributes:

    * ``packet``     -- last object passed to ``OnStreamPacket``
    * ``streamdata`` -- last data passed to ``write``
    * ``isshutdown`` -- ``True`` once ``shutdown`` has been called

    The ``assert*`` methods are used rather than ``failIf``/``failUnless``
    because those aliases were removed from ``unittest`` in Python 3.12.
    """

    # Class-level defaults; the recorder methods below shadow them with
    # instance attributes, and tearDown removes the instance attributes again.
    packet = None
    streamdata = None
    isshutdown = False

    def setUp(self):
        """Check the recorders are pristine, then build and hook a protocol."""
        self.assertFalse(self.isshutdown)
        self.assertIsNone(self.streamdata)
        self.assertIsNone(self.packet)

        self.protocol = Protocol.StreamProtocol()
        self.protocol.stream = self
        self.protocol.OnStreamPacket = self.OnStreamPacket

    def tearDown(self):
        """Unhook the protocol and drop any state recorded on the instance."""
        del self.protocol.stream
        del self.protocol.OnStreamPacket
        del self.protocol
        # A recorder that never fired left no instance attribute behind, so a
        # failed delete is expected and ignored.
        for name in ('isshutdown', 'streamdata', 'packet'):
            try:
                delattr(self, name)
            except AttributeError:
                pass

    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    #~ Support methods
    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def ShouldBe(self, good):
        """Assert the recorded outcome of feeding data to the protocol.

        good -- when true, expect a packet and neither a write nor a
                shutdown; when false, expect a write and a shutdown but no
                packet.
        """
        if good:
            self.assertFalse(self.isshutdown)
            self.assertIsNone(self.streamdata)
            self.assertIsNotNone(self.packet)
        else:
            self.assertTrue(self.isshutdown)
            self.assertIsNotNone(self.streamdata)
            self.assertIsNone(self.packet)

    def OnStreamPacket(self, packet):
        """Record the packet handed over by the protocol."""
        self.packet = packet

    def write(self, streamdata):
        """Record data written to this stand-in stream."""
        self.streamdata = streamdata

    def shutdown(self):
        """Record that this stand-in stream was shut down."""
        self.isshutdown = True
83
84 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
85
class SimpleTestCase(BaseTestCase):
    r"""Feed single XML packets to the protocol and check the outcome.

    Every message except the first ends in ``\x1f``; the first test shows
    that without it no packet, write or shutdown is recorded.  "Correct"
    tests expect a packet to be delivered, "Error" tests expect a write
    followed by a shutdown (see ``ShouldBe``).

    ``assert*`` methods replace ``failIf``/``failUnless``, which were
    removed from ``unittest`` in Python 3.12.
    """

    def testNotFinishedCorrect(self):
        '''Correct not finished'''
        # No trailing \x1f: nothing should have been recorded yet.
        msg = "<message />"
        self.protocol.OnRecvStreamData(msg)
        self.assertFalse(self.isshutdown)
        self.assertIsNone(self.streamdata)
        self.assertIsNone(self.packet)

    def testMessageCorrect(self):
        '''Correct message'''
        msg = "<message />\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=True)

    def testErrorCorrect(self):
        '''Correct error'''
        # An <error/> element is itself a well-formed packet, so it is
        # expected to be delivered rather than to trigger a shutdown.
        msg = "<error />\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=True)

    def testChildElementCorrect(self):
        '''Correct namespaced child element'''
        # Compare with testInvalidChildElementError: the same kind of child
        # without its own xmlns is expected to be rejected.
        msg = "<message><NotAbsurd xmlns='SomethingElse'/></message>\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=True)

    def testJunkAfterError(self):
        '''Error junk after XML'''
        msg = "<message/>junk\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=False)

    def testInvalidXMLError(self):
        '''Error invalid XML'''
        # A closing tag with no matching opening tag.
        msg = "</message>\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=False)

    def testIncompleteError(self):
        '''Error incomplete'''
        # Terminator arrives before the element is closed.
        msg = "<message>\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=False)

    def testInvalidChildElementError(self):
        '''Error invalid child element'''
        msg = "<message><Absurd/></message>\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=False)
136
137 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
138
class StreamTestCase(BaseTestCase):
    r"""Exercise packets that announce binary streams via ``<stream/>``.

    Each message is an XML packet ending in ``\x1f`` followed by raw stream
    bytes.  The tests cover streams declared with ``size='N'`` and streams
    declared with ``format='terminated'``, whose terminator byte is given by
    ``ordinal`` (``\x04`` is used when ``ordinal`` is omitted, judging by
    the messages built below).

    ``assert*`` methods replace ``failIf``/``failUnless``, which were
    removed from ``unittest`` in Python 3.12.
    """

    # Payload deliberately containing NUL, \x01, \x1f and \xff bytes; the
    # tests slice 30-byte pieces off its head and tail.
    data = '\x00\x00 Some \x00 Binary \x01 Values \x1f should \x1fwork fine\xff\xff'

    def testStreamSizeCorrect(self):
        '''Correct stream size attribute'''
        self.assertFalse(len(self.data) < 30, "Test assumes data is longer")
        msg = "<message><stream size='30'/></message>\x1f" + self.data[:30]
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=True)

    def testStreamTwoSizeCorrect(self):
        '''Correct two sized streams'''
        self.assertFalse(len(self.data) < 30, "Test assumes data is longer")
        msg = "<message><stream size='30'/><stream size='30'/></message>\x1f" + self.data[:30] + self.data[-30:]
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=True)

    def testStreamTermCorrect(self):
        '''Correct \\x04 terminated stream'''
        self.assertFalse(len(self.data) < 30, "Test assumes data is longer")
        msg = "<message><stream format='terminated'/></message>\x1f" + self.data[:30] + '\x04'
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=True)

    def testStreamTermNewlineCorrect(self):
        '''Correct \\n terminated stream'''
        self.assertFalse(len(self.data) < 30, "Test assumes data is longer")
        # ordinal='10' selects '\n' (byte value 10) as the terminator.
        msg = "<message><stream format='terminated' ordinal='10'/></message>\x1f" + self.data[:30] + '\n'
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=True)

    def testTwoStreamTermCorrect(self):
        '''Correct \\n terminated and \\x04 terminated stream'''
        self.assertFalse(len(self.data) < 30, "Test assumes data is longer")
        msg = "<message><stream format='terminated' ordinal='10'/><stream format='terminated'/></message>\x1f" + self.data[:30] + '\n' + self.data[-30:] + '\x04'
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=True)

    def testStreamInPartsCorrect(self):
        '''Correct stream in parts'''
        self.assertFalse(len(self.data) < 30, "Test assumes data is longer")
        msg = "<message><stream size='30'/></message>\x1f" + self.data[:30]

        # Deliver the message five bytes at a time; before every chunk,
        # including the last, nothing may have been recorded yet.
        while msg:
            self.assertFalse(self.isshutdown)
            self.assertIsNone(self.streamdata)
            self.assertIsNone(self.packet)
            self.protocol.OnRecvStreamData(msg[:5])
            msg = msg[5:]

        self.ShouldBe(good=True)

    def testInvalidXMLError(self):
        '''Error invalid XML'''
        # <stream> is opened but never closed.
        msg = "<message><stream></message>\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=False)

    def testStreamSizeMissingError(self):
        '''Error stream size attribute missing'''
        msg = "<message><stream/></message>\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=False)

    def testStreamFormatIncorrectError(self):
        '''Error stream format attribute incorrect'''
        msg = "<message><stream format='absurdformat'/></message>\x1f"
        self.protocol.OnRecvStreamData(msg)
        self.ShouldBe(good=False)

    def testStreamLengthOverrunError(self):
        '''Error stream length overrun'''
        self.assertFalse(len(self.data) < 30, "Test assumes data is longer")
        # Only 3 bytes are announced but 30 are sent.
        msg = "<message><stream size='3'/></message>\x1f" + self.data[:30]

        self.protocol.OnRecvStreamData(msg)
        # Unlike ShouldBe(good=False), a packet is expected here as well as
        # the write and shutdown.
        self.assertTrue(self.isshutdown)
        self.assertIsNotNone(self.streamdata)
        self.assertIsNotNone(self.packet)

    def testStreamTermNullError(self):
        '''Error \\x00 terminated stream with \\x00 embedded in data'''
        self.assertFalse(len(self.data) < 30, "Test assumes data is longer")
        # ordinal='0' makes NUL the terminator, and the payload itself
        # starts with NUL bytes.
        msg = "<message><stream format='terminated' ordinal='0'/></message>\x1f" + self.data[:30] + '\x00'
        self.protocol.OnRecvStreamData(msg)
        # As in the overrun test, a packet is expected alongside the write
        # and shutdown.
        self.assertTrue(self.isshutdown)
        self.assertIsNotNone(self.streamdata)
        self.assertIsNotNone(self.packet)
226
227 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
228 #~ Unittest Main
229 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
230
if __name__=='__main__':
    import logging
    # Attach a do-nothing handler to the root logger so records logged while
    # the tests run are discarded.  logging.NullHandler is the standard
    # library's equivalent of the hand-written no-op handler used before.
    logging.root.addHandler(logging.NullHandler())

    unittest.main()
238
Note: See TracBrowser for help on using the browser.