Browse Source

chore: uniform data

pull/588/head
Parodi, Eugenio 🌶 2 months ago
parent
commit
2c2ea9c4ad
  1. 33
      apps/ttkode/ttkode/plugins/_030/_main_scan.py
  2. 35
      apps/ttkode/ttkode/plugins/_030/_main_tests.py
  3. 33
      apps/ttkode/ttkode/plugins/_030/pytest_data.py
  4. 135
      apps/ttkode/ttkode/plugins/_030/pytest_engine.py
  5. 28
      apps/ttkode/ttkode/plugins/_030/pytest_glue.py
  6. 130
      apps/ttkode/ttkode/plugins/_030/pytest_widget.py

33
apps/ttkode/ttkode/plugins/_030/_main_scan.py

@ -0,0 +1,33 @@
# MIT License
#
# Copyright (c) 2026 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
def main() -> None:
import sys, pytest
dirname = sys.argv[1]
sys.path.append(f'{dirname}/..')
from _030.pytest_glue import ResultCollector_ItemCollected
collector = ResultCollector_ItemCollected()
pytest.main(['--collect-only', '-p', 'no:terminal', '.'], plugins=[collector])
main()

35
apps/ttkode/ttkode/plugins/_030/_main_tests.py

@ -0,0 +1,35 @@
# MIT License
#
# Copyright (c) 2026 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
def main() -> None:
import sys, pytest
dirname = sys.argv[1]
test_path = sys.argv[2]
sys.path.append(f'{dirname}/..')
from _030.pytest_glue import ResultCollector_Logreport
collector = ResultCollector_Logreport()
pytest.main(['-p', 'no:terminal', test_path], plugins=[collector])
main()

33
apps/ttkode/ttkode/plugins/_030/pytest_data.py

@ -0,0 +1,33 @@
# MIT License
#
# Copyright (c) 2026 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['TestResult']
from dataclasses import dataclass
from typing import Optional
@dataclass
class TestResult():
nodeId: str
outcome: str
duration: float
longrepr: Optional[str]

135
apps/ttkode/ttkode/plugins/_030/pytest_engine.py

@ -20,18 +20,20 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['PytestEngine', 'ScanItem']
__all__ = ['PytestEngine', 'ScanItem', 'TestResult']
import os
import sys
import json
import threading
import subprocess
from pathlib import Path
from dataclasses import dataclass, asdict, fields
from typing import Dict, Any, Optional, List
import TermTk as ttk
from _030.pytest_data import TestResult
def _strip_result(result: str) -> str:
lines = result.splitlines()
@ -45,18 +47,19 @@ class ScanItem():
class PytestEngine():
__slots__ = (
'_scan_lock',
'_scan_lock', '_test_lock',
'itemScanned','testResultReady',
'errorReported',
'endScan', 'endTest')
def __init__(self):
self.itemScanned = ttk.pyTTkSignal(ScanItem)
self.testResultReady = ttk.pyTTkSignal()
self.testResultReady = ttk.pyTTkSignal(TestResult)
self.errorReported = ttk.pyTTkSignal(str)
self.endScan = ttk.pyTTkSignal()
self.endTest = ttk.pyTTkSignal()
self._scan_lock = threading.RLock()
self._test_lock = threading.RLock()
def scan(self) -> Dict:
threading.Thread(target=self._scan_thread).start()
@ -64,25 +67,11 @@ class PytestEngine():
def _scan_thread(self):
with self._scan_lock:
dirname = os.path.dirname(__file__)
script = _strip_result(f"""
# I need this extra line to allow the debugger to inject the code
def main():
import sys, json, pytest
sys.path.append('{dirname}/..')
from _030.pytest_glue import ResultCollector_ItemCollected
collector = ResultCollector_ItemCollected()
pytest.main(['--collect-only', '-p', 'no:terminal', '.'], plugins=[collector])
# Print results as JSON to stdout
# print(json.dumps(collector.results))
main()
""")
script_file = Path(dirname) / '_main_scan.py'
script = script_file.read_text()
# Run the script in a subprocess and capture output
process = subprocess.Popen(
[sys.executable, "-c", script],
[sys.executable, "-c", script, dirname],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
@ -112,58 +101,54 @@ class PytestEngine():
self.endScan.emit()
# # Wait for threads to finish
# stdout_thread.join()
# stderr_thread.join()
# stdout_content = ''.join(stdout_lines)
# stderr_content = ''.join(stderr_lines)
# # Parse the JSON results from stdout
# try:
# return items
# except json.JSONDecodeError:
# ttk.TTkLog.error(f"Error parsing results: {process.stdout}")
# ttk.TTkLog.error(f"Stderr: {process.stderr}")
# except Exception as e:
# ttk.TTkLog.error(str(e))
# self.endScan.emit()
@staticmethod
def run_tests() -> Dict:
dirname = os.path.dirname(__file__)
script = _strip_result(f"""
# I need this extra line to allow the debugger to inject the code
def main():
import sys, json, pytest
sys.path.append('{dirname}/..')
from _030.pytest_glue import ResultCollector_Logreport
collector = ResultCollector_Logreport()
pytest.main(['-p', 'no:terminal', 'tests/'], plugins=[collector])
# Print results as JSON to stdout
print(json.dumps(collector.results))
main()
""")
# Run the script in a subprocess and capture output
process = subprocess.run(
[sys.executable, "-c", script],
capture_output=True,
text=True
)
# Parse the JSON results from stdout
try:
return json.loads(process.stdout)
except json.JSONDecodeError:
ttk.TTkLog.error(f"Error parsing results: {process.stdout}")
ttk.TTkLog.error(f"Stderr: {process.stderr}")
except Exception as e:
ttk.TTkLog.error(str(e))
return {}
def run_all_tests(self):
self.run_tests('tests')
def run_tests(self, test_path:str):
threading.Thread(target=self._run_tests_thread, args=(test_path,)).start()
def _run_tests_thread(self, test_path:str):
with self._test_lock:
dirname = os.path.dirname(__file__)
script_file = Path(dirname) / '_main_tests.py'
script = script_file.read_text()
# Run the script in a subprocess with streaming output
process = subprocess.Popen(
[sys.executable, "-c", script, dirname, test_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # Line buffered
universal_newlines=True
)
def read_stdout():
for line in iter(process.stdout.readline, ''):
line = line.strip()
if line:
try:
result_dict = json.loads(line)
result = TestResult(**result_dict)
self.testResultReady.emit(result)
except (json.JSONDecodeError, KeyError) as e:
ttk.TTkLog.error(f"Error parsing test result: {line}")
process.stdout.close()
def read_stderr():
for line in iter(process.stderr.readline, ''):
self.errorReported.emit(line)
process.stderr.close()
# Start threads to read stdout and stderr
stdout_thread = threading.Thread(target=read_stdout)
stderr_thread = threading.Thread(target=read_stderr)
stdout_thread.start()
stderr_thread.start()
# Wait for process to complete
process.wait()
self.endTest.emit()

28
apps/ttkode/ttkode/plugins/_030/pytest_glue.py

@ -22,21 +22,24 @@
import pytest
from dataclasses import dataclass, asdict, fields
from typing import Dict, Any, Optional, List, Literal
from dataclasses import asdict
from typing import Dict, Any
class ResultCollector_Logreport:
def __init__(self):
self.results:Dict[str,Any] = []
from _030.pytest_data import TestResult
class ResultCollector_Logreport:
def pytest_runtest_logreport(self, report:pytest.TestReport) -> None:
if report.when == "call":
self.results.append({
"nodeid": report.nodeid,
"outcome": report.outcome,
"duration": report.duration,
"longrepr": str(report.longrepr) if report.failed else None
})
result = TestResult(
nodeId=report.nodeid,
outcome=report.outcome,
duration=report.duration,
longrepr=str(report.longrepr) if report.failed else None
)
# Print result immediately for real-time streaming
import json
print(json.dumps(asdict(result)), flush=True)
class ResultCollector_ItemCollected:
def __init__(self):
@ -45,6 +48,3 @@ class ResultCollector_ItemCollected:
def pytest_itemcollected(self, item:pytest.Item) -> None:
# Called during --collect-only
print(item.nodeid)
self.results.append({
"nodeid": item.nodeid
})

130
apps/ttkode/ttkode/plugins/_030/pytest_widget.py

@ -22,13 +22,6 @@
__all__ = ['PyTestWidget']
import io
import os
import sys
import json
import pytest
import subprocess
from typing import Dict, List, Any, Optional
from pygments import highlight
@ -41,7 +34,7 @@ from ttkode import ttkodeProxy
from ttkode.app.ttkode import TTKodeFileWidgetItem
from .pytest_tree import _testStatus, TestTreeItemPath, TestTreeItemMethod
from .pytest_engine import PytestEngine, ScanItem
from .pytest_engine import PytestEngine, ScanItem, TestResult
def _strip_result(result: str) -> str:
lines = result.splitlines()
@ -49,6 +42,12 @@ def _strip_result(result: str) -> str:
result = "\n".join(line[indent:] for line in lines)
return result.lstrip('\n')
_out_map = {
'passed': ttk.TTkString('PASS', ttk.TTkColor.GREEN)+ttk.TTkColor.RST,
'failed': ttk.TTkString('FAIL', ttk.TTkColor.RED)+ttk.TTkColor.RST
}
_error_color = ttk.TTkColor.fgbg('#FFFF00',"#FF0000")
class PyTestWidget(ttk.TTkContainer):
__slots__ = (
'_test_engine',
@ -67,50 +66,27 @@ class PyTestWidget(ttk.TTkContainer):
layout.addWidget(res_tree:=ttk.TTkTree(), 1,0,1,2)
res_tree.setHeaderLabels(["Tests"])
testResults.setText('Ciaoooo Testsssss\nPippo')
testResults.setText('Info Tests')
self._res_tree = res_tree
btn_scan.clicked.connect(self._scan)
btn_run.clicked.connect(self._run_tests)
self._test_engine.testResultReady.connect(self._test_updated)
def _get_node_from_path(self, _n:TestTreeItemPath, _p:str) -> Optional[TestTreeItemPath]:
for _c in _n.children():
if isinstance(_c, TestTreeItemPath) and _c.path() == _p:
return _c
if _cc:= self._get_node_from_path(_n=_c, _p=_p):
return _cc
return None
@ttk.pyTTkSlot()
def _scan(self):
self._res_tree.clear()
self._test_results.clear()
# _tree = {}
# def _add_node(_node:str)->None:
# _full_path,_leaf = _node.split('::')
# _loop_tree = _tree
# for _path in _full_path.split('/'):
# if _path not in _loop_tree:
# _loop_tree[_path] = {}
# _loop_tree = _loop_tree[_path]
# _loop_tree[_leaf] = '_node'
# for _r in results:
# self._test_results.append(f"{_r['nodeid']}")
# _add_node(_r['nodeid'])
# def _iter_tree(_node:Dict,_path:str) -> List[ttk.TTkTreeWidgetItem]:
# _ret = []
# for _n,_content in _node.items():
# if isinstance(_content, str):
# _test_call = f"{_path}::{_n}"
# _ret_node = TestTreeItemMethod([ttk.TTkString(_n)],test_call=_test_call)
# elif isinstance(_content, dict):
# _node_path = f"{_path}/{_n}" if _path else _n
# _ret_node = TestTreeItemPath([ttk.TTkString(_n)], path=_node_path, expanded=True)
# _ret_node.addChildren(_iter_tree(_content,_node_path))
# _ret.append(_ret_node)
# return _ret
# self._test_results.append(f"{_tree}")
# self._res_tree.addTopLevelItems(_iter_tree(_tree,''))
# self._res_tree.resizeColumnToContents(0)
def _get_or_add_path_in_node(_n:TestTreeItemPath, _p:str, _name:str) -> TestTreeItemPath:
for _c in _n.children():
if isinstance(_c, TestTreeItemPath) and _c.path() == _p:
@ -153,42 +129,40 @@ class PyTestWidget(ttk.TTkContainer):
self._test_engine.scan()
@ttk.pyTTkSlot()
def _run_tests(self):
self._test_results.clear()
def _mark_node(self, _nodeid:str, outcome:str) -> None:
status = {
'passed' : _testStatus.Pass,
'failed' : _testStatus.Fail,
}.get(outcome, _testStatus.Undefined)
def _recurse_node(_n:ttk.TTkTreeWidgetItem):
for _c in _n.children():
# ttk.TTkLog.debug(_c.data(0))
if isinstance(_c, TestTreeItemPath) and _nodeid.startswith(_c.path()):
_c.setTestStatus(status)
elif isinstance(_c, TestTreeItemMethod) and _nodeid.startswith(_c.test_call()):
_c.setTestStatus(status)
_recurse_node(_c)
_recurse_node(self._res_tree.invisibleRootItem())
@ttk.pyTTkSlot(TestResult)
def _test_updated(self, test:TestResult) -> None:
self._mark_node(_nodeid=test.nodeId, outcome=test.outcome)
_outcome = _out_map.get(test.outcome, test.outcome)
self._test_results.append(f"{test.nodeId}: " + _outcome + f" ({test.duration:.2f}s)")
if test.outcome == 'failed':
self._test_results.append(ttk.TTkString("◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤ ↳ Error: ◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤", ttk.TTkColor.RED))
_code = test.longrepr
_code_str = ttk.TTkString(highlight(_code, PythonLexer(), TerminalTrueColorFormatter(style='material')))
_code_h_err = [
_l.setColor(_error_color) if _l._text.startswith('E') else _l
for _l in _code_str.split('\n')]
self._test_results.append(ttk.TTkString('\n').join(_code_h_err))
self._test_results.append(ttk.TTkString("◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤", ttk.TTkColor.RED))
def _mark_node(_nodeid:str, outcome:str):
status = {
'passed' : _testStatus.Pass,
'failed' : _testStatus.Fail,
}.get(outcome, _testStatus.Undefined)
def _recurse_node(_n:ttk.TTkTreeWidgetItem):
for _c in _n.children():
# ttk.TTkLog.debug(_c.data(0))
if isinstance(_c, TestTreeItemPath) and _nodeid.startswith(_c.path()):
_c.setTestStatus(status)
elif isinstance(_c, TestTreeItemMethod) and _nodeid.startswith(_c.test_call()):
_c.setTestStatus(status)
_recurse_node(_c)
_recurse_node(self._res_tree.invisibleRootItem())
results = PytestEngine.run_tests()
_out_map = {
'passed': ttk.TTkString('PASS', ttk.TTkColor.GREEN)+ttk.TTkColor.RST,
'failed': ttk.TTkString('FAIL', ttk.TTkColor.RED)+ttk.TTkColor.RST
}
_error_color = ttk.TTkColor.fgbg('#FFFF00',"#FF0000")
for _r in results:
_outcome = _out_map.get(_r['outcome'], _r['outcome'])
self._test_results.append(f"{_r['nodeid']}: " + _outcome + f" ({_r['duration']:.2f}s)")
_mark_node(_r['nodeid'], _r['outcome'])
if _r['outcome'] == 'failed':
self._test_results.append(ttk.TTkString("◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤ ↳ Error: ◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤", ttk.TTkColor.RED))
_code = _r['longrepr']
_code_str = ttk.TTkString(highlight(_code, PythonLexer(), TerminalTrueColorFormatter(style='material')))
_code_h_err = [
_l.setColor(_error_color) if _l._text.startswith('E') else _l
for _l in _code_str.split('\n')]
self._test_results.append(ttk.TTkString('\n').join(_code_h_err))
self._test_results.append(ttk.TTkString("◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤◢◤", ttk.TTkColor.RED))
@ttk.pyTTkSlot()
def _run_tests(self) -> None:
self._test_results.clear()
self._test_engine.run_all_tests()
Loading…
Cancel
Save