@ -22,13 +22,11 @@
__all__ = [ ' TTkTerminalView ' ]
import os , pty , threading
import struct , fcntl , termios
import re
from dataclasses import dataclass
from threading import Lock
import re
from select import select
from TermTk . TTkCore . canvas import TTkCanvas
from TermTk . TTkCore . color import TTkColor
from TermTk . TTkCore . log import TTkLog
@ -37,20 +35,14 @@ from TermTk.TTkCore.cfg import TTkCfg
from TermTk . TTkCore . string import TTkString
from TermTk . TTkCore . signal import pyTTkSignal , pyTTkSlot
from TermTk . TTkCore . helper import TTkHelper
from TermTk . TTkGui . clipboard import TTkClipboard
from TermTk . TTkGui . textwrap1 import TTkTextWrap
from TermTk . TTkGui . textcursor import TTkTextCursor
from TermTk . TTkGui . textdocument import TTkTextDocument
from TermTk . TTkLayouts . gridlayout import TTkGridLayout
from TermTk . TTkAbstract . abstractscrollarea import TTkAbstractScrollArea
from TermTk . TTkAbstract . abstractscrollview import TTkAbstractScrollView , TTkAbstractScrollViewGridLayout
from TermTk . TTkWidgets . widget import TTkWidget
from TermTk . TTkAbstract . abstractscrollview import TTkAbstractScrollView
from TermTk . TTkWidgets . TTkTerminal . terminal_screen import _TTkTerminalScreen
from TermTk . TTkWidgets . TTkTerminal . mode import TTkTerminalModes
from TermTk . TTkWidgets . TTkTerminal . vt102 import TTkVT102
from TermTk . TTkCore . TTkTerm . colors import TTkTermColor
from TermTk . TTkCore . TTkTerm . colors_ansi_map import ansiMap16 , ansiMap256
@ -89,48 +81,43 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
reportMove : bool = False
sgrMode : bool = False
__slots__ = ( ' _shell ' , ' _fd ' , ' _inout ' , ' _pid ' ,
' _quit_pipe ' , ' _resize_pipe ' ,
' _mode_normal '
' _clipboard ' , ' _selecting ' ,
' _buffer_lines ' , ' _buffer_screen ' ,
' _keyboard ' , ' _mouse ' , ' _terminal ' ,
' _screen_current ' , ' _screen_normal ' , ' _screen_alt ' ,
# Signals
' titleChanged ' , ' bell ' , ' terminalClosed ' , ' textSelected ' )
__slots__ = (
' _termLoop ' , ' _newSize ' ,
' _clipboard ' , ' _selecting ' ,
' _buffer_lines ' , ' _buffer_screen ' ,
' _keyboard ' , ' _mouse ' , ' _terminal ' ,
' _screen_current ' , ' _screen_normal ' , ' _screen_alt ' ,
# Signals
' bell ' ,
' titleChanged ' , ' terminalClosed ' , ' textSelected ' ,
' termData ' , ' termResized ' )
def __init__ ( self , * args , * * kwargs ) :
self . bell = pyTTkSignal ( )
self . terminalClosed = pyTTkSignal ( )
self . titleChanged = pyTTkSignal ( str )
self . textSelected = pyTTkSignal ( str )
self . _shell = os . environ . get ( ' SHELL ' , ' sh ' )
self . _fd = None
self . _inout = None
self . _pid = None
self . _mode_normal = True
self . _quit_pipe = None
self . _resize_pipe = None
self . termData = pyTTkSignal ( str )
self . termResized = pyTTkSignal ( int , int )
self . _newSize = None
self . _terminal = TTkTerminalView . _Terminal ( )
self . _keyboard = TTkTerminalView . _Keyboard ( )
self . _mouse = TTkTerminalView . _Mouse ( )
self . _buffer_lines = [ TTkString ( ) ]
# self._screen_normal = _TTkTerminalNormalScreen()
self . _screen_normal = _TTkTerminalScreen ( )
self . _screen_alt = _TTkTerminalScreen ( )
self . _screen_current = self . _screen_normal
self . _clipboard = TTkClipboard ( )
self . _selecting = False
# self._screen_normal.bell.connect(lambda : _termLog.debug("BELL!!! 🔔🔔🔔"))
# self._screen_alt.bell.connect( lambda : _termLog.debug("BELL!!! 🔔🔔🔔"))
self . _screen_normal . bell . connect ( self . bell . emit )
self . _screen_alt . bell . connect ( self . bell . emit )
super ( ) . __init__ ( * args , * * kwargs )
self . _termLoop = self . _loopGenerator ( )
next ( self . _termLoop )
self . _termLoop . send ( " " )
# self._screen_alt._CSI_MAP |= self._CSI_MAP
# self._screen_current._CSI_MAP |= self._CSI_MAP
super ( ) . __init__ ( * args , * * kwargs )
w , h = self . size ( )
self . _screen_alt . resize ( w , h )
@ -138,7 +125,6 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
self . setFocusPolicy ( TTkK . ClickFocus + TTkK . TabFocus )
self . enableWidgetCursor ( )
TTkHelper . quitEvent . connect ( self . _quit )
self . viewChanged . connect ( self . _viewChangedHandler )
self . _screen_normal . bufferedLinesChanged . connect ( self . _screenChanged )
self . _screen_alt . bufferedLinesChanged . connect ( self . _screenChanged )
@ -163,149 +149,54 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
def close ( self ) :
self . _quit ( )
def _resizeScreen ( self ) :
w , h = self . size ( )
if w < = 0 or h < = 0 : return
self . _screen_current . resize ( w , h )
if self . _fd :
# s = struct.pack('HHHH', 0, 0, 0, 0)
# t = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, s)
# print(struct.unpack('HHHH', t))
s = struct . pack ( ' HHHH ' , h , w , 0 , 0 )
t = fcntl . ioctl ( self . _fd , termios . TIOCSWINSZ , s )
# termios.tcsetwinsize(self._fd,(h,w))
def resizeEvent ( self , w : int , h : int ) :
if ( self . _resize_pipe and
( self . _screen_current . _w != w or
self . _screen_current . _h != h ) ) :
os . write ( self . _resize_pipe [ 1 ] , b ' resize ' )
self . _newSize = ( w , h )
# self._screen_alt.resize(w,h)
# self._screen_normal.resize(w,h)
_termLog . info ( f " Resize Terminal: { w =} { h =} " )
self . termResized . emit ( w , h )
return super ( ) . resizeEvent ( w , h )
def runShell ( self , program = None ) :
self . _shell = program if program else self . _shell
self . _pid , self . _fd = pty . fork ( )
if self . _pid == 0 :
def _spawnTerminal ( argv = [ self . _shell ] , env = os . environ ) :
os . execvpe ( argv [ 0 ] , argv , env )
# threading.Thread(target=_spawnTerminal).start()
# TTkHelper.quit()
_spawnTerminal ( )
import sys
sys . exit ( )
# os.execvpe(argv[0], argv, env)
# os.execvpe(argv[0], argv, env)
# self._proc = subprocess.Popen(self._shell)
# _termLog.debug(f"Terminal PID={self._proc.pid=}")
# self._proc.wait()
else :
self . _inout = os . fdopen ( self . _fd , " w+b " , 0 )
name = os . ttyname ( self . _fd )
_termLog . debug ( f " { self . _pid =} { self . _fd =} { name } " )
self . _quit_pipe = os . pipe ( )
self . _resize_pipe = os . pipe ( )
threading . Thread ( target = self . loop , args = [ self ] ) . start ( )
# def _wait(v, pid=self._pid):
# TTkLog.debug(f"Wait Terminal {v=} {self._pid=}")
# status = os.wait()
# TTkLog.debug(f"In parent process- {status=} {self._pid=}")
# TTkLog.debug(f"Terminated child's process id: {status[0]}")
# TTkLog.debug(f"Signal number that killed the child process: {status[1]}")
# threading.Thread(target=_wait,args=[self]).start()
w , h = self . size ( )
self . resizeEvent ( w , h )
# xterm escape sequences from:
# https://invisible-island.net/xterm/ctlseqs/ctlseqs.html
# https://invisible-island.net/xterm/ctlseqs/ctlseqs.html#h3-Functions-using-CSI-_-ordered-by-the-final-character_s_
re_CURSOR = re . compile ( ' ^ \ [(( \ d*)(;( \ d*))*)([@^`A-Za-z]) ' )
re_CURSOR = re . compile ( r ' ^ \ [(( \ d*)(;( \ d*))*)([@^`A-Za-z]) ' )
# https://invisible-island.net/xterm/ctlseqs/ctlseqs.html#h3-Functions-using-CSI-_-ordered-by-the-final-character_s_
# Basic Re for CSI Ps matches:
# CSI : Control Sequence Introducer "<ESC>[" = '\033['
# Ps : A single (usually optional) numeric parameter, composed of one or more digits.
# fn : the single char defining the function
re_CSI_Ps_fu = re . compile ( ' ^ \ [( \ d*)([@ABCDEFGIJKLMPSTXZ^`abcdeghinqx]) ' )
re_CSI_Ps_Ps_fu = re . compile ( ' ^ \ [( \ d*);( \ d*)([Hf]) ' )
re_CSI_Ps_fu = re . compile ( r ' ^ \ [( \ d*)([@ABCDEFGIJKLMPSTXZ^`abcdeghinqx]) ' )
re_CSI_Ps_Ps_fu = re . compile ( r ' ^ \ [( \ d*);( \ d*)([Hf]) ' )
re_DEC_SET_RST = re . compile ( ' ^ \ [( \ ??)( \ d+)([lh]) ' )
re_DEC_SET_RST = re . compile ( r ' ^ \ [( \ ??)( \ d+)([lh]) ' )
# re_CURSOR_1 = re.compile(r'^(\d+)([ABCDEFGIJKLMPSTXZHf])')
re_OSC_ps_Pt = re . compile ( ' ^( \ d*);(.*)$ ' )
re_OSC_ps_Pt = re . compile ( r ' ^( \ d*);(.*)$ ' )
re_XTWINOPS = re . compile ( ' ^ ' )
re_XTWINOPS = re . compile ( r ' ^ ' )
@pyTTkSlot ( )
def _quit ( self ) :
if self . _pid :
os . kill ( self . _pid , 0 )
os . kill ( self . _pid , 15 )
if self . _quit_pipe :
try :
os . write ( self . _quit_pipe [ 1 ] , b ' quit ' )
except :
pass
def _inputGenerator ( self ) :
while rs := select ( [ self . _inout , self . _quit_pipe [ 0 ] , self . _resize_pipe [ 0 ] ] , [ ] , [ ] ) [ 0 ] :
if self . _quit_pipe [ 0 ] in rs :
# os.close(self._quit_pipe[0])
os . close ( self . _quit_pipe [ 1 ] )
# os.close(self._resize_pipe[0])
os . close ( self . _resize_pipe [ 1 ] )
os . close ( self . _fd )
return
if self . _resize_pipe [ 0 ] in rs :
self . _resizeScreen ( )
os . read ( self . _resize_pipe [ 0 ] , 100 )
if self . _inout not in rs :
continue
# _termLog.debug(f"Select - {rs=}")
for r in rs :
if r is not self . _inout :
continue
def termWrite ( self , data ) :
if data :
self . _termLoop . send ( data )
try :
_fl = fcntl . fcntl ( self . _inout , fcntl . F_GETFL )
fcntl . fcntl ( self . _inout , fcntl . F_SETFL , _fl | os . O_NONBLOCK ) # Set the input as NONBLOCK to read the full sequence
out = b " "
while _out := self . _inout . read ( ) :
out + = _out
fcntl . fcntl ( self . _inout , fcntl . F_SETFL , _fl )
except Exception as e :
_termLog . error ( f " Error: { e =} " )
self . terminalClosed . emit ( )
return
# out = out.decode('utf-8','ignore')
try :
out = out . decode ( )
except Exception as e :
_termLog . error ( f " { e =} " )
_termLog . error ( f " Failed to decode { out } " )
out = out . decode ( ' utf-8 ' , ' ignore ' )
yield out
def loop ( self , _ ) :
def _loopGenerator ( self ) :
def _checkSize ( ) :
if self . _newSize :
TTkLog . debug ( f " { self . _newSize =} " )
self . _screen_alt . resize ( * self . _newSize )
self . _screen_normal . resize ( * self . _newSize )
self . _newSize = None
yield
SGR_SET = TTkTermColor . SGR_SET # Precacing those variables to
SGR_RST = TTkTermColor . SGR_RST # speedup the search
inputgenerator = self . _inputGenerator ( )
leftUnhandled = " "
for out in inputgenerator :
while True :
_checkSize ( )
# Entry Point 1
out = yield
if not out : return
_checkSize ( )
sout = ( leftUnhandled + out ) . split ( ' \033 ' )
_termLog . debug ( f " { leftUnhandled =} - { sout [ 0 ] =} " )
@ -476,23 +367,23 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
self . _terminal . DCSstring + = dcs
return False , " "
def __processDCSInputGenerator ( slice ) :
ret , slice = __processDCSEscapeGenerator ( )
if not ret :
# If the terminator is not in the current escaped slices
# I need to fetch from the input until I got all of them
# This is not the nicest thing but it save a bunch of extra
# hcecks at any input just to find if we are in DCS mode or not
for out in inputgenerator :
while True :
# Entry Point 2
out = yield
if not out : return ( ) , " "
sout = out . split ( ' \033 ' )
self . _terminal . DCSstring + = sout [ 0 ]
escapeGenerator = ( i for i in sout [ 1 : ] )
ret , slice = __processDCSEscapeGenerator ( )
if ret :
return escapeGenerator , slice
ret , slice = __processDCSEscapeGenerator ( )
if not ret :
escapeGenerator , slice = __processDCSInputGenerator ( )
if ret : break
if not slice :
continue
@ -744,26 +635,26 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
return ret , _slice , __oscString
return False , " " , __oscString
def __processOSCInputGenerator ( __oscString : str ) :
ret , slice , oscString = __checkOSCBell ( slice , oscString )
if not ret :
ret , slice , oscString = __processOSCEscapeGenerator ( oscString )
if not ret :
# If the terminator is not in the current escaped slices
# I need to fetch from the input until I got all of them
# This is not the nicest thing but it save a bunch of extra
# hcecks at any input just to find if we are in OSC mode or not
for out in inputgenerator :
while True :
# Entry Point 3
out = yield
if not out : return ( ) , " "
sout = out . split ( ' \033 ' )
self . _terminal . DCSstring + = sout [ 0 ]
escapeGenerator = ( i for i in sout [ 1 : ] )
ret , _slice , __oscString = __processOSCEscapeGenerator ( __oscString )
if ret :
return escapeGenerator , slice , __oscString
ret , slice , oscString = __checkOSCBell ( slice , oscString )
if not ret :
ret , slice , oscString = __processOSCEscapeGenerator ( oscString )
if not ret :
escapeGenerator , slice , oscString = __processOSCInputGenerator ( oscString )
ret , slice , oscString = __processOSCEscapeGenerator ( oscString )
if ret : break
_termLog . info ( f " OSC: { oscString } " )
@ -812,15 +703,12 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
def pasteEvent ( self , txt : str ) :
if self . _terminal . bracketedMode :
txt = " \033 [200~ " + txt + " \033 [201~ "
if self . _inout :
self . _inout . write ( txt . encode ( ) )
self . termData . emit ( txt . encode ( ) )
return True
def keyEvent ( self , evt ) :
# _termLog.debug(f"Key: {evt.code=}")
_termLog . debug ( f " Key: { str ( evt ) =} " )
if not self . _inout :
return False
if evt . type == TTkK . SpecialKey :
if evt . mod == TTkK . ControlModifier and evt . key == TTkK . Key_V :
txt = self . _clipboard . text ( )
@ -831,16 +719,16 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
TTkK . Key_Down : b " \033 OB " ,
TTkK . Key_Right : b " \033 OC " ,
TTkK . Key_Left : b " \033 OD " } . get ( evt . key ) :
self . _inout . write ( code )
self . termData . emit ( code )
return True
# if evt.key == TTkK.Key_Enter:
# _termLog.debug(f"Key: Enter")
# # self._inout.write (b'\n')
# # self._inout.write (evt.code.encode())
# # self.termData.emit (b'\n')
# # self.termData.emit (evt.code.encode())
else : # Input char
_termLog . debug ( f " Key: { evt . key =} " )
# self._inout.write (evt.key.encode())
self . _inout . write ( evt . code . encode ( ) )
# self.termData.emit (evt.key.encode())
self . termData . emit ( evt . code . encode ( ) )
return True
# Extended coordinates
@ -917,8 +805,6 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
# report position in pixels rather than character cells.
def _sendMouse ( self , evt ) :
if not self . _inout :
return False
if not self . _mouse . reportPress :
return False
if ( not self . _mouse . reportDrag and
@ -947,7 +833,7 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
TTkK . WHEEL_Down : ( k , 1 , ' M ' ) } . get (
evt . evt , ( 0 , 0 , ' M ' ) )
# _termLog.mouse(f'Mouse: <ESC>[<{k+km};{x};{y}{pr}')
self . _inout . write ( f ' \033 [< { k + km } ; { x } ; { y } { pr } ' . encode ( ) )
self . termData . emit ( f ' \033 [< { k + km } ; { x } ; { y } { pr } ' . encode ( ) )
else :
head = {
TTkK . Press : b ' \033 [M ' ,
@ -962,7 +848,7 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
bah . append ( ( x + 32 ) % 0xff )
bah . append ( ( y + 32 ) % 0xff )
# _termLog.mouse(f'Mouse: '+bah.decode().replace('\033','<ESC>'))
self . _inout . write ( bah )
self . termData . emit ( bah )
return True
def mousePressEvent ( self , evt ) :
@ -1050,9 +936,9 @@ class TTkTerminalView(TTkAbstractScrollView, _TTkTerminal_CSI_DEC):
def _CSI_n_DSR ( self , ps , _ ) :
x , y = self . _screen_current . getCursor ( )
if ps == 6 :
self . _inout . write ( f " \033 [ { y + 1 } ; { x + 1 } R " . encode ( ) )
self . termData . emit ( f " \033 [ { y + 1 } ; { x + 1 } R " . encode ( ) )
elif ps == 5 :
self . _inout . write ( f " \033 [0n " . encode ( ) )
self . termData . emit ( f " \033 [0n " . encode ( ) )
# CSI Ps ; Ps ; Ps t
# Window manipulation (XTWINOPS), dtterm, extended by xterm.