Browse Source

Added Terminal Quit

pull/161/head
Eugenio Parodi 3 years ago
parent
commit
b34febb87e
  1. 273
      TermTk/TTkWidgets/TTkTerminal/terminal.py
  2. 7
      TermTk/TTkWidgets/TTkTerminal/terminal_alt.py
  3. 0
      tests/test.pty.004.openpty.01.py
  4. 135
      tests/test.pty.004.openpty.02.py
  5. 5
      tests/test.pty.006.terminal.02.py

273
TermTk/TTkWidgets/TTkTerminal/terminal.py

@ -55,13 +55,15 @@ from TermTk.TTkCore.TTkTerm.colors_ansi_map import ansiMap16, ansiMap256
__all__ = ['TTkTerminal']
class TTkTerminal(TTkWidget):
__slots__ = ('_shell', '_fd', '_inout', '_mode_normal'
__slots__ = ('_shell', '_fd', '_inout', '_proc', '_quit_pipe', '_mode_normal'
'_buffer_lines', '_buffer_screen',
'_screen_current', '_screen_normal', '_screen_alt')
def __init__(self, *args, **kwargs):
self._shell = os.environ.get('SHELL', 'sh')
self._fd = None
self._proc = None
self._mode_normal = True
self._quit_pipe = None
self._buffer_lines = [TTkString()]
self._screen_normal = _TTkTerminalNormalScreen()
self._screen_alt = _TTkTerminalAltScreen()
@ -78,6 +80,7 @@ class TTkTerminal(TTkWidget):
self.setFocusPolicy(TTkK.ClickFocus + TTkK.TabFocus)
self.enableWidgetCursor()
TTkHelper.quitEvent.connect(self._quit)
def resizeEvent(self, w: int, h: int):
if self._fd:
@ -98,13 +101,21 @@ class TTkTerminal(TTkWidget):
pid, self._fd = pty.fork()
if pid == 0:
argv = [self._shell]
env = os.environ
os.execvpe(argv[0], argv, env)
def _spawnTerminal(argv=[self._shell], env=os.environ):
os.execvpe(argv[0], argv, env)
threading.Thread(target=_spawnTerminal).start()
TTkHelper.quit()
# os.execvpe(argv[0], argv, env)
# os.execvpe(argv[0], argv, env)
# self._proc = subprocess.Popen(self._shell)
# TTkLog.debug(f"Terminal PID={self._proc.pid=}")
# self._proc.wait()
else:
self._inout = os.fdopen(self._fd, "w+b", 0)
name = os.ttyname(self._fd)
TTkLog.debug(f"{self._fd=} {name}")
TTkLog.debug(f"{pid=} {self._fd=} {name}")
self._quit_pipe = os.pipe()
threading.Thread(target=self.loop,args=[self]).start()
@ -123,132 +134,140 @@ class TTkTerminal(TTkWidget):
re_DEC_SET_RST = re.compile('^\[\?(\d+)([lh])')
# re_CURSOR_1 = re.compile(r'^(\d+)([ABCDEFGIJKLMPSTXZHf])')
@pyTTkSlot()
def _quit(self):
if self._quit_pipe:
os.write(self._quit_pipe[1], b'quit')
def loop(self, _):
while rs := select( [self._inout], [], [])[0]:
while rs := select( [self._inout,self._quit_pipe[0]], [], [])[0]:
if self._quit_pipe[0] in rs:
break
# TTkLog.debug(f"Select - {rs=}")
for r in rs:
if r is self._inout:
try:
out = self._inout.read(10240)
except Exception as e:
TTkLog.error(f"Error: {e=}")
return
# out = out.decode('utf-8','ignore')
out = out.decode()
for bi, bout in enumerate(out.split('\a')): # grab the bells
# TTkLog.debug(f'Eugenio->{out}')
# sout = bout.decode('utf-8','ignore')
if bi:
TTkLog.debug("BELL!!! 🔔🔔🔔")
sout = bout.split('\033')
TTkLog.debug(f"{sout[0]=}")
self._screen_current.pushLine(sout[0])
for slice in sout[1:]:
ssss = slice.replace('\033','<ESC>').replace('\n','\\n').replace('\r','\\r')
TTkLog.debug(f"slice: {ssss}")
if m := TTkTerminal.re_DEC_SET_RST.match(slice):
en = m.end()
ps = int(m.group(1))
sr = (m.group(2) == 'h')
self._CSI_DEC_SET_RST(ps,sr)
slice = slice[en:]
elif ( (m := TTkTerminal.re_CURSOR.match(slice)) and
(mg := m.groups()) and
mg[-1] == 'm' ):
# TTkLog.debug(f"Handle color {mg[0]=}")
en = m.end()
color=TTkColor.RST
if mg[0] not in ['0','']:
values = mg[0].split(';')
fg = None
bg = None
mod = 0
clean = False
while values:
s = int(values.pop(0))
if s==0: # Reset Color/Format
fg = None
bg = None
mod = 0
clean = True
elif ( # Ansi 16 colors
30 <= s <= 37 or # fg [ 30 - 37]
90 <= s <= 97 ): # fg [ 90 - 97] Bright
fg = ansiMap16.get(s)
elif ( # Ansi 16 colors
40 <= s <= 47 or # bg [ 40 - 47]
100 <= s <= 107 ) : # bg [100 - 107] Bright
bg = ansiMap16.get(s)
elif s == 38:
t = int(values.pop(0))
if t == 5:# 256 fg
fg = ansiMap256.get(int(values.pop(0)))
if t == 2:# 24 bit fg
fg = (int(values.pop(0)),int(values.pop(0)),int(values.pop(0)))
elif s == 48:
t = int(values.pop(0))
if t == 5:# 256 bg
bg = ansiMap256.get(int(values.pop(0)))
if t == 2:# 24 bit bg
bg = (int(values.pop(0)),int(values.pop(0)),int(values.pop(0)))
elif s==1: mod += TTkTermColor.BOLD
elif s==3: mod += TTkTermColor.ITALIC
elif s==4: mod += TTkTermColor.UNDERLINE
elif s==9: mod += TTkTermColor.STRIKETROUGH
elif s==5: mod += TTkTermColor.BLINKING
color = TTkColor(fg=fg, bg=bg, mod=mod, clean=clean)
self._screen_alt.setColor(color)
self._screen_normal.setColor(color)
# TTkLog.debug(TTkString(f"color - <ESC>[{mg[0]}",color).toAnsi())
slice = slice[en:]
elif m:
en = m.end()
y = ps = int(y) if (y:=m.group(2)) else 1
sep = m.group(3)
x = int(x) if (x:=m.group(4)) else 1
fn = m.group(5)
TTkLog.debug(f"{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}")
if fn in ['n']:
_ex = self._CSI_MAP.get(
fn,
lambda a,b,c: TTkLog.warn(f"Unhandled <ESC>[{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}"))
_ex(self,y,x)
else:
_ex = self._screen_current._CSI_MAP.get(
fn,
lambda a,b,c: TTkLog.warn(f"Unhandled <ESC>[{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}"))
_ex(self._screen_current,y,x)
slice = slice[en:]
else:
slice = '\033' + slice.replace('\r','')
ssss = slice.replace('\033','<ESC>').replace('\n','\\n')
TTkLog.warn(f"Unhandled Slice:{ssss}")
# TTkLog.debug(f'Eugenio->{slice}')
if '\033' in slice:
# import time
# ssss = slice.replace('\033','<ESC>').replace('\n','\\n')
# TTkLog.warn(f"WAIT FOR Unhandled Slice:{ssss}")
# time.sleep(0.5)
# self._screen_current.pushLine(slice)
# TTkLog.warn(f"DONE!!!! Unhandled Slice:{ssss}")
slice = slice.replace('\033','<ESC>').replace('\n','\\n')
TTkLog.warn(f"Unhandled slice: {slice=}")
# time.sleep(1.5)
if r is not self._inout:
continue
try:
out = self._inout.read(10240)
except Exception as e:
TTkLog.error(f"Error: {e=}")
return
# out = out.decode('utf-8','ignore')
out = out.decode()
for bi, bout in enumerate(out.split('\a')): # grab the bells
# TTkLog.debug(f'Eugenio->{out}')
# sout = bout.decode('utf-8','ignore')
if bi:
TTkLog.debug("BELL!!! 🔔🔔🔔")
sout = bout.split('\033')
TTkLog.debug(f"{sout[0]=}")
self._screen_current.pushLine(sout[0])
for slice in sout[1:]:
ssss = slice.replace('\033','<ESC>').replace('\n','\\n').replace('\r','\\r')
TTkLog.debug(f"slice: {ssss}")
if m := TTkTerminal.re_DEC_SET_RST.match(slice):
en = m.end()
ps = int(m.group(1))
sr = (m.group(2) == 'h')
self._CSI_DEC_SET_RST(ps,sr)
slice = slice[en:]
elif ( (m := TTkTerminal.re_CURSOR.match(slice)) and
(mg := m.groups()) and
mg[-1] == 'm' ):
# TTkLog.debug(f"Handle color {mg[0]=}")
en = m.end()
color=TTkColor.RST
if mg[0] not in ['0','']:
values = mg[0].split(';')
fg = None
bg = None
mod = 0
clean = False
while values:
s = int(values.pop(0))
if s==0: # Reset Color/Format
fg = None
bg = None
mod = 0
clean = True
elif ( # Ansi 16 colors
30 <= s <= 37 or # fg [ 30 - 37]
90 <= s <= 97 ): # fg [ 90 - 97] Bright
fg = ansiMap16.get(s)
elif ( # Ansi 16 colors
40 <= s <= 47 or # bg [ 40 - 47]
100 <= s <= 107 ) : # bg [100 - 107] Bright
bg = ansiMap16.get(s)
elif s == 38:
t = int(values.pop(0))
if t == 5:# 256 fg
fg = ansiMap256.get(int(values.pop(0)))
if t == 2:# 24 bit fg
fg = (int(values.pop(0)),int(values.pop(0)),int(values.pop(0)))
elif s == 48:
t = int(values.pop(0))
if t == 5:# 256 bg
bg = ansiMap256.get(int(values.pop(0)))
if t == 2:# 24 bit bg
bg = (int(values.pop(0)),int(values.pop(0)),int(values.pop(0)))
elif s==1: mod += TTkTermColor.BOLD
elif s==3: mod += TTkTermColor.ITALIC
elif s==4: mod += TTkTermColor.UNDERLINE
elif s==9: mod += TTkTermColor.STRIKETROUGH
elif s==5: mod += TTkTermColor.BLINKING
color = TTkColor(fg=fg, bg=bg, mod=mod, clean=clean)
self._screen_alt.setColor(color)
self._screen_normal.setColor(color)
# TTkLog.debug(TTkString(f"color - <ESC>[{mg[0]}",color).toAnsi())
slice = slice[en:]
elif m:
en = m.end()
y = ps = int(y) if (y:=m.group(2)) else 1
sep = m.group(3)
x = int(x) if (x:=m.group(4)) else 1
fn = m.group(5)
TTkLog.debug(f"{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}")
if fn in ['n']:
_ex = self._CSI_MAP.get(
fn,
lambda a,b,c: TTkLog.warn(f"Unhandled <ESC>[{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}"))
_ex(self,y,x)
else:
self._screen_current.pushLine(slice)
slice = slice.replace('\033','<ESC>').replace('\n','\\n')
# TTkLog.debug(f"{slice=}")
self.update()
self.setWidgetCursor(pos=self._screen_current.getCursor())
TTkLog.debug(f"wc:{self._screen_current.getCursor()}")
_ex = self._screen_current._CSI_MAP.get(
fn,
lambda a,b,c: TTkLog.warn(f"Unhandled <ESC>[{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}"))
_ex(self._screen_current,y,x)
slice = slice[en:]
else:
slice = '\033' + slice.replace('\r','')
ssss = slice.replace('\033','<ESC>').replace('\n','\\n')
TTkLog.warn(f"Unhandled Slice:{ssss}")
# TTkLog.debug(f'Eugenio->{slice}')
if '\033' in slice:
# import time
# ssss = slice.replace('\033','<ESC>').replace('\n','\\n')
# TTkLog.warn(f"WAIT FOR Unhandled Slice:{ssss}")
# time.sleep(0.5)
# self._screen_current.pushLine(slice)
# TTkLog.warn(f"DONE!!!! Unhandled Slice:{ssss}")
slice = slice.replace('\033','<ESC>').replace('\n','\\n')
TTkLog.warn(f"Unhandled slice: {slice=}")
# time.sleep(1.5)
else:
self._screen_current.pushLine(slice)
slice = slice.replace('\033','<ESC>').replace('\n','\\n')
# TTkLog.debug(f"{slice=}")
self.update()
self.setWidgetCursor(pos=self._screen_current.getCursor())
TTkLog.debug(f"wc:{self._screen_current.getCursor()} {self._proc=}")
def mousePressEvent(self, evt):
return True

7
TermTk/TTkWidgets/TTkTerminal/terminal_alt.py

@ -157,7 +157,6 @@ class _TTkTerminalAltScreen():
# CSI Ps ; Ps H
# Cursor Position [row;column] (default = [1,1]) (CUP).
def _CSI_H_CUP(self, row, col):
x,y = self._terminalCursor
w,h = self._w, self._h
self._terminalCursor = (min(col-1,w-1), min(row-1,h-1))
@ -421,7 +420,9 @@ class _TTkTerminalAltScreen():
# CSI Ps ; Ps f
# Horizontal and Vertical Position [row;column] (default =
# [1,1]) (HVP).
def _CSI_f_HVP(self, row, col): pass
def _CSI_f_HVP(self, row, col):
w,h = self._w, self._h
self._terminalCursor = (min(col-1,w-1), min(row-1,h-1))
# CSI Ps g Tab Clear (TBC). ECMA-48 defines additional codes, but the
# VT100 user manual notes that it ignores other codes. DEC's
@ -1475,7 +1476,7 @@ class _TTkTerminalAltScreen():
# 'c': _CSI_c_Pri_DA,
# 'd': _CSI_d_VPA,
# 'e': _CSI_e_VPR,
# 'f': _CSI_f_HVP,
'f': _CSI_f_HVP,
# 'g': _CSI_g_TBC,
# 'h': _CSI_h_SM,
# 'i': _CSI_i_MC,

0
tests/test.pty.004.openpty.py → tests/test.pty.004.openpty.01.py

135
tests/test.pty.004.openpty.02.py

@ -0,0 +1,135 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2023 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the"Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED"AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# This test is based on:
# pty — Pseudo-terminal utilities¶
# https://docs.python.org/3/library/pty.html#example
#
# Using a pseudo-terminal to interact with interactive Python in a subprocess
# by Thomas Billinger
# https://gist.github.com/thomasballinger/7979808
#
# Run interactive Bash with popen and a dedicated TTY Python
# https://stackoverflow.com/questions/41542960/run-interactive-bash-with-popen-and-a-dedicated-tty-python
import os
import pty
import tty
import sys
import time
import subprocess
import threading
from select import select
sys.path.append(os.path.join(sys.path[0],'..'))
import TermTk as ttk
class TermThread(threading.Thread):
def __init__(self):
super().__init__()
self.shell = os.environ.get('SHELL', 'sh')
self.master, self.slave = pty.openpty()
pty.spawn(self.shell, self.read_pty)
# self.p = subprocess.Popen(
# [self.shell],
# shell=True,
# # preexec_fn=os.setsid,
# # universal_newlines=True,
# stdin=self.slave,
# stdout=self.slave,
# stderr=self.slave)
self.pin = os.fdopen(self.master, 'w')
name = os.ttyname(self.master)
ttk.TTkLog.debug(f"{self.master=} {name}")
name = os.ttyname(self.slave)
ttk.TTkLog.debug(f"{self.slave=} {name}")
def read_pty(self, fds):
pass
def setTextEdit(self, te):
self.textEdit = te
def run(self):
while self.p.poll() is None:
rs, ws, es = select([self.master], [], [])
ttk.TTkLog.debug(f"Select - {rs=}")
for r in rs:
if r is self.master:
o = os.read(self.master, 10240)
if o:
# ttk.TTkLog.debug(f'Eugenio->{o}')
# self.textEdit.append(o.decode('utf-8').replace('\r','').replace('\033[?2004h','').replace('\033[?2004l',''))
cursor = self.textEdit.textCursor()
cursor.insertText(o.decode('utf-8').replace('\r','').replace('\033[?2004h','').replace('\033[?2004l',''))
cursor.movePosition(ttk.TTkTextCursor.End)
self.textEdit.textEditView()._updateSize()
self.textEdit.textEditView().viewMoveTo(0, cursor.position().line)
class TerminalView(ttk.TTkTextEditView):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.termThread = kwargs.get('termThread')
self.setReadOnly(False)
def mousePressEvent(self, evt):
return True
def keyEvent(self, evt):
if evt.type == ttk.TTkK.SpecialKey:
if evt.key == ttk.TTkK.Key_Enter:
ttk.TTkLog.debug(f"Key: {evt}")
self.termThread.pin.write('\n')
else: # Input char
ttk.TTkLog.debug(f"Key: {evt.key}")
self.termThread.pin.write(evt.key)
return True
ttk.TTkLog.use_default_file_logging()
root = ttk.TTk()
win1 = ttk.TTkWindow(parent=root, pos=(1,1), size=(70,15), title="Terminallo n.1", border=True, layout=ttk.TTkVBoxLayout(), flags = ttk.TTkK.WindowFlag.WindowMinMaxButtonsHint)
tt1 = TermThread()
te1 = ttk.TTkTextEdit(lineNumber=True, textEditView=TerminalView(termThread=tt1))
win1.layout().addWidget(te1)
tt1.setTextEdit(te1)
tt1.start()
win2 = ttk.TTkWindow(parent=root, pos=(10,5), size=(70,15), title="Terminallo n.2", border=True, layout=ttk.TTkVBoxLayout(), flags = ttk.TTkK.WindowFlag.WindowMinMaxButtonsHint)
tt2 = TermThread()
te2 = ttk.TTkTextEdit(lineNumber=True, textEditView=TerminalView(termThread=tt2))
win2.layout().addWidget(te2)
tt2.setTextEdit(te2)
tt2.start()
wlog = ttk.TTkWindow(parent=root,pos = (32,12), size=(90,20), title="Log Window", flags=ttk.TTkK.WindowFlag.WindowCloseButtonHint)
wlog.setLayout(ttk.TTkHBoxLayout())
ttk.TTkLogViewer(parent=wlog, follow=True )
root.mainloop()

5
tests/test.pty.006.terminal.02.py

@ -56,6 +56,9 @@ split.addItem(top := ttk.TTkLayout())
split.addWidget(ttk.TTkLogViewer(follow=False ), title='Log', size=20)
quitBtn = ttk.TTkButton(text="QUIT", border=True)
quitBtn.clicked.connect(ttk.TTkHelper.quit)
win1 = ttk.TTkWindow(pos=(90,5), size=(70,15), title="Terminallo n.1", border=True, layout=ttk.TTkVBoxLayout(), flags = ttk.TTkK.WindowFlag.WindowMinMaxButtonsHint)
term1 = ttk.TTkTerminal(parent=win1)
term1.runShell()
@ -64,7 +67,7 @@ win2 = ttk.TTkWindow(pos=(0,0), size=(150,30), title="Terminallo n.2", border=T
term2 = ttk.TTkTerminal(parent=win2)
term2.runShell()
top.addWidgets([win1,win2])
top.addWidgets([quitBtn, win1, win2])
term2.setFocus()

Loading…
Cancel
Save