Browse Source

chore: add sqlite model test and fix

pull/460/head
Parodi, Eugenio 🌶 5 months ago
parent
commit
c3a5be7157
  1. 91
      libs/pyTermTk/TermTk/TTkWidgets/TTkModelView/tablemodelsqlite3.py
  2. 337
      tests/pytest/modelView/test_002_tablemodelsqlite3.py

91
libs/pyTermTk/TermTk/TTkWidgets/TTkModelView/tablemodelsqlite3.py

@ -25,6 +25,8 @@ __all__=['TTkTableModelSQLite3']
import sqlite3
import threading
from typing import Any
from TermTk.TTkCore.log import TTkLog
from TermTk.TTkCore.constant import TTkK
from TermTk.TTkAbstract.abstracttablemodel import TTkAbstractTableModel, TTkModelIndex
@ -102,28 +104,27 @@ class TTkTableModelSQLite3(TTkAbstractTableModel):
self._sort = ''
self._sortColumn = -1
self._sqliteMutex.acquire()
self._conn = conn = sqlite3.connect(fileName, check_same_thread=False)
self._cur = cur = self._conn.cursor()
with self._sqliteMutex:
self._conn = conn = sqlite3.connect(fileName, check_same_thread=False)
self._cur = cur = self._conn.cursor()
res = cur.execute(f"SELECT COUNT(*) FROM {table}")
self._count = res.fetchone()[0]
res = cur.execute(f"SELECT COUNT(*) FROM {table}")
self._count = res.fetchone()[0]
for row in cur.execute(f"PRAGMA table_info({table})"):
if row[-1] == 0:
self._columns.append(row[1])
if row[-1] == 1:
self._key = row[1]
for row in cur.execute(f"PRAGMA table_info({table})"):
if row[-1] == 0:
self._columns.append(row[1])
if row[-1] == 1:
self._key = row[1]
self._refreshIdMap()
self._sqliteMutex.release()
self._refreshIdMap()
super().__init__()
def _refreshIdMap(self):
self._idMap = {
_id : _rn
for _rn,_id in self._cur.execute(f"SELECT ROW_NUMBER() OVER ({self._sort}) RN, {self._key} from users")}
for _rn,_id in self._cur.execute(f"SELECT ROW_NUMBER() OVER ({self._sort}) RN, {self._key} from {self._table}")}
def rowCount(self) -> int:
return self._count
@ -135,39 +136,36 @@ class TTkTableModelSQLite3(TTkAbstractTableModel):
return self._idMap[key]
def index(self, row:int, col:int) -> TTkModelIndex:
self._sqliteMutex.acquire()
res = self._cur.execute(
f"SELECT {self._key} FROM {self._table} "
f"{self._sort} "
f"LIMIT 1 OFFSET {row}")
key=res.fetchone()[0]
self._sqliteMutex.release()
return _TTkModelIndexSQLite3(col=col,rowId=key,sqModel=self)
def data(self, row:int, col:int) -> None:
self._sqliteMutex.acquire()
res = self._cur.execute(
f"SELECT {self._columns[col]} FROM {self._table} "
f"{self._sort} "
f"LIMIT 1 OFFSET {row}")
self._sqliteMutex.release()
return res.fetchone()[0]
with self._sqliteMutex:
res = self._cur.execute(
f"SELECT {self._key} FROM {self._table} "
f"{self._sort} "
f"LIMIT 1 OFFSET {row}")
key=res.fetchone()[0]
return _TTkModelIndexSQLite3(col=col,rowId=key,sqModel=self)
def data(self, row:int, col:int) -> Any:
with self._sqliteMutex:
res = self._cur.execute(
f"SELECT {self._columns[col]} FROM {self._table} "
f"{self._sort} "
f"LIMIT 1 OFFSET {row}")
return res.fetchone()[0]
def setData(self, row:int, col:int, data:object) -> None:
self._sqliteMutex.acquire()
res = self._cur.execute(
f"SELECT {self._key} FROM {self._table} "
f"{self._sort} "
f"LIMIT 1 OFFSET {row}")
key=res.fetchone()[0]
res = self._cur.execute(
f"UPDATE {self._table} "
f"SET {self._columns[col]} = '{data}' "
f"WHERE {self._key} = {key} ")
self._conn.commit()
if col == self._sortColumn:
self._refreshIdMap()
self._sqliteMutex.releases()
with self._sqliteMutex:
res = self._cur.execute(
f"SELECT {self._key} FROM {self._table} "
f"{self._sort} "
f"LIMIT 1 OFFSET {row}")
key=res.fetchone()[0]
res = self._cur.execute(
f"UPDATE {self._table} "
f"SET {self._columns[col]} = '{data}' "
f"WHERE {self._key} = {key} ")
self._conn.commit()
if col == self._sortColumn:
self._refreshIdMap()
return True
def headerData(self, num:int, orientation:int):
@ -191,6 +189,5 @@ class TTkTableModelSQLite3(TTkAbstractTableModel):
self._sort = f"ORDER BY {self._columns[column]} ASC"
else:
self._sort = f"ORDER BY {self._columns[column]} DESC"
self._sqliteMutex.acquire()
self._refreshIdMap()
self._sqliteMutex.release()
with self._sqliteMutex:
self._refreshIdMap()

337
tests/pytest/modelView/test_002_tablemodelsqlite3.py

@ -0,0 +1,337 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2025 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import sys
import sqlite3
import tempfile
from unittest.mock import Mock
import pytest
sys.path.append(os.path.join(sys.path[0],'../../../libs/pyTermTk'))
import TermTk as ttk
class TestTTkTableModelSQLite3:
"""Test cases for TTkTableModelSQLite3 class"""
def setup_method(self):
"""Set up test database for each test"""
# Create temporary database file
self.temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
self.temp_db_path = self.temp_db.name
self.temp_db.close()
# Create test database and table
self.conn = sqlite3.connect(self.temp_db_path)
self.cur = self.conn.cursor()
# Create users table with primary key
self.cur.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER,
role TEXT
)
''')
# Insert test data
test_data = [
('Alice', 25, 'Engineer'),
('Bob', 30, 'Designer'),
('Charlie', 35, 'Manager'),
('Diana', 28, 'Developer')
]
self.cur.executemany('INSERT INTO users (name, age, role) VALUES (?, ?, ?)', test_data)
self.conn.commit()
self.conn.close()
def teardown_method(self):
"""Clean up after each test"""
if os.path.exists(self.temp_db_path):
os.unlink(self.temp_db_path)
def test_init_with_valid_database(self):
"""Test initialization with valid database and table"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
assert model.rowCount() == 4
assert model.columnCount() == 3 # name, age, role (id is primary key, not in columns)
def test_init_with_invalid_table(self):
"""Test initialization with invalid table name"""
with pytest.raises(sqlite3.OperationalError):
ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='nonexistent_table')
def test_init_with_invalid_database(self):
"""Test initialization with invalid database file"""
with pytest.raises(sqlite3.OperationalError):
ttk.TTkTableModelSQLite3(fileName='/nonexistent/path/db.sqlite', table='users')
def test_column_detection(self):
"""Test that columns are correctly detected from database schema"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Check header data for columns
assert model.headerData(0, ttk.TTkK.HORIZONTAL) == 'name'
assert model.headerData(1, ttk.TTkK.HORIZONTAL) == 'age'
assert model.headerData(2, ttk.TTkK.HORIZONTAL) == 'role'
def test_data_access(self):
"""Test data retrieval from database"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Test data access
assert model.data(0, 0) == 'Alice' # First row, name column
assert model.data(0, 1) == 25 # First row, age column
assert model.data(0, 2) == 'Engineer' # First row, role column
assert model.data(1, 0) == 'Bob'
assert model.data(2, 0) == 'Charlie'
assert model.data(3, 0) == 'Diana'
def test_set_data(self):
"""Test data modification in database"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Change Alice's name to 'Alicia'
result = model.setData(0, 0, 'Alicia')
assert result == True
# Verify the change
assert model.data(0, 0) == 'Alicia'
# Verify change persisted in database
conn = sqlite3.connect(self.temp_db_path)
cur = conn.cursor()
res = cur.execute("SELECT name FROM users WHERE id = 1")
assert res.fetchone()[0] == 'Alicia'
conn.close()
def test_index_method(self):
"""Test index method returns correct TTkModelIndex"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Get index for first row, first column
index = model.index(0, 0)
assert isinstance(index, ttk.TTkModelIndex)
assert index.row() == 0
assert index.col() == 0
assert index.data() == 'Alice'
def test_model_index_set_data(self):
"""Test setting data through model index"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Get index and modify data through it
index = model.index(1, 0) # Bob's name
index.setData('Robert')
# Verify change
assert model.data(1, 0) == 'Robert'
assert index.data() == 'Robert'
def test_flags(self):
"""Test item flags"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
flags = model.flags(0, 0)
# Should have enabled, editable, and selectable flags
assert flags & ttk.TTkK.ItemFlag.ItemIsEnabled
assert flags & ttk.TTkK.ItemFlag.ItemIsEditable
assert flags & ttk.TTkK.ItemFlag.ItemIsSelectable
def test_sort_ascending(self):
"""Test sorting in ascending order"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Sort by name (column 0) ascending
model.sort(0, ttk.TTkK.SortOrder.AscendingOrder)
# Check order: Alice, Bob, Charlie, Diana
assert model.data(0, 0) == 'Alice'
assert model.data(1, 0) == 'Bob'
assert model.data(2, 0) == 'Charlie'
assert model.data(3, 0) == 'Diana'
def test_sort_descending(self):
"""Test sorting in descending order"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Sort by name (column 0) descending
model.sort(0, ttk.TTkK.SortOrder.DescendingOrder)
# Check order: Diana, Charlie, Bob, Alice
assert model.data(0, 0) == 'Diana'
assert model.data(1, 0) == 'Charlie'
assert model.data(2, 0) == 'Bob'
assert model.data(3, 0) == 'Alice'
def test_sort_by_age_column(self):
"""Test sorting by age column"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Sort by age (column 1) ascending
model.sort(1, ttk.TTkK.SortOrder.AscendingOrder)
# Check order by age: Alice(25), Diana(28), Bob(30), Charlie(35)
assert model.data(0, 0) == 'Alice' # Age 25
assert model.data(1, 0) == 'Diana' # Age 28
assert model.data(2, 0) == 'Bob' # Age 30
assert model.data(3, 0) == 'Charlie' # Age 35
def test_sort_reset(self):
"""Test resetting sort order"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# First sort by name descending
model.sort(0, ttk.TTkK.SortOrder.DescendingOrder)
assert model.data(0, 0) == 'Diana'
# Reset sort
model.sort(-1, ttk.TTkK.SortOrder.AscendingOrder)
# Should return to original order (by id)
assert model.data(0, 0) == 'Alice'
assert model.data(1, 0) == 'Bob'
assert model.data(2, 0) == 'Charlie'
assert model.data(3, 0) == 'Diana'
def test_header_data_horizontal(self):
"""Test horizontal header data"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
assert model.headerData(0, ttk.TTkK.HORIZONTAL) == 'name'
assert model.headerData(1, ttk.TTkK.HORIZONTAL) == 'age'
assert model.headerData(2, ttk.TTkK.HORIZONTAL) == 'role'
def test_header_data_vertical(self):
"""Test vertical header data (should use parent implementation)"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Should return default from parent class (likely row numbers)
header = model.headerData(0, ttk.TTkK.VERTICAL)
assert isinstance(header, (ttk.TTkString, str, int, type(None)))
def test_concurrent_access(self):
"""Test thread safety with mutex"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# This test ensures no exceptions are raised with concurrent access
# In a real scenario, you'd use threading to test this properly
for i in range(10):
data = model.data(i % 4, 0) # Access data multiple times
assert data in ['Alice', 'Bob', 'Charlie', 'Diana']
def test_setdata_with_sort_column_refresh(self):
"""Test that changing data in sorted column refreshes id map"""
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='users')
# Sort by name
model.sort(0, ttk.TTkK.SortOrder.AscendingOrder)
# Change Alice to Zoe (should move to end when sorted)
model.setData(0, 0, 'Zoe')
# The model should automatically refresh the sort order
# Note: This tests the _refreshIdMap functionality when sortColumn matches
def test_database_with_different_primary_key(self):
"""Test with a table that has a different primary key setup"""
# Create another test table
conn = sqlite3.connect(self.temp_db_path)
cur = conn.cursor()
cur.execute('''
CREATE TABLE products (
product_id INTEGER PRIMARY KEY,
name TEXT,
price REAL
)
''')
cur.executemany('INSERT INTO products (name, price) VALUES (?, ?)', [
('Laptop', 999.99),
('Mouse', 25.50)
])
conn.commit()
conn.close()
model = ttk.TTkTableModelSQLite3(fileName=self.temp_db_path, table='products')
assert model.rowCount() == 2
assert model.columnCount() == 2 # name, price
assert model.data(0, 0) == 'Laptop'
assert model.data(0, 1) == 999.99
def test_integration_with_table_widget():
"""Integration test with TTkTable widget"""
# Create temporary database
temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
temp_db_path = temp_db.name
temp_db.close()
try:
# Set up database
conn = sqlite3.connect(temp_db_path)
cur = conn.cursor()
cur.execute('''
CREATE TABLE test_table (
id INTEGER PRIMARY KEY,
name TEXT,
value INTEGER
)
''')
cur.executemany('INSERT INTO test_table (name, value) VALUES (?, ?)', [
('Item1', 100),
('Item2', 200)
])
conn.commit()
conn.close()
# Test model with table widget
model = ttk.TTkTableModelSQLite3(fileName=temp_db_path, table='test_table')
table = ttk.TTkTable(tableModel=model)
# Basic integration tests
assert table.model() == model
assert model.rowCount() == 2
assert model.columnCount() == 2
finally:
# Cleanup
if os.path.exists(temp_db_path):
os.unlink(temp_db_path)
if __name__ == '__main__':
pytest.main([__file__])
Loading…
Cancel
Save