diff --git a/apps/ttkode/ttkode/plugins/_030/_main_scan.py b/apps/ttkode/ttkode/plugins/_030/_main_scan.py new file mode 100644 index 00000000..e8a6b80a --- /dev/null +++ b/apps/ttkode/ttkode/plugins/_030/_main_scan.py @@ -0,0 +1,33 @@ +# MIT License +# +# Copyright (c) 2026 Eugenio Parodi +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +def main() -> None: + import sys, pytest + + dirname = sys.argv[1] + + sys.path.append(f'{dirname}/..') + from _030.pytest_glue import ResultCollector_ItemCollected + + collector = ResultCollector_ItemCollected() + pytest.main(['--collect-only', '-p', 'no:terminal', '.'], plugins=[collector]) +main() \ No newline at end of file diff --git a/apps/ttkode/ttkode/plugins/_030/_main_tests.py b/apps/ttkode/ttkode/plugins/_030/_main_tests.py new file mode 100644 index 00000000..38051644 --- /dev/null +++ b/apps/ttkode/ttkode/plugins/_030/_main_tests.py @@ -0,0 +1,35 @@ +# MIT License +# +# Copyright (c) 2026 Eugenio Parodi +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +def main() -> None: + import sys, pytest + + dirname = sys.argv[1] + test_path = sys.argv[2] + + sys.path.append(f'{dirname}/..') + from _030.pytest_glue import ResultCollector_Logreport + + + collector = ResultCollector_Logreport() + pytest.main(['-p', 'no:terminal', test_path], plugins=[collector]) +main() \ No newline at end of file diff --git a/apps/ttkode/ttkode/plugins/_030/pytest_data.py b/apps/ttkode/ttkode/plugins/_030/pytest_data.py new file mode 100644 index 00000000..c3184fc3 --- /dev/null +++ b/apps/ttkode/ttkode/plugins/_030/pytest_data.py @@ -0,0 +1,33 @@ +# MIT License +# +# Copyright (c) 2026 Eugenio Parodi +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +__all__ = ['TestResult'] + +from dataclasses import dataclass +from typing import Optional + +@dataclass +class TestResult(): + nodeId: str + outcome: str + duration: float + longrepr: Optional[str] \ No newline at end of file diff --git a/apps/ttkode/ttkode/plugins/_030/pytest_engine.py b/apps/ttkode/ttkode/plugins/_030/pytest_engine.py index 582c6abc..ed35dd4d 100644 --- a/apps/ttkode/ttkode/plugins/_030/pytest_engine.py +++ b/apps/ttkode/ttkode/plugins/_030/pytest_engine.py @@ -20,18 +20,20 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -__all__ = ['PytestEngine', 'ScanItem'] +__all__ = ['PytestEngine', 'ScanItem', 'TestResult'] import os import sys import json import threading import subprocess +from pathlib import Path from dataclasses import dataclass, asdict, fields from typing import Dict, Any, Optional, List import TermTk as ttk +from _030.pytest_data import TestResult def _strip_result(result: str) -> str: lines = result.splitlines() @@ -45,18 +47,19 @@ class ScanItem(): class PytestEngine(): __slots__ = ( - '_scan_lock', + '_scan_lock', '_test_lock', 'itemScanned','testResultReady', 'errorReported', 'endScan', 'endTest') def __init__(self): self.itemScanned = ttk.pyTTkSignal(ScanItem) - self.testResultReady = ttk.pyTTkSignal() + self.testResultReady = ttk.pyTTkSignal(TestResult) self.errorReported = ttk.pyTTkSignal(str) self.endScan = ttk.pyTTkSignal() self.endTest = ttk.pyTTkSignal() self._scan_lock = threading.RLock() + self._test_lock = threading.RLock() def scan(self) -> Dict: threading.Thread(target=self._scan_thread).start() @@ -64,25 +67,11 @@ class PytestEngine(): def _scan_thread(self): with self._scan_lock: dirname = os.path.dirname(__file__) - script = _strip_result(f""" - # I need this extra line to allow the debugger to inject the code - def main(): - import sys, json, pytest - - sys.path.append('{dirname}/..') - from _030.pytest_glue import ResultCollector_ItemCollected - - collector = ResultCollector_ItemCollected() - pytest.main(['--collect-only', '-p', 'no:terminal', '.'], plugins=[collector]) - - # Print results as JSON to stdout - # print(json.dumps(collector.results)) - main() - """) - + script_file = Path(dirname) / '_main_scan.py' + script = script_file.read_text() # Run the script in a subprocess and capture output process = subprocess.Popen( - [sys.executable, "-c", script], + [sys.executable, "-c", script, dirname], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, @@ -112,58 +101,54 @@ class PytestEngine(): self.endScan.emit() - # # Wait for threads to finish - # stdout_thread.join() - # stderr_thread.join() - - # stdout_content = ''.join(stdout_lines) - # stderr_content = ''.join(stderr_lines) - - # # Parse the JSON results from stdout - # try: - # return items - # except json.JSONDecodeError: - # ttk.TTkLog.error(f"Error parsing results: {process.stdout}") - # ttk.TTkLog.error(f"Stderr: {process.stderr}") - # except Exception as e: - # ttk.TTkLog.error(str(e)) - - # self.endScan.emit() - - - @staticmethod - def run_tests() -> Dict: - dirname = os.path.dirname(__file__) - script = _strip_result(f""" - # I need this extra line to allow the debugger to inject the code - def main(): - import sys, json, pytest - - sys.path.append('{dirname}/..') - from _030.pytest_glue import ResultCollector_Logreport - - collector = ResultCollector_Logreport() - pytest.main(['-p', 'no:terminal', 'tests/'], plugins=[collector]) - - # Print results as JSON to stdout - print(json.dumps(collector.results)) - main() - """) - - # Run the script in a subprocess and capture output - process = subprocess.run( - [sys.executable, "-c", script], - capture_output=True, - text=True - ) - - # Parse the JSON results from stdout - try: - return json.loads(process.stdout) - except json.JSONDecodeError: - ttk.TTkLog.error(f"Error parsing results: {process.stdout}") - ttk.TTkLog.error(f"Stderr: {process.stderr}") - except Exception as e: - ttk.TTkLog.error(str(e)) - - return {} + + def run_all_tests(self): + self.run_tests('tests') + + def run_tests(self, test_path:str): + threading.Thread(target=self._run_tests_thread, args=(test_path,)).start() + + def _run_tests_thread(self, test_path:str): + with self._test_lock: + dirname = os.path.dirname(__file__) + script_file = Path(dirname) / '_main_tests.py' + script = script_file.read_text() + + # Run the script in a subprocess with streaming output + process = subprocess.Popen( + [sys.executable, "-c", script, dirname, test_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, # Line buffered + universal_newlines=True + ) + + def read_stdout(): + for line in iter(process.stdout.readline, ''): + line = line.strip() + if line: + try: + result_dict = json.loads(line) + result = TestResult(**result_dict) + self.testResultReady.emit(result) + except (json.JSONDecodeError, KeyError) as e: + ttk.TTkLog.error(f"Error parsing test result: {line}") + process.stdout.close() + + def read_stderr(): + for line in iter(process.stderr.readline, ''): + self.errorReported.emit(line) + process.stderr.close() + + # Start threads to read stdout and stderr + stdout_thread = threading.Thread(target=read_stdout) + stderr_thread = threading.Thread(target=read_stderr) + + stdout_thread.start() + stderr_thread.start() + + # Wait for process to complete + process.wait() + + self.endTest.emit() diff --git a/apps/ttkode/ttkode/plugins/_030/pytest_glue.py b/apps/ttkode/ttkode/plugins/_030/pytest_glue.py index f96fa8ae..ada77c0a 100644 --- a/apps/ttkode/ttkode/plugins/_030/pytest_glue.py +++ b/apps/ttkode/ttkode/plugins/_030/pytest_glue.py @@ -22,21 +22,24 @@ import pytest -from dataclasses import dataclass, asdict, fields -from typing import Dict, Any, Optional, List, Literal +from dataclasses import asdict +from typing import Dict, Any -class ResultCollector_Logreport: - def __init__(self): - self.results:Dict[str,Any] = [] +from _030.pytest_data import TestResult +class ResultCollector_Logreport: def pytest_runtest_logreport(self, report:pytest.TestReport) -> None: if report.when == "call": - self.results.append({ - "nodeid": report.nodeid, - "outcome": report.outcome, - "duration": report.duration, - "longrepr": str(report.longrepr) if report.failed else None - }) + result = TestResult( + nodeId=report.nodeid, + outcome=report.outcome, + duration=report.duration, + longrepr=str(report.longrepr) if report.failed else None + ) + # Print result immediately for real-time streaming + import json + print(json.dumps(asdict(result)), flush=True) + class ResultCollector_ItemCollected: def __init__(self): @@ -45,6 +48,3 @@ class ResultCollector_ItemCollected: def pytest_itemcollected(self, item:pytest.Item) -> None: # Called during --collect-only print(item.nodeid) - self.results.append({ - "nodeid": item.nodeid - }) diff --git a/apps/ttkode/ttkode/plugins/_030/pytest_widget.py b/apps/ttkode/ttkode/plugins/_030/pytest_widget.py index ffb724a6..fd889d2c 100644 --- a/apps/ttkode/ttkode/plugins/_030/pytest_widget.py +++ b/apps/ttkode/ttkode/plugins/_030/pytest_widget.py @@ -22,13 +22,6 @@ __all__ = ['PyTestWidget'] -import io -import os -import sys -import json -import pytest -import subprocess - from typing import Dict, List, Any, Optional from pygments import highlight @@ -41,7 +34,7 @@ from ttkode import ttkodeProxy from ttkode.app.ttkode import TTKodeFileWidgetItem from .pytest_tree import _testStatus, TestTreeItemPath, TestTreeItemMethod -from .pytest_engine import PytestEngine, ScanItem +from .pytest_engine import PytestEngine, ScanItem, TestResult def _strip_result(result: str) -> str: lines = result.splitlines() @@ -49,6 +42,12 @@ def _strip_result(result: str) -> str: result = "\n".join(line[indent:] for line in lines) return result.lstrip('\n') +_out_map = { + 'passed': ttk.TTkString('PASS', ttk.TTkColor.GREEN)+ttk.TTkColor.RST, + 'failed': ttk.TTkString('FAIL', ttk.TTkColor.RED)+ttk.TTkColor.RST +} +_error_color = ttk.TTkColor.fgbg('#FFFF00',"#FF0000") + class PyTestWidget(ttk.TTkContainer): __slots__ = ( '_test_engine', @@ -67,50 +66,27 @@ class PyTestWidget(ttk.TTkContainer): layout.addWidget(res_tree:=ttk.TTkTree(), 1,0,1,2) res_tree.setHeaderLabels(["Tests"]) - - testResults.setText('Ciaoooo Testsssss\nPippo') + testResults.setText('Info Tests') self._res_tree = res_tree btn_scan.clicked.connect(self._scan) btn_run.clicked.connect(self._run_tests) + self._test_engine.testResultReady.connect(self._test_updated) + + def _get_node_from_path(self, _n:TestTreeItemPath, _p:str) -> Optional[TestTreeItemPath]: + for _c in _n.children(): + if isinstance(_c, TestTreeItemPath) and _c.path() == _p: + return _c + if _cc:= self._get_node_from_path(_n=_c, _p=_p): + return _cc + return None @ttk.pyTTkSlot() def _scan(self): self._res_tree.clear() self._test_results.clear() - # _tree = {} - # def _add_node(_node:str)->None: - # _full_path,_leaf = _node.split('::') - # _loop_tree = _tree - # for _path in _full_path.split('/'): - # if _path not in _loop_tree: - # _loop_tree[_path] = {} - # _loop_tree = _loop_tree[_path] - # _loop_tree[_leaf] = '_node' - - # for _r in results: - # self._test_results.append(f"{_r['nodeid']}") - # _add_node(_r['nodeid']) - - # def _iter_tree(_node:Dict,_path:str) -> List[ttk.TTkTreeWidgetItem]: - # _ret = [] - # for _n,_content in _node.items(): - # if isinstance(_content, str): - # _test_call = f"{_path}::{_n}" - # _ret_node = TestTreeItemMethod([ttk.TTkString(_n)],test_call=_test_call) - # elif isinstance(_content, dict): - # _node_path = f"{_path}/{_n}" if _path else _n - # _ret_node = TestTreeItemPath([ttk.TTkString(_n)], path=_node_path, expanded=True) - # _ret_node.addChildren(_iter_tree(_content,_node_path)) - # _ret.append(_ret_node) - # return _ret - - # self._test_results.append(f"{_tree}") - # self._res_tree.addTopLevelItems(_iter_tree(_tree,'')) - # self._res_tree.resizeColumnToContents(0) - def _get_or_add_path_in_node(_n:TestTreeItemPath, _p:str, _name:str) -> TestTreeItemPath: for _c in _n.children(): if isinstance(_c, TestTreeItemPath) and _c.path() == _p: @@ -153,42 +129,40 @@ class PyTestWidget(ttk.TTkContainer): self._test_engine.scan() - @ttk.pyTTkSlot() - def _run_tests(self): - self._test_results.clear() + def _mark_node(self, _nodeid:str, outcome:str) -> None: + status = { + 'passed' : _testStatus.Pass, + 'failed' : _testStatus.Fail, + }.get(outcome, _testStatus.Undefined) + def _recurse_node(_n:ttk.TTkTreeWidgetItem): + for _c in _n.children(): + # ttk.TTkLog.debug(_c.data(0)) + if isinstance(_c, TestTreeItemPath) and _nodeid.startswith(_c.path()): + _c.setTestStatus(status) + elif isinstance(_c, TestTreeItemMethod) and _nodeid.startswith(_c.test_call()): + _c.setTestStatus(status) + _recurse_node(_c) + _recurse_node(self._res_tree.invisibleRootItem()) + + + @ttk.pyTTkSlot(TestResult) + def _test_updated(self, test:TestResult) -> None: + self._mark_node(_nodeid=test.nodeId, outcome=test.outcome) + + _outcome = _out_map.get(test.outcome, test.outcome) + self._test_results.append(f"{test.nodeId}: " + _outcome + f" ({test.duration:.2f}s)") + if test.outcome == 'failed': + self._test_results.append(ttk.TTkString("◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤ ↳ Error: ◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤", ttk.TTkColor.RED)) + _code = test.longrepr + _code_str = ttk.TTkString(highlight(_code, PythonLexer(), TerminalTrueColorFormatter(style='material'))) + _code_h_err = [ + _l.setColor(_error_color) if _l._text.startswith('E') else _l + for _l in _code_str.split('\n')] + self._test_results.append(ttk.TTkString('\n').join(_code_h_err)) + self._test_results.append(ttk.TTkString("◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤", ttk.TTkColor.RED)) - def _mark_node(_nodeid:str, outcome:str): - status = { - 'passed' : _testStatus.Pass, - 'failed' : _testStatus.Fail, - }.get(outcome, _testStatus.Undefined) - def _recurse_node(_n:ttk.TTkTreeWidgetItem): - for _c in _n.children(): - # ttk.TTkLog.debug(_c.data(0)) - if isinstance(_c, TestTreeItemPath) and _nodeid.startswith(_c.path()): - _c.setTestStatus(status) - elif isinstance(_c, TestTreeItemMethod) and _nodeid.startswith(_c.test_call()): - _c.setTestStatus(status) - _recurse_node(_c) - _recurse_node(self._res_tree.invisibleRootItem()) - - results = PytestEngine.run_tests() - _out_map = { - 'passed': ttk.TTkString('PASS', ttk.TTkColor.GREEN)+ttk.TTkColor.RST, - 'failed': ttk.TTkString('FAIL', ttk.TTkColor.RED)+ttk.TTkColor.RST - } - _error_color = ttk.TTkColor.fgbg('#FFFF00',"#FF0000") - for _r in results: - _outcome = _out_map.get(_r['outcome'], _r['outcome']) - self._test_results.append(f"{_r['nodeid']}: " + _outcome + f" ({_r['duration']:.2f}s)") - _mark_node(_r['nodeid'], _r['outcome']) - if _r['outcome'] == 'failed': - self._test_results.append(ttk.TTkString("◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤ ↳ Error: ◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤", ttk.TTkColor.RED)) - _code = _r['longrepr'] - _code_str = ttk.TTkString(highlight(_code, PythonLexer(), TerminalTrueColorFormatter(style='material'))) - _code_h_err = [ - _l.setColor(_error_color) if _l._text.startswith('E') else _l - for _l in _code_str.split('\n')] - self._test_results.append(ttk.TTkString('\n').join(_code_h_err)) - self._test_results.append(ttk.TTkString("◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤", ttk.TTkColor.RED)) + @ttk.pyTTkSlot() + def _run_tests(self) -> None: + self._test_results.clear() + self._test_engine.run_all_tests() \ No newline at end of file