Browse Source

chore: prototyping the transition to asyncio

315-transition-to-asyncio
Eugenio Parodi 1 year ago
parent
commit
b87eeaf9fd
  1. 3
      TermTk/TTkCore/TTkTerm/input.py
  2. 205
      TermTk/TTkCore/TTkTerm/input_asyncio.py
  3. 8
      TermTk/TTkCore/drivers/__init__.py
  4. 49
      TermTk/TTkCore/drivers/asyncio.py
  5. 150
      TermTk/TTkCore/drivers/term_unix_asyncio.py
  6. 126
      TermTk/TTkCore/drivers/unix_asyncio.py
  7. 112
      TermTk/TTkCore/helper.py
  8. 149
      TermTk/TTkCore/helper_draw.py
  9. 3
      TermTk/TTkCore/propertyanimation.py
  10. 56
      TermTk/TTkCore/timer_asyncio.py
  11. 379
      TermTk/TTkCore/ttk.py
  12. 401
      TermTk/TTkCore/ttk_asyncio.py
  13. 401
      TermTk/TTkCore/ttk_thread.py
  14. 7
      TermTk/TTkWidgets/widget.py

3
TermTk/TTkCore/TTkTerm/input.py

@ -23,4 +23,5 @@
__all__ = ['TTkInput']
# from .input_mono import TTkInput
from .input_thread import TTkInput
# from .input_thread import TTkInput
from .input_asyncio import TTkInput

205
TermTk/TTkCore/TTkTerm/input_asyncio.py

@ -0,0 +1,205 @@
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['TTkInput']
import re
from time import time
from ..drivers import (TTkInputDriver,TTkAsyncio)
from TermTk.TTkCore.log import TTkLog
from TermTk.TTkCore.constant import TTkK
from TermTk.TTkCore.signal import pyTTkSignal
from .term import TTkTerm
from .inputkey import TTkKeyEvent
from .inputmouse import TTkMouseEvent
class TTkInput:
inputEvent = pyTTkSignal(TTkKeyEvent, TTkMouseEvent)
pasteEvent = pyTTkSignal(str)
_pasteBuffer = ""
_bracketedPaste = False
_readInput = None
_inputThread = None
_inputQueue = None
_leftLastTime = 0
_midLastTime = 0
_rightLastTime = 0
_leftTap = 0
_midTap = 0
_rightTap = 0
_mouse_re = re.compile(r"\033\[<(\d+);(\d+);(\d+)([mM])")
class Mouse(int):
ON = 0x01
DIRECT = 0x02
@staticmethod
def init(mouse:bool=False, directMouse:bool=False) -> None:
TTkInput._inputQueue = TTkAsyncio.Queue()
TTkTerm.setMouse(mouse, directMouse)
@staticmethod
def close() -> None:
TTkTerm.setMouse(False, False)
if TTkInput._readInput:
TTkInput._readInput.close()
@staticmethod
def stop() -> None:
pass
@staticmethod
def cont() -> None:
if TTkInput._readInput:
TTkInput._readInput.cont()
@staticmethod
async def start() -> None:
await TTkInputDriver.init()
while inq := await TTkInputDriver.queue().get():
kevt,mevt,paste = TTkInput.key_process(inq)
# Try to filter out the queued moved mouse events
while (not kevt and
not paste and
mevt and mevt.evt == TTkK.Drag and
not TTkInput._inputQueue.empty() ):
mevtOld = mevt
kevt, mevt, paste = TTkInput._inputQueue.get()
if (kevt or
paste or
mevt and mevt.evt != TTkK.Drag):
TTkInput.inputEvent.emit(kevt, mevtOld)
break
if kevt or mevt:
TTkInput.inputEvent.emit(kevt, mevt)
if paste:
TTkInput.pasteEvent.emit(paste)
TTkLog.debug("Close TTkInput")
@staticmethod
def _handleBracketedPaste(stdinRead:str):
if stdinRead.endswith("\033[201~"):
TTkInput._pasteBuffer += stdinRead[:-6]
TTkInput._bracketedPaste = False
# due to the CRNL methos (don't ask me why) the terminal
# is substituting all the \n with \r
_paste = TTkInput._pasteBuffer.replace('\r','\n')
TTkInput._pasteBuffer = ""
return None, None, _paste
else:
TTkInput._pasteBuffer += stdinRead
return None, None, None
@staticmethod
def key_process(stdinRead:str) -> None:
if TTkInput._bracketedPaste:
return TTkInput._handleBracketedPaste(stdinRead)
mevt,kevt = None,None
if not stdinRead.startswith("\033[<"):
# Key Event
kevt = TTkKeyEvent.parse(stdinRead)
else:
# Mouse Event
m = TTkInput._mouse_re.match(stdinRead)
if not m:
# TODO: Return Error
hex = [f"0x{ord(x):02x}" for x in stdinRead]
TTkLog.error("UNHANDLED (mouse): "+stdinRead.replace("\033","<ESC>") + " - "+",".join(hex))
return None, None, None
code = int(m.group(1))
x = int(m.group(2))-1
y = int(m.group(3))-1
state = m.group(4)
key = TTkMouseEvent.NoButton
evt = TTkMouseEvent.Move
tap = 0
def _checkTap(lastTime, tap):
if state=="M":
t = time()
if (t-lastTime) < 0.4:
return t, tap+1
else:
return t, 1
return lastTime, tap
mod = TTkK.NoModifier
if code & 0x10:
code &= ~0x10
mod |= TTkK.ControlModifier
if code & 0x08:
code &= ~0x08
mod |= TTkK.AltModifier
if code == 0x00:
TTkInput._leftLastTime, TTkInput._leftTap = _checkTap(TTkInput._leftLastTime, TTkInput._leftTap)
tap = TTkInput._leftTap
key = TTkMouseEvent.LeftButton
evt = TTkMouseEvent.Press if state=="M" else TTkMouseEvent.Release
elif code == 0x01:
TTkInput._midLastTime, TTkInput._midTap = _checkTap(TTkInput._midLastTime, TTkInput._midTap)
tap = TTkInput._midTap
key = TTkMouseEvent.MidButton
evt = TTkMouseEvent.Press if state=="M" else TTkMouseEvent.Release
elif code == 0x02:
TTkInput._rightLastTime, TTkInput._rightTap = _checkTap(TTkInput._rightLastTime, TTkInput._rightTap)
tap = TTkInput._rightTap
key = TTkMouseEvent.RightButton
evt = TTkMouseEvent.Press if state=="M" else TTkMouseEvent.Release
elif code == 0x20:
key = TTkMouseEvent.LeftButton
evt = TTkMouseEvent.Drag
elif code == 0x21:
key = TTkMouseEvent.MidButton
evt = TTkMouseEvent.Drag
elif code == 0x22:
key = TTkMouseEvent.RightButton
evt = TTkMouseEvent.Drag
elif code == 0x40:
key = TTkMouseEvent.Wheel
evt = TTkMouseEvent.Up
elif code == 0x41:
key = TTkMouseEvent.Wheel
evt = TTkMouseEvent.Down
elif code == 0x23:
evt = TTkMouseEvent.Move
elif code == 0x27:
mod |= TTkK.ShiftModifier
evt = TTkMouseEvent.Move
mevt = TTkMouseEvent(x, y, key, evt, mod, tap, m.group(0).replace("\033", "<ESC>"))
if kevt or mevt:
return kevt, mevt, None
if stdinRead.startswith("\033[200~"):
TTkInput._bracketedPaste = True
return TTkInput._handleBracketedPaste(stdinRead[6:])
hex = [f"0x{ord(x):02x}" for x in stdinRead]
TTkLog.error("UNHANDLED: "+stdinRead.replace("\033","<ESC>") + " - "+",".join(hex))

8
TermTk/TTkCore/drivers/__init__.py

@ -1,6 +1,8 @@
import importlib.util
import platform
from .asyncio import *
if importlib.util.find_spec('pyodideProxy'):
from .pyodide import *
from .term_pyodide import *
@ -19,8 +21,10 @@ elif platform.system() == 'Linux':
from .term_unix import *
elif platform.system() == 'Darwin':
from .unix import *
from .term_unix import *
# from .unix import *
from .unix_asyncio import *
# from .term_unix import *
from .term_unix_asyncio import *
elif platform.system() == 'Windows':
from .windows import *

49
TermTk/TTkCore/drivers/asyncio.py

@ -0,0 +1,49 @@
# MIT License
#
# Copyright (c) 2023 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE
__all__ = ['TTkAsyncio']
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
class TTkAsyncio():
loop = asyncio.get_event_loop()
Queue = asyncio.Queue
Event = asyncio.Event
Lock = asyncio.Lock
sleep = asyncio.sleep
@staticmethod
def run(coro):
asyncio.set_event_loop(TTkAsyncio.loop)
# asyncio.run(coro)
TTkAsyncio.loop.run_until_complete(coro)
TTkAsyncio.loop.close()
@staticmethod
def create_task(*args, **kwargs):
if TTkAsyncio.loop.is_running():
# asyncio.set_event_loop(TTkAsyncio.loop)
TTkAsyncio.loop.create_task(*args, **kwargs)

150
TermTk/TTkCore/drivers/term_unix_asyncio.py

@ -0,0 +1,150 @@
# MIT License
#
# Copyright (c) 2022 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['TTkTerm']
import sys, os, signal
try: import termios
except Exception as e:
print(f'ERROR: {e}')
exit(1)
from . import TTkAsyncio
from ..TTkTerm.term_base import TTkTermBase
from TermTk.TTkCore.log import TTkLog
class TTkTerm(TTkTermBase):
_sigWinChCb = None
# Save treminal attributes during the initialization in order to
# restore later the original states
_termAttr = termios.tcgetattr(sys.stdin)
_termAttrBk = []
@staticmethod
def saveTermAttr():
TTkTerm._termAttrBk.append(termios.tcgetattr(sys.stdin))
@staticmethod
def restoreTermAttr():
if TTkTerm._termAttrBk:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, TTkTerm._termAttrBk.pop())
else:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, TTkTerm._termAttr)
@staticmethod
def _setSigmask(mask, value=True):
attr = termios.tcgetattr(sys.stdin)
if mask & TTkTerm.Sigmask.CTRL_C:
attr[6][termios.VINTR]= b'\x03' if value else 0
if mask & TTkTerm.Sigmask.CTRL_S:
attr[6][termios.VSTOP]= b'\x13' if value else 0
if mask & TTkTerm.Sigmask.CTRL_Z:
attr[6][termios.VSUSP]= b'\x1a' if value else 0
if mask & TTkTerm.Sigmask.CTRL_Q:
attr[6][termios.VSTART]= b'\x11' if value else 0
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, attr)
TTkTermBase.setSigmask = _setSigmask
@staticmethod
def getSigmask():
mask = 0x00
attr = termios.tcgetattr(sys.stdin)
mask |= TTkTerm.Sigmask.CTRL_C if attr[6][termios.VINTR] else 0
mask |= TTkTerm.Sigmask.CTRL_S if attr[6][termios.VSTOP] else 0
mask |= TTkTerm.Sigmask.CTRL_Z if attr[6][termios.VSUSP] else 0
mask |= TTkTerm.Sigmask.CTRL_Q if attr[6][termios.VSTART] else 0
return mask
@staticmethod
def exit():
TTkTermBase.exit()
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, TTkTerm._termAttr)
@staticmethod
def _push(*args):
try:
sys.stdout.write(str(*args))
sys.stdout.flush()
except BlockingIOError as e:
TTkLog.fatal(f"{e=} {e.characters_written=}")
except Exception as e:
TTkLog.fatal(e)
TTkTermBase.push = _push
@staticmethod
def _flush():
sys.stdout.flush()
TTkTermBase.flush = _flush
@staticmethod
def _setEcho(val: bool):
# Set/Unset Terminal Input Echo
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: l |= termios.ECHO
else: l &= ~termios.ECHO
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
TTkTermBase.setEcho = _setEcho
@staticmethod
def _CRNL(val: bool):
#Translate carriage return to newline on input (unless IGNCR is set).
# '\n' CTRL-J
# '\r' CTRL-M (Enter)
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: i |= termios.ICRNL
else: i &= ~termios.ICRNL
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
TTkTermBase.CRNL = _CRNL
@staticmethod
def _getTerminalSize():
try:
return os.get_terminal_size()
except OSError as e:
print(f'ERROR: {e}')
TTkTermBase.getTerminalSize = _getTerminalSize
_sigWinChMutex = TTkAsyncio.Lock()
@staticmethod
async def _sigWinChThreaded():
if not TTkTerm._sigWinChMutex.acquire(blocking=False): return
while (TTkTerm.width, TTkTerm.height) != (wh:=TTkTerm.getTerminalSize()):
TTkTerm.width, TTkTerm.height = wh
if TTkTerm._sigWinChCb is not None:
TTkTerm._sigWinChCb(TTkTerm.width, TTkTerm.height)
TTkTerm._sigWinChMutex.release()
@staticmethod
def _sigWinCh(signum, frame):
pass
# Thread(target=TTkTerm._sigWinChThreaded).start()
@staticmethod
def _registerResizeCb(callback):
TTkTerm._sigWinChCb = callback
# Dummy call to retrieve the terminal size
TTkTerm._sigWinCh(signal.SIGWINCH, None)
signal.signal(signal.SIGWINCH, TTkTerm._sigWinCh)
TTkTermBase.registerResizeCb = _registerResizeCb

126
TermTk/TTkCore/drivers/unix_asyncio.py

@ -0,0 +1,126 @@
# MIT License
#
# Copyright (c) 2022 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['TTkSignalDriver','TTkInputDriver']
import sys, os, re
import signal
from select import select
try: import fcntl, termios, tty
except Exception as e:
print(f'ERROR: {e}')
exit(1)
from . import TTkAsyncio
from TermTk.TTkCore.signal import pyTTkSignal, pyTTkSlot
class TTkInputDriver():
_inputQueue = TTkAsyncio.Queue()
_readPipe = os.pipe()
_attr = termios.tcgetattr(sys.stdin)
@staticmethod
async def init():
tty.setcbreak(sys.stdin)
TTkAsyncio.loop.add_reader(sys.stdin.fileno(), TTkInputDriver._readcb)
# TTkAsyncio.create_task(TTkInputDriver._newRead())
@staticmethod
def close():
TTkAsyncio.loop.remove_reader(sys.stdin.fileno())
termios.tcsetattr(sys.stdin, termios.TCSANOW, TTkInputDriver._attr)
os.write(TTkInputDriver._readPipe[1], b'quit')
@staticmethod
def cont():
tty.setcbreak(sys.stdin)
@staticmethod
def queue():
return TTkInputDriver._inputQueue
_rm = re.compile('(\033?[^\033]+)')
def _readcb():
_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, _fl | os.O_NONBLOCK) # Set the input as NONBLOCK to read the full sequence
stdinRead = sys.stdin.read()
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, _fl)
# Split all the ansi sequences
# or yield any separate input char
if stdinRead == '\033':
TTkAsyncio.create_task(TTkInputDriver._inputQueue.put('\033'))
return
for sr in TTkInputDriver._rm.findall(stdinRead):
if '\033' == sr[0]:
TTkAsyncio.create_task(TTkInputDriver._inputQueue.put(sr))
else:
for ch in sr:
TTkAsyncio.create_task(TTkInputDriver._inputQueue.put(ch))
async def _newRead():
import asyncio
"""Reads from stdin asynchronously line by line."""
loop = TTkAsyncio.loop
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, asyncio.sys.stdin)
while True:
_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(asyncio.sys.stdin, fcntl.F_SETFL, _fl | os.O_NONBLOCK) # Set the input as NONBLOCK to read the full sequence
stdinRead = await reader.read()
fcntl.fcntl(asyncio.sys.stdin, fcntl.F_SETFL, _fl)
# Split all the ansi sequences
# or yield any separate input char
if stdinRead == '\033':
await TTkInputDriver._inputQueue.put('\033')
return
for sr in TTkInputDriver._rm.findall(stdinRead):
if '\033' == sr[0]:
await TTkInputDriver._inputQueue.put(sr)
else:
for ch in sr:
await TTkInputDriver._inputQueue.put(ch)
class TTkSignalDriver():
sigStop = pyTTkSignal()
sigCont = pyTTkSignal()
sigInt = pyTTkSignal()
@staticmethod
def init():
# Register events
signal.signal(signal.SIGTSTP, TTkSignalDriver._SIGSTOP) # Ctrl-Z
signal.signal(signal.SIGCONT, TTkSignalDriver._SIGCONT) # Resume
signal.signal(signal.SIGINT, TTkSignalDriver._SIGINT) # Ctrl-C
def exit():
signal.signal(signal.SIGINT, signal.SIG_DFL)
def _SIGSTOP(signum, frame): TTkSignalDriver.sigStop.emit()
def _SIGCONT(signum, frame): TTkSignalDriver.sigCont.emit()
def _SIGINT( signum, frame): TTkSignalDriver.sigInt.emit()

112
TermTk/TTkCore/helper.py

@ -25,9 +25,8 @@ __all__ = ['TTkHelper']
from typing import TYPE_CHECKING
from dataclasses import dataclass
from TermTk.TTkCore.TTkTerm.colors import TTkTermColor
from TermTk.TTkCore.TTkTerm.term import TTkTerm
from TermTk.TTkCore.cfg import TTkCfg, TTkGlbl
from TermTk.TTkCore.cfg import TTkGlbl
from TermTk.TTkCore.constant import TTkK
from TermTk.TTkCore.signal import pyTTkSignal, pyTTkSlot
@ -48,8 +47,6 @@ class TTkHelper:
_focusWidget = None
_rootCanvas = None
_rootWidget = None
_updateWidget = set()
_updateBuffer = set()
_mousePos = (0,0)
_cursorPos = (0,0)
_cursor = False
@ -64,30 +61,6 @@ class TTkHelper:
widget.move(x,y)
_overlay = []
@staticmethod
def updateAll():
if TTkHelper._rootWidget:
TTkHelper._rootWidget.update(repaint=True, updateLayout=True)
for w in TTkHelper._rootWidget.layout().iterWidgets():
w.update(repaint=True, updateLayout=True)
@staticmethod
def unlockPaint():
if rw := TTkHelper._rootWidget:
rw._paintEvent.set()
@staticmethod
def addUpdateWidget(widget):
# if not widget.isVisibleAndParent(): return
if widget not in TTkHelper._updateWidget:
TTkHelper._updateWidget.add(widget)
TTkHelper.unlockPaint()
@staticmethod
def addUpdateBuffer(canvas):
if canvas is not TTkHelper._rootCanvas:
TTkHelper._updateBuffer.add(canvas)
@staticmethod
def registerRootWidget(widget):
TTkHelper._rootCanvas = widget.getCanvas()
@ -285,73 +258,6 @@ class TTkHelper:
def mousePos():
return TTkHelper._mousePos
@staticmethod
def paintAll():
'''
_updateBuffer = list widgets that require a repaint [paintEvent]
_updateWidget = list widgets that need to be pushed below
'''
if TTkHelper._rootCanvas is None:
return
# Build a list of buffers to be repainted
updateWidgetsBk = TTkHelper._updateWidget.copy()
updateBuffers = TTkHelper._updateBuffer.copy()
TTkHelper._updateWidget.clear()
TTkHelper._updateBuffer.clear()
updateWidgets = set()
# TTkLog.debug(f"{len(TTkHelper._updateBuffer)} {len(TTkHelper._updateWidget)}")
for widget in updateWidgetsBk:
if not widget.isVisibleAndParent(): continue
updateBuffers.add(widget)
updateWidgets.add(widget)
parent = widget.parentWidget()
while parent is not None:
updateBuffers.add(parent)
updateWidgets.add(parent)
parent = parent.parentWidget()
# Paint all the canvas
for widget in updateBuffers:
if not widget.isVisibleAndParent(): continue
# Resize the canvas just before the paintEvent
# to avoid too many allocations
canvas = widget.getCanvas()
canvas.updateSize()
canvas.clean()
widget.paintEvent(canvas)
# Compose all the canvas to the parents
# From the deepest children to the bottom
pushToTerminal = False
sortedUpdateWidget = sorted(updateWidgets, key=lambda w: -TTkHelper.widgetDepth(w))
for widget in sortedUpdateWidget:
if not widget.isVisibleAndParent(): continue
pushToTerminal = True
widget.paintChildCanvas()
if pushToTerminal:
if TTkHelper._cursor:
TTkTerm.Cursor.hide()
if TTkCfg.doubleBuffer:
TTkHelper._rootCanvas.pushToTerminalBuffered(0, 0, TTkGlbl.term_w, TTkGlbl.term_h)
elif TTkCfg.doubleBufferNew:
TTkHelper._rootCanvas.pushToTerminalBufferedNew(0, 0, TTkGlbl.term_w, TTkGlbl.term_h)
else:
TTkHelper._rootCanvas.pushToTerminal(0, 0, TTkGlbl.term_w, TTkGlbl.term_h)
if TTkHelper._cursor:
x,y = TTkHelper._cursorPos
TTkTerm.push(TTkTerm.Cursor.moveTo(y+1,x+1))
TTkTerm.Cursor.show(TTkHelper._cursorType)
@staticmethod
def rePaintAll():
if TTkHelper._rootCanvas and TTkHelper._rootWidget:
TTkTerm.push(TTkTerm.CLEAR)
TTkHelper._rootCanvas.cleanBuffers()
TTkHelper._rootWidget.update()
@staticmethod
def widgetDepth(widget) -> int:
if widget is None:
@ -460,22 +366,6 @@ class TTkHelper:
def clearFocus():
TTkHelper._focusWidget = None
@staticmethod
def showCursor(cursorType = TTkK.Cursor_Blinking_Block):
newType = {
TTkK.Cursor_Blinking_Block : TTkTerm.Cursor.BLINKING_BLOCK,
TTkK.Cursor_Blinking_Block_Also : TTkTerm.Cursor.BLINKING_BLOCK_ALSO,
TTkK.Cursor_Steady_Block : TTkTerm.Cursor.STEADY_BLOCK,
TTkK.Cursor_Blinking_Underline : TTkTerm.Cursor.BLINKING_UNDERLINE,
TTkK.Cursor_Steady_Underline : TTkTerm.Cursor.STEADY_UNDERLINE,
TTkK.Cursor_Blinking_Bar : TTkTerm.Cursor.BLINKING_BAR,
TTkK.Cursor_Steady_Bar : TTkTerm.Cursor.STEADY_BAR,
}.get(cursorType, TTkTerm.Cursor.BLINKING_BAR)
if not TTkHelper._cursor or TTkHelper._cursorType != newType:
TTkHelper._cursorType = newType
TTkTerm.Cursor.show(TTkHelper._cursorType)
TTkHelper._cursor = True
@staticmethod
def hideCursor():
TTkTerm.Cursor.hide()

149
TermTk/TTkCore/helper_draw.py

@ -0,0 +1,149 @@
# MIT License
#
# Copyright (c) 2025 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['TTkHelperDraw']
from .helper import TTkHelper
from TermTk.TTkCore.drivers import TTkAsyncio
from TermTk.TTkCore.TTkTerm.term import TTkTerm
from TermTk.TTkCore.cfg import TTkCfg, TTkGlbl
from TermTk.TTkCore.constant import TTkK
class TTkHelperDraw:
'''TTkHelperDraw
This is a collection of helper utilities to be used all around TermTk
'''
_updateWidget = set()
_updateBuffer = set()
@staticmethod
def updateAll():
if TTkHelper._rootWidget:
TTkHelper._rootWidget.update(repaint=True, updateLayout=True)
for w in TTkHelper._rootWidget.layout().iterWidgets():
w.update(repaint=True, updateLayout=True)
@staticmethod
def unlockPaint():
if rw := TTkHelper._rootWidget:
async def _set():
rw._paintEvent.set()
# TTkAsyncio.create_task(rw._paintEvent.set )
# rw._paintEvent.set()
TTkAsyncio.create_task(_set())
@staticmethod
def addUpdateWidget(widget):
# if not widget.isVisibleAndParent(): return
if widget not in TTkHelperDraw._updateWidget:
TTkHelperDraw._updateWidget.add(widget)
TTkHelperDraw.unlockPaint()
@staticmethod
def addUpdateBuffer(canvas):
if canvas is not TTkHelper._rootCanvas:
TTkHelperDraw._updateBuffer.add(canvas)
@staticmethod
def paintAll():
'''
_updateBuffer = list widgets that require a repaint [paintEvent]
_updateWidget = list widgets that need to be pushed below
'''
if TTkHelper._rootCanvas is None:
return
# Build a list of buffers to be repainted
updateWidgetsBk = TTkHelperDraw._updateWidget.copy()
updateBuffers = TTkHelperDraw._updateBuffer.copy()
TTkHelperDraw._updateWidget.clear()
TTkHelperDraw._updateBuffer.clear()
updateWidgets = set()
# TTkLog.debug(f"{len(TTkHelperDraw._updateBuffer)} {len(TTkHelperDraw._updateWidget)}")
for widget in updateWidgetsBk:
if not widget.isVisibleAndParent(): continue
updateBuffers.add(widget)
updateWidgets.add(widget)
parent = widget.parentWidget()
while parent is not None:
updateBuffers.add(parent)
updateWidgets.add(parent)
parent = parent.parentWidget()
# Paint all the canvas
for widget in updateBuffers:
if not widget.isVisibleAndParent(): continue
# Resize the canvas just before the paintEvent
# to avoid too many allocations
canvas = widget.getCanvas()
canvas.updateSize()
canvas.clean()
widget.paintEvent(canvas)
# Compose all the canvas to the parents
# From the deepest children to the bottom
pushToTerminal = False
sortedUpdateWidget = sorted(updateWidgets, key=lambda w: -TTkHelper.widgetDepth(w))
for widget in sortedUpdateWidget:
if not widget.isVisibleAndParent(): continue
pushToTerminal = True
widget.paintChildCanvas()
if pushToTerminal:
if TTkHelper._cursor:
TTkTerm.Cursor.hide()
if TTkCfg.doubleBuffer:
TTkHelper._rootCanvas.pushToTerminalBuffered(0, 0, TTkGlbl.term_w, TTkGlbl.term_h)
elif TTkCfg.doubleBufferNew:
TTkHelper._rootCanvas.pushToTerminalBufferedNew(0, 0, TTkGlbl.term_w, TTkGlbl.term_h)
else:
TTkHelper._rootCanvas.pushToTerminal(0, 0, TTkGlbl.term_w, TTkGlbl.term_h)
if TTkHelper._cursor:
x,y = TTkHelper._cursorPos
TTkTerm.push(TTkTerm.Cursor.moveTo(y+1,x+1))
TTkTerm.Cursor.show(TTkHelper._cursorType)
@staticmethod
def rePaintAll():
if TTkHelper._rootCanvas and TTkHelper._rootWidget:
TTkTerm.push(TTkTerm.CLEAR)
TTkHelper._rootCanvas.cleanBuffers()
TTkHelper._rootWidget.update()
@staticmethod
def showCursor(cursorType = TTkK.Cursor_Blinking_Block):
newType = {
TTkK.Cursor_Blinking_Block : TTkTerm.Cursor.BLINKING_BLOCK,
TTkK.Cursor_Blinking_Block_Also : TTkTerm.Cursor.BLINKING_BLOCK_ALSO,
TTkK.Cursor_Steady_Block : TTkTerm.Cursor.STEADY_BLOCK,
TTkK.Cursor_Blinking_Underline : TTkTerm.Cursor.BLINKING_UNDERLINE,
TTkK.Cursor_Steady_Underline : TTkTerm.Cursor.STEADY_UNDERLINE,
TTkK.Cursor_Blinking_Bar : TTkTerm.Cursor.BLINKING_BAR,
TTkK.Cursor_Steady_Bar : TTkTerm.Cursor.STEADY_BAR,
}.get(cursorType, TTkTerm.Cursor.BLINKING_BAR)
if not TTkHelper._cursor or TTkHelper._cursorType != newType:
TTkHelper._cursorType = newType
TTkTerm.Cursor.show(TTkHelper._cursorType)
TTkHelper._cursor = True

3
TermTk/TTkCore/propertyanimation.py

@ -29,6 +29,7 @@ from types import LambdaType
from TermTk.TTkCore.log import TTkLog
from TermTk.TTkCore.signal import pyTTkSignal, pyTTkSlot
from TermTk.TTkCore.helper import TTkHelper
from TermTk.TTkCore.helper_draw import TTkHelperDraw
class TTkEasingCurve():
Linear = 0
@ -367,7 +368,7 @@ class TTkPropertyAnimation():
else:
newVal = self._easingCurve.process(self._startValue,self._endValue,v)
self._cb(*self._cast(newVal))
TTkHelper.unlockPaint()
TTkHelperDraw.unlockPaint()
@pyTTkSlot()
def start(self):

56
TermTk/TTkCore/timer_asyncio.py

@ -0,0 +1,56 @@
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['TTkTimer']
from TermTk.TTkCore.drivers import TTkAsyncio
from TermTk.TTkCore.signal import pyTTkSlot, pyTTkSignal
from TermTk.TTkCore.helper import TTkHelper
class TTkTimer():
__slots__ = (
'timeout', '_timerHandle')
def __init__(self):
self.timeout = pyTTkSignal()
self._timerHandle = None
super().__init__()
TTkHelper.quitEvent.connect(self.quit)
def quit(self):
TTkHelper.quitEvent.disconnect(self.quit)
if self._timerHandle:
self._timerHandle.cancel()
self.timeout.clear()
# def run(self):
# self.timeout.emit()
@pyTTkSlot(float)
def start(self, sec=0.0):
self._timerHandle = TTkAsyncio.loop.call_later(sec, self.timeout.emit)
@pyTTkSlot()
def stop(self):
# delay = self._timerHandle.when() - TTkAsyncio.loop.time()
if self._timerHandle:
self._timerHandle.cancel()

379
TermTk/TTkCore/ttk.py

@ -22,380 +22,5 @@
__all__ = ['TTk']
import os
import signal
import time
import queue
import threading
import platform
from TermTk.TTkCore.drivers import TTkSignalDriver
from TermTk.TTkCore.TTkTerm.input import TTkInput
from TermTk.TTkCore.TTkTerm.inputkey import TTkKeyEvent
from TermTk.TTkCore.TTkTerm.inputmouse import TTkMouseEvent
from TermTk.TTkCore.TTkTerm.term import TTkTerm
from TermTk.TTkCore.signal import pyTTkSignal, pyTTkSlot
from TermTk.TTkCore.constant import TTkK
from TermTk.TTkCore.log import TTkLog
from TermTk.TTkCore.cfg import TTkCfg, TTkGlbl
from TermTk.TTkCore.helper import TTkHelper
from TermTk.TTkCore.timer import TTkTimer
from TermTk.TTkCore.color import TTkColor
from TermTk.TTkCore.shortcut import TTkShortcut
from TermTk.TTkWidgets.about import TTkAbout
from TermTk.TTkWidgets.widget import TTkWidget
from TermTk.TTkWidgets.container import TTkContainer
class _MouseCursor():
__slots__ = ('_cursor','_color', '_pos', 'updated')
def __init__(self):
self.updated = pyTTkSignal()
self._pos = (0,0)
self._cursor = ''
self._color = TTkColor.RST
TTkInput.inputEvent.connect(self._mouseInput)
@pyTTkSlot(TTkKeyEvent, TTkMouseEvent)
def _mouseInput(self, _, mevt):
if mevt is not None:
self._cursor = ''
self._color = TTkColor.RST
if mevt.key == TTkK.Wheel:
if mevt.evt == TTkK.WHEEL_Up:
self._cursor = ''
else:
self._cursor = ''
elif mevt.evt == TTkK.Press:
self._color = TTkColor.bg('#FFFF00') + TTkColor.fg('#000000')
elif mevt.evt == TTkK.Drag:
self._color = TTkColor.bg('#666600') + TTkColor.fg('#FFFF00')
# elif mevt.evt == TTkK.Release:
# self._color = TTkColor.bg('#006600') + TTkColor.fg('#00FFFF')
self._pos = (mevt.x, mevt.y)
self.updated.emit()
class TTk(TTkContainer):
__slots__ = (
'_termMouse', '_termDirectMouse',
'_title',
'_showMouseCursor', '_mouseCursor',
'_sigmask', '_timer',
'_drawMutex',
'_paintEvent',
'_lastMultiTap',
'paintExecuted')
def __init__(self, *,
title:str='TermTk',
sigmask:TTkTerm.Sigmask=TTkK.NONE,
mouseTrack:bool=False,
mouseCursor:bool=False,
**kwargs) -> None:
# If the "TERMTK_FILE_LOG" env variable is defined
# logs are saved in the file identified by this variable
# i.e.
# TERMTK_FILE_LOG=session.log python3 demo/demo.py
if ('TERMTK_FILE_LOG' in os.environ and (_logFile := os.environ['TERMTK_FILE_LOG'])):
TTkLog.use_default_file_logging(_logFile)
self._timer = None
self._title = title
self._sigmask = sigmask
self.paintExecuted = pyTTkSignal()
self._termMouse = True
self._termDirectMouse = mouseTrack
self._mouseCursor = None
self._showMouseCursor = os.environ.get("TERMTK_MOUSE",mouseCursor)
super().__init__(**kwargs)
TTkInput.inputEvent.connect(self._processInput)
TTkInput.pasteEvent.connect(self._processPaste)
TTkSignalDriver.sigStop.connect(self._SIGSTOP)
TTkSignalDriver.sigCont.connect(self._SIGCONT)
TTkSignalDriver.sigInt.connect( self._SIGINT)
self._drawMutex = threading.Lock()
self._paintEvent = threading.Event()
self._paintEvent.set()
self.setFocusPolicy(TTkK.ClickFocus)
self.hide()
w,h = TTkTerm.getTerminalSize()
self.setGeometry(0,0,w,h)
if 'TERMTK_NEWRENDERER' in os.environ:
TTkCfg.doubleBuffer = False
TTkCfg.doubleBufferNew = True
if os.environ.get("TERMTK_GPM",False):
self._showMouseCursor = True
TTkHelper.registerRootWidget(self)
frame = 0
time = time.time()
def _fps(self):
curtime = time.time()
self.frame+=1
delta = curtime - self.time
if delta > 5:
TTkLog.debug(f"fps: {int(self.frame/delta)}")
self.frame = 0
self.time = curtime
def mainloop(self):
try:
'''Enters the main event loop and waits until :meth:`~quit` is called or the main widget is destroyed.'''
TTkLog.debug( "" )
TTkLog.debug( " ████████╗ ████████╗ " )
TTkLog.debug( " ╚══██╔══╝ ╚══██╔══╝ " )
TTkLog.debug( " ██║ ▄▄ ▄ ▄▄ ▄▄▖▄▖ ██║ █ ▗▖ " )
TTkLog.debug( " ▞▀▚ ▖▗ ██║ █▄▄█ █▀▘ █ █ █ ██║ █▟▘ " )
TTkLog.debug( " ▙▄▞▐▄▟ ██║ ▀▄▄▖ █ █ ▝ █ ██║ █ ▀▄ " )
TTkLog.debug( " ▌ ▐ ╚═╝ ╚═╝ " )
TTkLog.debug( " ▚▄▄▘ " )
TTkLog.debug( "" )
TTkLog.debug(f" Version: {TTkCfg.version}" )
TTkLog.debug( "" )
TTkLog.debug( "Starting Main Loop..." )
TTkLog.debug(f"screen = ({TTkTerm.getTerminalSize()})")
# Register events
TTkSignalDriver.init()
TTkLog.debug("Signal Event Registered")
TTkTerm.registerResizeCb(self._win_resize_cb)
self._timer = TTkTimer()
self._timer.timeout.connect(self._time_event)
self._timer.start(0.1)
self.show()
# Keep track of the multiTap to avoid the extra key release
self._lastMultiTap = False
TTkInput.init(
mouse=self._termMouse,
directMouse=self._termDirectMouse)
TTkTerm.init(
title=self._title,
sigmask=self._sigmask)
if self._showMouseCursor:
self._mouseCursor = _MouseCursor()
self._mouseCursor.updated.connect(self.update)
self.paintChildCanvas = self._mouseCursorPaintChildCanvas
self._mainLoop()
finally:
if platform.system() != 'Emscripten':
TTkHelper.quitEvent.emit()
if self._timer:
self._timer.timeout.disconnect(self._time_event)
self._paintEvent.set()
self._timer.join()
TTkSignalDriver.exit()
self.quit()
TTkTerm.exit()
def _mouseCursorPaintChildCanvas(self) -> None:
super().paintChildCanvas()
ch = self._mouseCursor._cursor
pos = self._mouseCursor._pos
color = self._mouseCursor._color
self.getCanvas().drawChar(char=ch, pos=pos, color=color)
def _mainLoop(self):
if platform.system() == 'Emscripten':
return
TTkInput.start()
@pyTTkSlot(str)
def _processPaste(self, txt:str):
if focusWidget := TTkHelper.getFocus():
while focusWidget and not focusWidget.pasteEvent(txt):
focusWidget = focusWidget.parentWidget()
@pyTTkSlot(TTkKeyEvent, TTkMouseEvent)
def _processInput(self, kevt, mevt):
self._drawMutex.acquire()
if kevt is not None:
self._key_event(kevt)
if mevt is not None:
self._mouse_event(mevt)
self._drawMutex.release()
def _mouse_event(self, mevt):
# Upload the global mouse position
# Mainly used by the drag pixmap display
TTkHelper.setMousePos((mevt.x,mevt.y))
TTkWidget._mouseOverProcessed = False
# Avoid to broadcast a key release after a multitap event
if mevt.evt == TTkK.Release and self._lastMultiTap: return
self._lastMultiTap = mevt.tap > 1
if ( TTkHelper.isDnD() and
mevt.evt != TTkK.Drag and
mevt.evt != TTkK.Release ):
# Clean Drag Drop status for any event that is not
# Mouse Drag, Key Release
TTkHelper.dndEnd()
# Mouse Events forwarded straight to the Focus widget:
# - Drag
# - Release
focusWidget = TTkHelper.getFocus()
if ( focusWidget is not None and
( mevt.evt == TTkK.Drag or
mevt.evt == TTkK.Release ) and
not TTkHelper.isDnD() ) :
x,y = TTkHelper.absPos(focusWidget)
nmevt = mevt.clone(pos=(mevt.x-x, mevt.y-y))
focusWidget.mouseEvent(nmevt)
else:
# Sometimes the release event is not retrieved
if ( focusWidget and
focusWidget._pendingMouseRelease and
not TTkHelper.isDnD() ):
focusWidget.mouseEvent(mevt.clone(evt=TTkK.Release))
focusWidget._pendingMouseRelease = False
# Adding this Crappy logic to handle a corner case in the drop routine
# where the mouse is leaving any widget able to handle the drop event
if not self.mouseEvent(mevt):
if dndw := TTkHelper.dndWidget():
dndw.dragLeaveEvent(TTkHelper.dndGetDrag().getDragLeaveEvent(mevt))
TTkHelper.dndEnter(None)
if mevt.evt == TTkK.Press and focusWidget:
focusWidget.clearFocus()
TTkHelper.focusLastModal()
# Clean the Drag and Drop in case of mouse release
if mevt.evt == TTkK.Release:
TTkHelper.dndEnd()
def _key_event(self, kevt):
keyHandled = False
# TTkLog.debug(f"Key: {kevt}")
focusWidget = TTkHelper.getFocus()
# TTkLog.debug(f"{focusWidget}")
if focusWidget is not None:
keyHandled = focusWidget.keyEvent(kevt)
if not keyHandled:
TTkShortcut.processKey(kevt, focusWidget)
# Handle Next Focus Key Binding
if not keyHandled and \
((kevt.key == TTkK.Key_Tab and kevt.mod == TTkK.NoModifier) or
( kevt.key == TTkK.Key_Right or kevt.key == TTkK.Key_Down)):
TTkHelper.nextFocus(focusWidget if focusWidget else self)
# Handle Prev Focus Key Binding
if not keyHandled and \
((kevt.key == TTkK.Key_Tab and kevt.mod == TTkK.ShiftModifier) or
( kevt.key == TTkK.Key_Left or kevt.key == TTkK.Key_Up)):
TTkHelper.prevFocus(focusWidget if focusWidget else self)
def _time_event(self):
# Event.{wait and clear} should be atomic,
# BUTt: ( y )
# if an update event (set) happen in between the wait and clear
# the widget is still processed in the current paint routine
# if an update event (set) happen after the wait and clear
# the widget is processed in the current paint routine
# an extra paint routine is triggered which return immediately due to
# the empty list of widgets to be processed - Not a big deal
# if an update event (set) happen after the wait and clear and the paintAll Routine
# well, it works as it is supposed to be
self._paintEvent.wait()
self._paintEvent.clear()
w,h = TTkTerm.getTerminalSize()
self._drawMutex.acquire()
self.setGeometry(0,0,w,h)
self._fps()
TTkHelper.paintAll()
self.paintExecuted.emit()
self._drawMutex.release()
self._timer.start(1/TTkCfg.maxFps)
def _win_resize_cb(self, width, height):
TTkGlbl.term_w = int(width)
TTkGlbl.term_h = int(height)
self._drawMutex.acquire()
self.setGeometry(0,0,TTkGlbl.term_w,TTkGlbl.term_h)
TTkHelper.rePaintAll()
self._drawMutex.release()
TTkLog.info(f"Resize: w:{TTkGlbl.term_w}, h:{TTkGlbl.term_h}")
@pyTTkSlot()
def quit(self):
'''quit TermTk
.. warning::
Method Deprecated,
use :py:class:`TTkHelper` -> :py:meth:`TTkHelper.quit` instead
i.e.
.. code:: python
buttonQuit = TTkButton(text="QUIT",border=True)
buttonQuit.clicked.connect(TTkHelper.quit)
'''
TTkHelper.quit()
@pyTTkSlot()
def _quit(self):
'''Tells the application to exit with a return code.'''
if self._timer:
self._timer.timeout.disconnect(self._time_event)
TTkInput.inputEvent.clear()
TTkInput.close()
@pyTTkSlot()
def _SIGSTOP(self):
"""Reset terminal settings and stop background input read before putting to sleep"""
TTkLog.debug("Captured SIGSTOP <CTRL-z>")
TTkTerm.stop()
TTkInput.stop()
# TODO: stop the threads
os.kill(os.getpid(), signal.SIGSTOP)
@pyTTkSlot()
def _SIGCONT(self):
"""Set terminal settings and restart background input read"""
TTkLog.debug("Captured SIGCONT 'fg/bg'")
TTkTerm.cont()
TTkInput.cont()
TTkHelper.rePaintAll()
# TODO: Restart threads
# TODO: Redraw the screen
@pyTTkSlot()
def _SIGINT(self):
# If the "TERMTK_STACKTRACE" env variable is defined
# a stacktrace file is generated once CTRL+C is pressed
# i.e.
# TERMTK_STACKTRACE=stacktracetxt python3 demo/demo.py
if ('TERMTK_STACKTRACE' in os.environ and (_stacktraceFile := os.environ['TERMTK_STACKTRACE'])):
with open(_stacktraceFile,'w') as f:
import faulthandler
faulthandler.dump_traceback(f)
TTkLog.debug("Captured SIGINT <CTRL-C>")
# Deregister the handler
# so CTRL-C can be redirected to the default handler if the app does not exit
signal.signal(signal.SIGINT, signal.SIG_DFL)
TTkHelper.quit()
def isVisibleAndParent(self):
return self.isVisible()
@pyTTkSlot()
def aboutTermTk(self):
'''
Displays a simple message box about `pyTermTk <https://github.com/ceccopierangiolieugenio/pyTermTk>`__.
The message includes the version number of TermTk being used by the application.
This is useful for inclusion in the Help menu of an application, as shown in the Menus example.
'''
TTkHelper.overlay(None, TTkAbout(), 30,10)
# from .ttk_thread import TTk
from .ttk_asyncio import TTk

401
TermTk/TTkCore/ttk_asyncio.py

@ -0,0 +1,401 @@
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['TTk']
import os
import signal
import time
import platform
from TermTk.TTkCore.drivers import (TTkSignalDriver, TTkAsyncio)
from TermTk.TTkCore.TTkTerm.input import TTkInput
from TermTk.TTkCore.TTkTerm.inputkey import TTkKeyEvent
from TermTk.TTkCore.TTkTerm.inputmouse import TTkMouseEvent
from TermTk.TTkCore.TTkTerm.term import TTkTerm
from TermTk.TTkCore.signal import pyTTkSignal, pyTTkSlot
from TermTk.TTkCore.constant import TTkK
from TermTk.TTkCore.log import TTkLog
from TermTk.TTkCore.cfg import TTkCfg, TTkGlbl
from TermTk.TTkCore.helper import TTkHelper
from TermTk.TTkCore.helper_draw import TTkHelperDraw
from TermTk.TTkCore.timer import TTkTimer
from TermTk.TTkCore.color import TTkColor
from TermTk.TTkCore.shortcut import TTkShortcut
from TermTk.TTkWidgets.about import TTkAbout
from TermTk.TTkWidgets.widget import TTkWidget
from TermTk.TTkWidgets.container import TTkContainer
class _MouseCursor():
__slots__ = ('_cursor','_color', '_pos', 'updated')
def __init__(self):
self.updated = pyTTkSignal()
self._pos = (0,0)
self._cursor = ''
self._color = TTkColor.RST
TTkInput.inputEvent.connect(self._mouseInput)
@pyTTkSlot(TTkKeyEvent, TTkMouseEvent)
def _mouseInput(self, _, mevt):
if mevt is not None:
self._cursor = ''
self._color = TTkColor.RST
if mevt.key == TTkK.Wheel:
if mevt.evt == TTkK.WHEEL_Up:
self._cursor = ''
else:
self._cursor = ''
elif mevt.evt == TTkK.Press:
self._color = TTkColor.bg('#FFFF00') + TTkColor.fg('#000000')
elif mevt.evt == TTkK.Drag:
self._color = TTkColor.bg('#666600') + TTkColor.fg('#FFFF00')
# elif mevt.evt == TTkK.Release:
# self._color = TTkColor.bg('#006600') + TTkColor.fg('#00FFFF')
self._pos = (mevt.x, mevt.y)
self.updated.emit()
class TTk(TTkContainer):
__slots__ = (
'_termMouse', '_termDirectMouse',
'_title',
'_showMouseCursor', '_mouseCursor',
'_sigmask', '_timer',
'_drawMutex',
'_paintEvent',
'_lastMultiTap',
'paintExecuted')
def __init__(self, *,
title:str='TermTk',
sigmask:TTkTerm.Sigmask=TTkK.NONE,
mouseTrack:bool=False,
mouseCursor:bool=False,
**kwargs) -> None:
# If the "TERMTK_FILE_LOG" env variable is defined
# logs are saved in the file identified by this variable
# i.e.
# TERMTK_FILE_LOG=session.log python3 demo/demo.py
if ('TERMTK_FILE_LOG' in os.environ and (_logFile := os.environ['TERMTK_FILE_LOG'])):
TTkLog.use_default_file_logging(_logFile)
self._timer = None
self._title = title
self._sigmask = sigmask
self.paintExecuted = pyTTkSignal()
self._termMouse = True
self._termDirectMouse = mouseTrack
self._mouseCursor = None
self._showMouseCursor = os.environ.get("TERMTK_MOUSE",mouseCursor)
super().__init__(**kwargs)
TTkInput.inputEvent.connect(self._processInput)
TTkInput.pasteEvent.connect(self._processPaste)
TTkSignalDriver.sigStop.connect(self._SIGSTOP)
TTkSignalDriver.sigCont.connect(self._SIGCONT)
TTkSignalDriver.sigInt.connect( self._SIGINT)
self._drawMutex = TTkAsyncio.Lock()
self._paintEvent = TTkAsyncio.Event()
self._paintEvent.set()
self.setFocusPolicy(TTkK.ClickFocus)
self.hide()
w,h = TTkTerm.getTerminalSize()
self.setGeometry(0,0,w,h)
if 'TERMTK_NEWRENDERER' in os.environ:
TTkCfg.doubleBuffer = False
TTkCfg.doubleBufferNew = True
if os.environ.get("TERMTK_GPM",False):
self._showMouseCursor = True
TTkHelper.registerRootWidget(self)
frame = 0
time = time.time()
def _fps(self):
curtime = time.time()
self.frame+=1
delta = curtime - self.time
if delta > 5:
TTkLog.debug(f"fps: {int(self.frame/delta)}")
self.frame = 0
self.time = curtime
def mainloop(self):
try:
'''Enters the main event loop and waits until :meth:`~quit` is called or the main widget is destroyed.'''
TTkLog.debug( "" )
TTkLog.debug( " ████████╗ ████████╗ " )
TTkLog.debug( " ╚══██╔══╝ ╚══██╔══╝ " )
TTkLog.debug( " ██║ ▄▄ ▄ ▄▄ ▄▄▖▄▖ ██║ █ ▗▖ " )
TTkLog.debug( " ▞▀▚ ▖▗ ██║ █▄▄█ █▀▘ █ █ █ ██║ █▟▘ " )
TTkLog.debug( " ▙▄▞▐▄▟ ██║ ▀▄▄▖ █ █ ▝ █ ██║ █ ▀▄ " )
TTkLog.debug( " ▌ ▐ ╚═╝ ╚═╝ " )
TTkLog.debug( " ▚▄▄▘ " )
TTkLog.debug( "" )
TTkLog.debug(f" Version: {TTkCfg.version}" )
TTkLog.debug( "" )
TTkLog.debug( "Starting Main Loop..." )
TTkLog.debug(f"screen = ({TTkTerm.getTerminalSize()})")
# Register events
TTkSignalDriver.init()
TTkLog.debug("Signal Event Registered")
TTkTerm.registerResizeCb(self._win_resize_cb)
# self._timer = TTkTimer()
# self._timer.timeout.connect(self._time_event)
# self.show()
# Keep track of the multiTap to avoid the extra key release
self._lastMultiTap = False
TTkInput.init(
mouse=self._termMouse,
directMouse=self._termDirectMouse)
TTkTerm.init(
title=self._title,
sigmask=self._sigmask)
if self._showMouseCursor:
self._mouseCursor = _MouseCursor()
self._mouseCursor.updated.connect(self.update)
self.paintChildCanvas = self._mouseCursorPaintChildCanvas
TTkAsyncio.run(self._mainLoop())
finally:
if platform.system() != 'Emscripten':
TTkHelper.quitEvent.emit()
# if self._timer:
# self._timer.timeout.disconnect(self._time_event)
# self._paintEvent.set()
# self._timer.join()
TTkSignalDriver.exit()
self.quit()
TTkTerm.exit()
def _mouseCursorPaintChildCanvas(self) -> None:
super().paintChildCanvas()
ch = self._mouseCursor._cursor
pos = self._mouseCursor._pos
color = self._mouseCursor._color
self.getCanvas().drawChar(char=ch, pos=pos, color=color)
async def _mainLoop(self):
self.show()
TTkAsyncio.create_task(self._drawLoop())
if platform.system() == 'Emscripten':
return
await TTkInput.start()
@pyTTkSlot(str)
def _processPaste(self, txt:str):
if focusWidget := TTkHelper.getFocus():
while focusWidget and not focusWidget.pasteEvent(txt):
focusWidget = focusWidget.parentWidget()
@pyTTkSlot(TTkKeyEvent, TTkMouseEvent)
def _processInput(self, kevt, mevt):
# await self._drawMutex.acquire()
if kevt is not None:
self._key_event(kevt)
if mevt is not None:
self._mouse_event(mevt)
# await self._drawMutex.release()
def _mouse_event(self, mevt):
# Upload the global mouse position
# Mainly used by the drag pixmap display
TTkHelper.setMousePos((mevt.x,mevt.y))
TTkWidget._mouseOverProcessed = False
# Avoid to broadcast a key release after a multitap event
if mevt.evt == TTkK.Release and self._lastMultiTap: return
self._lastMultiTap = mevt.tap > 1
if ( TTkHelper.isDnD() and
mevt.evt != TTkK.Drag and
mevt.evt != TTkK.Release ):
# Clean Drag Drop status for any event that is not
# Mouse Drag, Key Release
TTkHelper.dndEnd()
# Mouse Events forwarded straight to the Focus widget:
# - Drag
# - Release
focusWidget = TTkHelper.getFocus()
if ( focusWidget is not None and
( mevt.evt == TTkK.Drag or
mevt.evt == TTkK.Release ) and
not TTkHelper.isDnD() ) :
x,y = TTkHelper.absPos(focusWidget)
nmevt = mevt.clone(pos=(mevt.x-x, mevt.y-y))
focusWidget.mouseEvent(nmevt)
else:
# Sometimes the release event is not retrieved
if ( focusWidget and
focusWidget._pendingMouseRelease and
not TTkHelper.isDnD() ):
focusWidget.mouseEvent(mevt.clone(evt=TTkK.Release))
focusWidget._pendingMouseRelease = False
# Adding this Crappy logic to handle a corner case in the drop routine
# where the mouse is leaving any widget able to handle the drop event
if not self.mouseEvent(mevt):
if dndw := TTkHelper.dndWidget():
dndw.dragLeaveEvent(TTkHelper.dndGetDrag().getDragLeaveEvent(mevt))
TTkHelper.dndEnter(None)
if mevt.evt == TTkK.Press and focusWidget:
focusWidget.clearFocus()
TTkHelper.focusLastModal()
# Clean the Drag and Drop in case of mouse release
if mevt.evt == TTkK.Release:
TTkHelper.dndEnd()
def _key_event(self, kevt):
keyHandled = False
# TTkLog.debug(f"Key: {kevt}")
focusWidget = TTkHelper.getFocus()
# TTkLog.debug(f"{focusWidget}")
if focusWidget is not None:
keyHandled = focusWidget.keyEvent(kevt)
if not keyHandled:
TTkShortcut.processKey(kevt, focusWidget)
# Handle Next Focus Key Binding
if not keyHandled and \
((kevt.key == TTkK.Key_Tab and kevt.mod == TTkK.NoModifier) or
( kevt.key == TTkK.Key_Right or kevt.key == TTkK.Key_Down)):
TTkHelper.nextFocus(focusWidget if focusWidget else self)
# Handle Prev Focus Key Binding
if not keyHandled and \
((kevt.key == TTkK.Key_Tab and kevt.mod == TTkK.ShiftModifier) or
( kevt.key == TTkK.Key_Left or kevt.key == TTkK.Key_Up)):
TTkHelper.prevFocus(focusWidget if focusWidget else self)
async def _drawLoop(self):
# Event.{wait and clear} should be atomic,
# BUTt: ( y )
# if an update event (set) happen in between the wait and clear
# the widget is still processed in the current paint routine
# if an update event (set) happen after the wait and clear
# the widget is processed in the current paint routine
# an extra paint routine is triggered which return immediately due to
# the empty list of widgets to be processed - Not a big deal
# if an update event (set) happen after the wait and clear and the paintAll Routine
# well, it works as it is supposed to be
await self._paintEvent.wait()
self._paintEvent.clear()
w,h = TTkTerm.getTerminalSize()
await self._drawMutex.acquire()
self.setGeometry(0,0,w,h)
self._fps()
TTkHelperDraw.paintAll()
self.paintExecuted.emit()
self._drawMutex.release()
await TTkAsyncio.sleep(1/TTkCfg.maxFps)
TTkAsyncio.create_task(self._drawLoop())
# self._timer.start(1/TTkCfg.maxFps)
def _win_resize_cb(self, width, height):
TTkGlbl.term_w = int(width)
TTkGlbl.term_h = int(height)
# await self._drawMutex.acquire()
self.setGeometry(0,0,TTkGlbl.term_w,TTkGlbl.term_h)
TTkHelperDraw.rePaintAll()
# await self._drawMutex.release()
TTkLog.info(f"Resize: w:{TTkGlbl.term_w}, h:{TTkGlbl.term_h}")
@pyTTkSlot()
def quit(self):
'''quit TermTk
.. warning::
Method Deprecated,
use :py:class:`TTkHelper` -> :py:meth:`TTkHelper.quit` instead
i.e.
.. code:: python
buttonQuit = TTkButton(text="QUIT",border=True)
buttonQuit.clicked.connect(TTkHelper.quit)
'''
TTkHelper.quit()
@pyTTkSlot()
def _quit(self):
'''Tells the application to exit with a return code.'''
if self._timer:
self._timer.timeout.disconnect(self._time_event)
TTkInput.inputEvent.clear()
TTkInput.close()
@pyTTkSlot()
def _SIGSTOP(self):
"""Reset terminal settings and stop background input read before putting to sleep"""
TTkLog.debug("Captured SIGSTOP <CTRL-z>")
TTkTerm.stop()
TTkInput.stop()
# TODO: stop the threads
os.kill(os.getpid(), signal.SIGSTOP)
@pyTTkSlot()
def _SIGCONT(self):
"""Set terminal settings and restart background input read"""
TTkLog.debug("Captured SIGCONT 'fg/bg'")
TTkTerm.cont()
TTkInput.cont()
TTkHelper.rePaintAll()
# TODO: Restart threads
# TODO: Redraw the screen
@pyTTkSlot()
def _SIGINT(self):
# If the "TERMTK_STACKTRACE" env variable is defined
# a stacktrace file is generated once CTRL+C is pressed
# i.e.
# TERMTK_STACKTRACE=stacktracetxt python3 demo/demo.py
if ('TERMTK_STACKTRACE' in os.environ and (_stacktraceFile := os.environ['TERMTK_STACKTRACE'])):
with open(_stacktraceFile,'w') as f:
import faulthandler
faulthandler.dump_traceback(f)
TTkLog.debug("Captured SIGINT <CTRL-C>")
# Deregister the handler
# so CTRL-C can be redirected to the default handler if the app does not exit
signal.signal(signal.SIGINT, signal.SIG_DFL)
TTkHelper.quit()
def isVisibleAndParent(self):
return self.isVisible()
@pyTTkSlot()
def aboutTermTk(self):
'''
Displays a simple message box about `pyTermTk <https://github.com/ceccopierangiolieugenio/pyTermTk>`__.
The message includes the version number of TermTk being used by the application.
This is useful for inclusion in the Help menu of an application, as shown in the Menus example.
'''
TTkHelper.overlay(None, TTkAbout(), 30,10)

401
TermTk/TTkCore/ttk_thread.py

@ -0,0 +1,401 @@
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['TTk']
import os
import signal
import time
import queue
import threading
import platform
from TermTk.TTkCore.drivers import TTkSignalDriver
from TermTk.TTkCore.TTkTerm.input import TTkInput
from TermTk.TTkCore.TTkTerm.inputkey import TTkKeyEvent
from TermTk.TTkCore.TTkTerm.inputmouse import TTkMouseEvent
from TermTk.TTkCore.TTkTerm.term import TTkTerm
from TermTk.TTkCore.signal import pyTTkSignal, pyTTkSlot
from TermTk.TTkCore.constant import TTkK
from TermTk.TTkCore.log import TTkLog
from TermTk.TTkCore.cfg import TTkCfg, TTkGlbl
from TermTk.TTkCore.helper import TTkHelper
from TermTk.TTkCore.timer import TTkTimer
from TermTk.TTkCore.color import TTkColor
from TermTk.TTkCore.shortcut import TTkShortcut
from TermTk.TTkWidgets.about import TTkAbout
from TermTk.TTkWidgets.widget import TTkWidget
from TermTk.TTkWidgets.container import TTkContainer
class _MouseCursor():
__slots__ = ('_cursor','_color', '_pos', 'updated')
def __init__(self):
self.updated = pyTTkSignal()
self._pos = (0,0)
self._cursor = ''
self._color = TTkColor.RST
TTkInput.inputEvent.connect(self._mouseInput)
@pyTTkSlot(TTkKeyEvent, TTkMouseEvent)
def _mouseInput(self, _, mevt):
if mevt is not None:
self._cursor = ''
self._color = TTkColor.RST
if mevt.key == TTkK.Wheel:
if mevt.evt == TTkK.WHEEL_Up:
self._cursor = ''
else:
self._cursor = ''
elif mevt.evt == TTkK.Press:
self._color = TTkColor.bg('#FFFF00') + TTkColor.fg('#000000')
elif mevt.evt == TTkK.Drag:
self._color = TTkColor.bg('#666600') + TTkColor.fg('#FFFF00')
# elif mevt.evt == TTkK.Release:
# self._color = TTkColor.bg('#006600') + TTkColor.fg('#00FFFF')
self._pos = (mevt.x, mevt.y)
self.updated.emit()
class TTk(TTkContainer):
__slots__ = (
'_termMouse', '_termDirectMouse',
'_title',
'_showMouseCursor', '_mouseCursor',
'_sigmask', '_timer',
'_drawMutex',
'_paintEvent',
'_lastMultiTap',
'paintExecuted')
def __init__(self, *,
title:str='TermTk',
sigmask:TTkTerm.Sigmask=TTkK.NONE,
mouseTrack:bool=False,
mouseCursor:bool=False,
**kwargs) -> None:
# If the "TERMTK_FILE_LOG" env variable is defined
# logs are saved in the file identified by this variable
# i.e.
# TERMTK_FILE_LOG=session.log python3 demo/demo.py
if ('TERMTK_FILE_LOG' in os.environ and (_logFile := os.environ['TERMTK_FILE_LOG'])):
TTkLog.use_default_file_logging(_logFile)
self._timer = None
self._title = title
self._sigmask = sigmask
self.paintExecuted = pyTTkSignal()
self._termMouse = True
self._termDirectMouse = mouseTrack
self._mouseCursor = None
self._showMouseCursor = os.environ.get("TERMTK_MOUSE",mouseCursor)
super().__init__(**kwargs)
TTkInput.inputEvent.connect(self._processInput)
TTkInput.pasteEvent.connect(self._processPaste)
TTkSignalDriver.sigStop.connect(self._SIGSTOP)
TTkSignalDriver.sigCont.connect(self._SIGCONT)
TTkSignalDriver.sigInt.connect( self._SIGINT)
self._drawMutex = threading.Lock()
self._paintEvent = threading.Event()
self._paintEvent.set()
self.setFocusPolicy(TTkK.ClickFocus)
self.hide()
w,h = TTkTerm.getTerminalSize()
self.setGeometry(0,0,w,h)
if 'TERMTK_NEWRENDERER' in os.environ:
TTkCfg.doubleBuffer = False
TTkCfg.doubleBufferNew = True
if os.environ.get("TERMTK_GPM",False):
self._showMouseCursor = True
TTkHelper.registerRootWidget(self)
frame = 0
time = time.time()
def _fps(self):
curtime = time.time()
self.frame+=1
delta = curtime - self.time
if delta > 5:
TTkLog.debug(f"fps: {int(self.frame/delta)}")
self.frame = 0
self.time = curtime
def mainloop(self):
try:
'''Enters the main event loop and waits until :meth:`~quit` is called or the main widget is destroyed.'''
TTkLog.debug( "" )
TTkLog.debug( " ████████╗ ████████╗ " )
TTkLog.debug( " ╚══██╔══╝ ╚══██╔══╝ " )
TTkLog.debug( " ██║ ▄▄ ▄ ▄▄ ▄▄▖▄▖ ██║ █ ▗▖ " )
TTkLog.debug( " ▞▀▚ ▖▗ ██║ █▄▄█ █▀▘ █ █ █ ██║ █▟▘ " )
TTkLog.debug( " ▙▄▞▐▄▟ ██║ ▀▄▄▖ █ █ ▝ █ ██║ █ ▀▄ " )
TTkLog.debug( " ▌ ▐ ╚═╝ ╚═╝ " )
TTkLog.debug( " ▚▄▄▘ " )
TTkLog.debug( "" )
TTkLog.debug(f" Version: {TTkCfg.version}" )
TTkLog.debug( "" )
TTkLog.debug( "Starting Main Loop..." )
TTkLog.debug(f"screen = ({TTkTerm.getTerminalSize()})")
# Register events
TTkSignalDriver.init()
TTkLog.debug("Signal Event Registered")
TTkTerm.registerResizeCb(self._win_resize_cb)
self._timer = TTkTimer()
self._timer.timeout.connect(self._time_event)
self._timer.start(0.1)
self.show()
# Keep track of the multiTap to avoid the extra key release
self._lastMultiTap = False
TTkInput.init(
mouse=self._termMouse,
directMouse=self._termDirectMouse)
TTkTerm.init(
title=self._title,
sigmask=self._sigmask)
if self._showMouseCursor:
self._mouseCursor = _MouseCursor()
self._mouseCursor.updated.connect(self.update)
self.paintChildCanvas = self._mouseCursorPaintChildCanvas
self._mainLoop()
finally:
if platform.system() != 'Emscripten':
TTkHelper.quitEvent.emit()
if self._timer:
self._timer.timeout.disconnect(self._time_event)
self._paintEvent.set()
self._timer.join()
TTkSignalDriver.exit()
self.quit()
TTkTerm.exit()
def _mouseCursorPaintChildCanvas(self) -> None:
super().paintChildCanvas()
ch = self._mouseCursor._cursor
pos = self._mouseCursor._pos
color = self._mouseCursor._color
self.getCanvas().drawChar(char=ch, pos=pos, color=color)
def _mainLoop(self):
if platform.system() == 'Emscripten':
return
TTkInput.start()
@pyTTkSlot(str)
def _processPaste(self, txt:str):
if focusWidget := TTkHelper.getFocus():
while focusWidget and not focusWidget.pasteEvent(txt):
focusWidget = focusWidget.parentWidget()
@pyTTkSlot(TTkKeyEvent, TTkMouseEvent)
def _processInput(self, kevt, mevt):
self._drawMutex.acquire()
if kevt is not None:
self._key_event(kevt)
if mevt is not None:
self._mouse_event(mevt)
self._drawMutex.release()
def _mouse_event(self, mevt):
# Upload the global mouse position
# Mainly used by the drag pixmap display
TTkHelper.setMousePos((mevt.x,mevt.y))
TTkWidget._mouseOverProcessed = False
# Avoid to broadcast a key release after a multitap event
if mevt.evt == TTkK.Release and self._lastMultiTap: return
self._lastMultiTap = mevt.tap > 1
if ( TTkHelper.isDnD() and
mevt.evt != TTkK.Drag and
mevt.evt != TTkK.Release ):
# Clean Drag Drop status for any event that is not
# Mouse Drag, Key Release
TTkHelper.dndEnd()
# Mouse Events forwarded straight to the Focus widget:
# - Drag
# - Release
focusWidget = TTkHelper.getFocus()
if ( focusWidget is not None and
( mevt.evt == TTkK.Drag or
mevt.evt == TTkK.Release ) and
not TTkHelper.isDnD() ) :
x,y = TTkHelper.absPos(focusWidget)
nmevt = mevt.clone(pos=(mevt.x-x, mevt.y-y))
focusWidget.mouseEvent(nmevt)
else:
# Sometimes the release event is not retrieved
if ( focusWidget and
focusWidget._pendingMouseRelease and
not TTkHelper.isDnD() ):
focusWidget.mouseEvent(mevt.clone(evt=TTkK.Release))
focusWidget._pendingMouseRelease = False
# Adding this Crappy logic to handle a corner case in the drop routine
# where the mouse is leaving any widget able to handle the drop event
if not self.mouseEvent(mevt):
if dndw := TTkHelper.dndWidget():
dndw.dragLeaveEvent(TTkHelper.dndGetDrag().getDragLeaveEvent(mevt))
TTkHelper.dndEnter(None)
if mevt.evt == TTkK.Press and focusWidget:
focusWidget.clearFocus()
TTkHelper.focusLastModal()
# Clean the Drag and Drop in case of mouse release
if mevt.evt == TTkK.Release:
TTkHelper.dndEnd()
def _key_event(self, kevt):
keyHandled = False
# TTkLog.debug(f"Key: {kevt}")
focusWidget = TTkHelper.getFocus()
# TTkLog.debug(f"{focusWidget}")
if focusWidget is not None:
keyHandled = focusWidget.keyEvent(kevt)
if not keyHandled:
TTkShortcut.processKey(kevt, focusWidget)
# Handle Next Focus Key Binding
if not keyHandled and \
((kevt.key == TTkK.Key_Tab and kevt.mod == TTkK.NoModifier) or
( kevt.key == TTkK.Key_Right or kevt.key == TTkK.Key_Down)):
TTkHelper.nextFocus(focusWidget if focusWidget else self)
# Handle Prev Focus Key Binding
if not keyHandled and \
((kevt.key == TTkK.Key_Tab and kevt.mod == TTkK.ShiftModifier) or
( kevt.key == TTkK.Key_Left or kevt.key == TTkK.Key_Up)):
TTkHelper.prevFocus(focusWidget if focusWidget else self)
def _time_event(self):
# Event.{wait and clear} should be atomic,
# BUTt: ( y )
# if an update event (set) happen in between the wait and clear
# the widget is still processed in the current paint routine
# if an update event (set) happen after the wait and clear
# the widget is processed in the current paint routine
# an extra paint routine is triggered which return immediately due to
# the empty list of widgets to be processed - Not a big deal
# if an update event (set) happen after the wait and clear and the paintAll Routine
# well, it works as it is supposed to be
self._paintEvent.wait()
self._paintEvent.clear()
w,h = TTkTerm.getTerminalSize()
self._drawMutex.acquire()
self.setGeometry(0,0,w,h)
self._fps()
TTkHelper.paintAll()
self.paintExecuted.emit()
self._drawMutex.release()
self._timer.start(1/TTkCfg.maxFps)
def _win_resize_cb(self, width, height):
TTkGlbl.term_w = int(width)
TTkGlbl.term_h = int(height)
self._drawMutex.acquire()
self.setGeometry(0,0,TTkGlbl.term_w,TTkGlbl.term_h)
TTkHelper.rePaintAll()
self._drawMutex.release()
TTkLog.info(f"Resize: w:{TTkGlbl.term_w}, h:{TTkGlbl.term_h}")
@pyTTkSlot()
def quit(self):
'''quit TermTk
.. warning::
Method Deprecated,
use :py:class:`TTkHelper` -> :py:meth:`TTkHelper.quit` instead
i.e.
.. code:: python
buttonQuit = TTkButton(text="QUIT",border=True)
buttonQuit.clicked.connect(TTkHelper.quit)
'''
TTkHelper.quit()
@pyTTkSlot()
def _quit(self):
'''Tells the application to exit with a return code.'''
if self._timer:
self._timer.timeout.disconnect(self._time_event)
TTkInput.inputEvent.clear()
TTkInput.close()
@pyTTkSlot()
def _SIGSTOP(self):
"""Reset terminal settings and stop background input read before putting to sleep"""
TTkLog.debug("Captured SIGSTOP <CTRL-z>")
TTkTerm.stop()
TTkInput.stop()
# TODO: stop the threads
os.kill(os.getpid(), signal.SIGSTOP)
@pyTTkSlot()
def _SIGCONT(self):
"""Set terminal settings and restart background input read"""
TTkLog.debug("Captured SIGCONT 'fg/bg'")
TTkTerm.cont()
TTkInput.cont()
TTkHelper.rePaintAll()
# TODO: Restart threads
# TODO: Redraw the screen
@pyTTkSlot()
def _SIGINT(self):
# If the "TERMTK_STACKTRACE" env variable is defined
# a stacktrace file is generated once CTRL+C is pressed
# i.e.
# TERMTK_STACKTRACE=stacktracetxt python3 demo/demo.py
if ('TERMTK_STACKTRACE' in os.environ and (_stacktraceFile := os.environ['TERMTK_STACKTRACE'])):
with open(_stacktraceFile,'w') as f:
import faulthandler
faulthandler.dump_traceback(f)
TTkLog.debug("Captured SIGINT <CTRL-C>")
# Deregister the handler
# so CTRL-C can be redirected to the default handler if the app does not exit
signal.signal(signal.SIGINT, signal.SIG_DFL)
TTkHelper.quit()
def isVisibleAndParent(self):
return self.isVisible()
@pyTTkSlot()
def aboutTermTk(self):
'''
Displays a simple message box about `pyTermTk <https://github.com/ceccopierangiolieugenio/pyTermTk>`__.
The message includes the version number of TermTk being used by the application.
This is useful for inclusion in the Help menu of an application, as shown in the Menus example.
'''
TTkHelper.overlay(None, TTkAbout(), 30,10)

7
TermTk/TTkWidgets/widget.py

@ -31,6 +31,7 @@ from TermTk.TTkCore.cfg import TTkCfg, TTkGlbl
from TermTk.TTkCore.constant import TTkK
from TermTk.TTkCore.log import TTkLog
from TermTk.TTkCore.helper import TTkHelper
from TermTk.TTkCore.helper_draw import TTkHelperDraw
from TermTk.TTkCore.color import TTkColor
from TermTk.TTkCore.string import TTkString
from TermTk.TTkCore.canvas import TTkCanvas
@ -39,7 +40,7 @@ from TermTk.TTkTemplates.dragevents import TDragEvents
from TermTk.TTkTemplates.mouseevents import TMouseEvents
from TermTk.TTkTemplates.keyevents import TKeyEvents
from TermTk.TTkLayouts.layout import TTkWidgetItem
from TermTk.TTkCore.TTkTerm.inputmouse import TTkMouseEvent
from TermTk.TTkCore.TTkTerm import TTkMouseEvent
class TTkWidget(TMouseEvents,TKeyEvents, TDragEvents):
''' Widget sizes:
@ -661,8 +662,8 @@ class TTkWidget(TMouseEvents,TKeyEvents, TDragEvents):
canvas.drawText(pos=(0,0), text=self.text)
'''
if repaint:
TTkHelper.addUpdateBuffer(self)
TTkHelper.addUpdateWidget(self)
TTkHelperDraw.addUpdateBuffer(self)
TTkHelperDraw.addUpdateWidget(self)
if updateParent and self._parent is not None:
self._parent.update(updateLayout=True)

Loading…
Cancel
Save