1
2
3 """
4 Tests for thread usage in lxml.etree.
5 """
6
7 import unittest, threading, sys, os.path
8
# Put the directory containing this file at the front of ``sys.path``
# (only if it is not listed already) so that the sibling-module import
# below it can be resolved.
this_dir = os.path.dirname(__file__)
if all(entry != this_dir for entry in sys.path):
    sys.path.insert(0, this_dir)
12
13 from common_imports import etree, HelperTestCase, BytesIO, _bytes
14
37
39 XML = self.etree.XML
40 style = XML(_bytes('''\
41 <xsl:stylesheet version="1.0"
42 xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
43 <xsl:template match="*">
44 <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
45 </xsl:template>
46 </xsl:stylesheet>'''))
47 st = etree.XSLT(style)
48
49 result = []
50
def run_thread():
    """Parse a fixed document, transform it with ``st`` and record the output.

    The transformation result is appended to the enclosing ``result`` list.
    """
    document = XML(_bytes('<a><b>B</b><c>C</c></a>'))
    transformed = st(document)
    result.append(transformed)
54
55 self._run_thread(run_thread)
56 self.assertEquals('''\
57 <?xml version="1.0"?>
58 <foo><a>B</a></foo>
59 ''',
60 str(result[0]))
61
77
78 self._run_thread(run_thread)
79 self.assertEquals(_bytes('<a><b>B</b><c>C</c><foo><a>B</a></foo></a>'),
80 tostring(root))
81
97
def run_parse():
    """Parse ``xml`` from a byte stream and append two of its children.

    The first child is appended to ``result`` before the last child is
    looked up, so the second lookup sees the tree as it is after the
    first append.
    """
    parsed_root = self.etree.parse(BytesIO(xml)).getroot()
    for position in (0, -1):
        result.append(parsed_root[position])
101 result.append(thread_root[-1])
102
def run_move_main():
    """Append the first child of ``fragment`` to ``result``."""
    first_child = fragment[0]
    result.append(first_child)
105
def run_build():
    """Create two new elements directly underneath ``result``.

    One is built with ``Element`` and appended explicitly, the other is
    created via ``SubElement`` with ``result`` as its parent.
    """
    # NOTE(review): this helper is not part of the tuple of thread targets
    # iterated in the visible code -- confirm whether that is intentional.
    new_child = Element("{myns}foo", attrib={'{test}attr': 'val'})
    result.append(new_child)
    SubElement(result, "{otherns}tasty")
110
def run_xslt():
    """Compile a stylesheet, apply it to ``root`` and keep part of the output.

    The first child of the transformation result's root element is
    appended to ``result``.
    """
    stylesheet_doc = XML(_bytes('''\
<xsl:stylesheet version="1.0"
xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
<xsl:template match="*">
<foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
</xsl:template>
</xsl:stylesheet>'''))
    transform = etree.XSLT(stylesheet_doc)
    output_root = transform(root).getroot()
    result.append(output_root[0])
121
122 for test in (run_XML, run_parse, run_move_main, run_xslt):
123 tostring(result)
124 self._run_thread(test)
125
126 self.assertEquals(
127 _bytes('<ns0:root xmlns:ns0="myns" att="someval"><b>B</b><c xmlns="test">C</c><b>B</b><c xmlns="test">C</c><tags/><a>B</a></ns0:root>'),
128 tostring(result))
129
def strip_first():
    """Append the first child of ``result`` to a freshly created element.

    The caller keeps invoking this while ``result`` is non-empty, so the
    append presumably detaches the child from ``result`` -- confirm
    against the element API.
    """
    new_parent = Element("newroot")
    first_child = result[0]
    new_parent.append(first_child)
133
134 while len(result):
135 self._run_thread(strip_first)
136
137 self.assertEquals(
138 _bytes('<ns0:root xmlns:ns0="myns" att="someval"/>'),
139 tostring(result))
140
142 XML = self.etree.XML
143 root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'))
144 child_count = len(root)
def testrun():
    """Look up children of ``root`` round-robin, 10000 times in total.

    Each child reference is dropped again right after it was obtained.
    """
    for step in range(10000):
        child = root[step % child_count]
        del child
149 threads = [ threading.Thread(target=testrun)
150 for _ in range(10) ]
151 for thread in threads:
152 thread.start()
153 for thread in threads:
154 thread.join()
155
157 XML = self.etree.XML
158
class TestElement(etree.ElementBase):
    """Element subclass that adds nothing to ``etree.ElementBase``."""
161
class MyLookup(etree.CustomElementClassLookup):
    """Class lookup that always answers with ``TestElement``."""

    # Iterated once per lookup call before the class is returned.
    repeat = range(100)

    def lookup(self, t, d, ns, name):
        """Return ``TestElement`` regardless of the arguments.

        Before returning, the method walks over ``self.repeat`` and counts
        the items; the count itself is not used.  Presumably this delay
        exists to give other threads a chance to run -- TODO confirm.
        """
        count = 0
        for _ in self.repeat:
            count += 1
        return TestElement
170
171 parser = self.etree.XMLParser()
172 parser.set_element_class_lookup(MyLookup())
173
174 root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'),
175 parser)
176
177 child_count = len(root)
def testrun():
    """Look up children of ``root`` round-robin, 1000 times in total.

    Each child reference is dropped again right after it was obtained.
    """
    for step in range(1000):
        child = root[step % child_count]
        del child
182 threads = [ threading.Thread(target=testrun)
183 for _ in range(10) ]
184 for thread in threads:
185 thread.start()
186 for thread in threads:
187 thread.join()
188
190 suite = unittest.TestSuite()
191 suite.addTests([unittest.makeSuite(ThreadingTestCase)])
192 return suite
193
if __name__ == '__main__':
    # Running this file directly only prints a hint that names test.py.
    hint = 'to test use test.py %s' % __file__
    print(hint)
196