Browse Source

chore: add tests

pull/575/head
Parodi, Eugenio 🌶 2 months ago
parent
commit
6680b8a55a
  1. 20
      docs/source/sphinx_modules/test_01.py
  2. 0
      tests/t.generic/test.generic.010.daemon.02.py
  3. 49
      tests/t.generic/test.generic.013.multiprocessing.01.py
  4. 56
      tests/t.generic/test.generic.013.multiprocessing.02.py
  5. 17
      tests/t.generic/test.pytest.01.collector.py

20
docs/source/sphinx_modules/test_01.py

@ -23,16 +23,16 @@ if True or TYPE_CHECKING:
from sphinx.writers.html5 import HTML5Translator
def test_process_signature(app, what, name, obj, options, sig, ret_ann):
'''Callback function to provide new signatures.'''
print(f"{app=}\n{what=}\n{name=}\n{obj=}\n{options=}\n{sig=}\n{ret_ann=}")
return '\n'.join(["a","b","c"]), "Pipppo Pippero"
def test_process_docstring(app, what, name, obj, options, lines:list[str]):
'''Callback function to provide overloaded signatures.'''
print(f"{app=}\n{what=}\n{name=}\n{obj=}\n{options=}\n{lines=}")
for i,line in enumerate(lines):
lines[i] = line.replace("Table","PIPPO")
# def test_process_signature(app, what, name, obj, options, sig, ret_ann):
# '''Callback function to provide new signatures.'''
# print(f"{app=}\n{what=}\n{name=}\n{obj=}\n{options=}\n{sig=}\n{ret_ann=}")
# return '\n'.join(["a","b","c"]), "Pipppo Pippero"
# def test_process_docstring(app, what, name, obj, options, lines:list[str]):
# '''Callback function to provide overloaded signatures.'''
# print(f"{app=}\n{what=}\n{name=}\n{obj=}\n{options=}\n{lines=}")
# for i,line in enumerate(lines):
# lines[i] = line.replace("Table","PIPPO")
class TermTkMethod(PyMethod):
option_spec: ClassVar[OptionSpec] = PyMethod.option_spec.copy()

0
tests/t.generic/test.generic.010.daemon.02 copy.py → tests/t.generic/test.generic.010.daemon.02.py

49
tests/t.generic/test.generic.013.multiprocessing.01.py

@ -0,0 +1,49 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2025 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import time, multiprocessing
print(1)
def task():
print(2)
process = multiprocessing.current_process()
print(f"Task: {process.daemon=}")
for i in range(15):
print(f"{i=}",flush=True)
time.sleep(0.1)
print(3)
if __name__ == '__main__':
print(4)
process = multiprocessing.Process(target=task, daemon=True)
process.start()
process.join()
print(f"Main Process exit...")

56
tests/t.generic/test.generic.013.multiprocessing.02.py

@ -0,0 +1,56 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2025 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import time, multiprocessing
print(1)
def task(a,b,c,d,shared_results):
print(2,a,b,c,d,shared_results)
process = multiprocessing.current_process()
print(f"Task: {process.daemon=}")
for i in range(15):
print(f"{i=}",flush=True)
time.sleep(0.1)
shared_results['pippo'] = (5,6,7,8)
return 3,4,5,6
print(3)
if __name__ == '__main__':
print(4)
manager = multiprocessing.Manager()
shared_results = manager.dict()
process = multiprocessing.Process(target=task, daemon=True, args=(1,2,3,4,shared_results))
process.start()
process.join()
print(shared_results)
print(f"Main Process exit...")

17
tests/t.generic/test.pytest.01.collector.py

@ -13,14 +13,23 @@ class ResultCollector:
"longrepr": str(report.longrepr) if report.failed else None
})
def pytest_itemcollected(self, item):
# Called during --collect-only
self.results.append({
"nodeid": item.nodeid,
"outcome": "collected",
"duration": 0,
"longrepr": None
})
def run_tests_and_collect():
collector = ResultCollector()
pytest.main(["tests/"], plugins=[collector])
pytest.main(["-p no:terminal", "tests/"], plugins=[collector])
return collector.results
def run_tests_and_collect_2():
collector = ResultCollector()
pytest.main(["--collect-only", "-q", "tests/"], plugins=[collector])
pytest.main(["--collect-only", "-p no:terminal", "tests/"], plugins=[collector])
return collector.results
if __name__ == "__main__":
@ -30,10 +39,10 @@ if __name__ == "__main__":
if r['outcome'] == 'failed':
print(" ↳ Error:", r['longrepr'])
print()
print("##################")
results = run_tests_and_collect_2()
for r in results:
print(f"{r['nodeid']}: {r['outcome']} ({r['duration']:.2f}s)")
print(f"{r['nodeid']}")
if r['outcome'] == 'failed':
print(" ↳ Error:", r['longrepr'])

Loading…
Cancel
Save