From b34febb87e2a586a515779969dae2a623e9e6e32 Mon Sep 17 00:00:00 2001 From: Eugenio Parodi Date: Thu, 3 Aug 2023 15:50:37 +0100 Subject: [PATCH] Added Terminal Quit --- TermTk/TTkWidgets/TTkTerminal/terminal.py | 273 ++++++++++-------- TermTk/TTkWidgets/TTkTerminal/terminal_alt.py | 7 +- ....openpty.py => test.pty.004.openpty.01.py} | 0 tests/test.pty.004.openpty.02.py | 135 +++++++++ tests/test.pty.006.terminal.02.py | 5 +- 5 files changed, 289 insertions(+), 131 deletions(-) rename tests/{test.pty.004.openpty.py => test.pty.004.openpty.01.py} (100%) create mode 100755 tests/test.pty.004.openpty.02.py diff --git a/TermTk/TTkWidgets/TTkTerminal/terminal.py b/TermTk/TTkWidgets/TTkTerminal/terminal.py index 5a5bc806..e7cdb352 100644 --- a/TermTk/TTkWidgets/TTkTerminal/terminal.py +++ b/TermTk/TTkWidgets/TTkTerminal/terminal.py @@ -55,13 +55,15 @@ from TermTk.TTkCore.TTkTerm.colors_ansi_map import ansiMap16, ansiMap256 __all__ = ['TTkTerminal'] class TTkTerminal(TTkWidget): - __slots__ = ('_shell', '_fd', '_inout', '_mode_normal' + __slots__ = ('_shell', '_fd', '_inout', '_proc', '_quit_pipe', '_mode_normal' '_buffer_lines', '_buffer_screen', '_screen_current', '_screen_normal', '_screen_alt') def __init__(self, *args, **kwargs): self._shell = os.environ.get('SHELL', 'sh') self._fd = None + self._proc = None self._mode_normal = True + self._quit_pipe = None self._buffer_lines = [TTkString()] self._screen_normal = _TTkTerminalNormalScreen() self._screen_alt = _TTkTerminalAltScreen() @@ -78,6 +80,7 @@ class TTkTerminal(TTkWidget): self.setFocusPolicy(TTkK.ClickFocus + TTkK.TabFocus) self.enableWidgetCursor() + TTkHelper.quitEvent.connect(self._quit) def resizeEvent(self, w: int, h: int): if self._fd: @@ -98,13 +101,21 @@ class TTkTerminal(TTkWidget): pid, self._fd = pty.fork() if pid == 0: - argv = [self._shell] - env = os.environ - os.execvpe(argv[0], argv, env) + def _spawnTerminal(argv=[self._shell], env=os.environ): + os.execvpe(argv[0], argv, env) + threading.Thread(target=_spawnTerminal).start() + TTkHelper.quit() + # os.execvpe(argv[0], argv, env) + # os.execvpe(argv[0], argv, env) + # self._proc = subprocess.Popen(self._shell) + # TTkLog.debug(f"Terminal PID={self._proc.pid=}") + # self._proc.wait() else: self._inout = os.fdopen(self._fd, "w+b", 0) name = os.ttyname(self._fd) - TTkLog.debug(f"{self._fd=} {name}") + TTkLog.debug(f"{pid=} {self._fd=} {name}") + + self._quit_pipe = os.pipe() threading.Thread(target=self.loop,args=[self]).start() @@ -123,132 +134,140 @@ class TTkTerminal(TTkWidget): re_DEC_SET_RST = re.compile('^\[\?(\d+)([lh])') # re_CURSOR_1 = re.compile(r'^(\d+)([ABCDEFGIJKLMPSTXZHf])') + @pyTTkSlot() + def _quit(self): + if self._quit_pipe: + os.write(self._quit_pipe[1], b'quit') + def loop(self, _): - while rs := select( [self._inout], [], [])[0]: + while rs := select( [self._inout,self._quit_pipe[0]], [], [])[0]: + if self._quit_pipe[0] in rs: + break # TTkLog.debug(f"Select - {rs=}") for r in rs: - if r is self._inout: - try: - out = self._inout.read(10240) - except Exception as e: - TTkLog.error(f"Error: {e=}") - return - # out = out.decode('utf-8','ignore') - out = out.decode() - for bi, bout in enumerate(out.split('\a')): # grab the bells - # TTkLog.debug(f'Eugenio->{out}') - # sout = bout.decode('utf-8','ignore') - if bi: - TTkLog.debug("BELL!!! πŸ””πŸ””πŸ””") - sout = bout.split('\033') - TTkLog.debug(f"{sout[0]=}") - self._screen_current.pushLine(sout[0]) - for slice in sout[1:]: - ssss = slice.replace('\033','').replace('\n','\\n').replace('\r','\\r') - TTkLog.debug(f"slice: {ssss}") - if m := TTkTerminal.re_DEC_SET_RST.match(slice): - en = m.end() - ps = int(m.group(1)) - sr = (m.group(2) == 'h') - self._CSI_DEC_SET_RST(ps,sr) - slice = slice[en:] - elif ( (m := TTkTerminal.re_CURSOR.match(slice)) and - (mg := m.groups()) and - mg[-1] == 'm' ): - # TTkLog.debug(f"Handle color {mg[0]=}") - en = m.end() - - color=TTkColor.RST - - if mg[0] not in ['0','']: - values = mg[0].split(';') - - fg = None - bg = None - mod = 0 - clean = False - - while values: - s = int(values.pop(0)) - if s==0: # Reset Color/Format - fg = None - bg = None - mod = 0 - clean = True - elif ( # Ansi 16 colors - 30 <= s <= 37 or # fg [ 30 - 37] - 90 <= s <= 97 ): # fg [ 90 - 97] Bright - fg = ansiMap16.get(s) - elif ( # Ansi 16 colors - 40 <= s <= 47 or # bg [ 40 - 47] - 100 <= s <= 107 ) : # bg [100 - 107] Bright - bg = ansiMap16.get(s) - elif s == 38: - t = int(values.pop(0)) - if t == 5:# 256 fg - fg = ansiMap256.get(int(values.pop(0))) - if t == 2:# 24 bit fg - fg = (int(values.pop(0)),int(values.pop(0)),int(values.pop(0))) - elif s == 48: - t = int(values.pop(0)) - if t == 5:# 256 bg - bg = ansiMap256.get(int(values.pop(0))) - if t == 2:# 24 bit bg - bg = (int(values.pop(0)),int(values.pop(0)),int(values.pop(0))) - elif s==1: mod += TTkTermColor.BOLD - elif s==3: mod += TTkTermColor.ITALIC - elif s==4: mod += TTkTermColor.UNDERLINE - elif s==9: mod += TTkTermColor.STRIKETROUGH - elif s==5: mod += TTkTermColor.BLINKING - color = TTkColor(fg=fg, bg=bg, mod=mod, clean=clean) - - self._screen_alt.setColor(color) - self._screen_normal.setColor(color) - - # TTkLog.debug(TTkString(f"color - [{mg[0]}",color).toAnsi()) - - slice = slice[en:] - elif m: - en = m.end() - y = ps = int(y) if (y:=m.group(2)) else 1 - sep = m.group(3) - x = int(x) if (x:=m.group(4)) else 1 - fn = m.group(5) - TTkLog.debug(f"{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}") - if fn in ['n']: - _ex = self._CSI_MAP.get( - fn, - lambda a,b,c: TTkLog.warn(f"Unhandled [{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}")) - _ex(self,y,x) - else: - _ex = self._screen_current._CSI_MAP.get( - fn, - lambda a,b,c: TTkLog.warn(f"Unhandled [{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}")) - _ex(self._screen_current,y,x) - slice = slice[en:] - else: - slice = '\033' + slice.replace('\r','') - ssss = slice.replace('\033','').replace('\n','\\n') - TTkLog.warn(f"Unhandled Slice:{ssss}") - - # TTkLog.debug(f'Eugenio->{slice}') - if '\033' in slice: - # import time - # ssss = slice.replace('\033','').replace('\n','\\n') - # TTkLog.warn(f"WAIT FOR Unhandled Slice:{ssss}") - # time.sleep(0.5) - # self._screen_current.pushLine(slice) - # TTkLog.warn(f"DONE!!!! Unhandled Slice:{ssss}") - slice = slice.replace('\033','').replace('\n','\\n') - TTkLog.warn(f"Unhandled slice: {slice=}") - # time.sleep(1.5) + if r is not self._inout: + continue + try: + out = self._inout.read(10240) + except Exception as e: + TTkLog.error(f"Error: {e=}") + return + # out = out.decode('utf-8','ignore') + out = out.decode() + for bi, bout in enumerate(out.split('\a')): # grab the bells + # TTkLog.debug(f'Eugenio->{out}') + # sout = bout.decode('utf-8','ignore') + if bi: + TTkLog.debug("BELL!!! πŸ””πŸ””πŸ””") + sout = bout.split('\033') + TTkLog.debug(f"{sout[0]=}") + self._screen_current.pushLine(sout[0]) + for slice in sout[1:]: + ssss = slice.replace('\033','').replace('\n','\\n').replace('\r','\\r') + TTkLog.debug(f"slice: {ssss}") + if m := TTkTerminal.re_DEC_SET_RST.match(slice): + en = m.end() + ps = int(m.group(1)) + sr = (m.group(2) == 'h') + self._CSI_DEC_SET_RST(ps,sr) + slice = slice[en:] + elif ( (m := TTkTerminal.re_CURSOR.match(slice)) and + (mg := m.groups()) and + mg[-1] == 'm' ): + # TTkLog.debug(f"Handle color {mg[0]=}") + en = m.end() + + color=TTkColor.RST + + if mg[0] not in ['0','']: + values = mg[0].split(';') + + fg = None + bg = None + mod = 0 + clean = False + + while values: + s = int(values.pop(0)) + if s==0: # Reset Color/Format + fg = None + bg = None + mod = 0 + clean = True + elif ( # Ansi 16 colors + 30 <= s <= 37 or # fg [ 30 - 37] + 90 <= s <= 97 ): # fg [ 90 - 97] Bright + fg = ansiMap16.get(s) + elif ( # Ansi 16 colors + 40 <= s <= 47 or # bg [ 40 - 47] + 100 <= s <= 107 ) : # bg [100 - 107] Bright + bg = ansiMap16.get(s) + elif s == 38: + t = int(values.pop(0)) + if t == 5:# 256 fg + fg = ansiMap256.get(int(values.pop(0))) + if t == 2:# 24 bit fg + fg = (int(values.pop(0)),int(values.pop(0)),int(values.pop(0))) + elif s == 48: + t = int(values.pop(0)) + if t == 5:# 256 bg + bg = ansiMap256.get(int(values.pop(0))) + if t == 2:# 24 bit bg + bg = (int(values.pop(0)),int(values.pop(0)),int(values.pop(0))) + elif s==1: mod += TTkTermColor.BOLD + elif s==3: mod += TTkTermColor.ITALIC + elif s==4: mod += TTkTermColor.UNDERLINE + elif s==9: mod += TTkTermColor.STRIKETROUGH + elif s==5: mod += TTkTermColor.BLINKING + color = TTkColor(fg=fg, bg=bg, mod=mod, clean=clean) + + self._screen_alt.setColor(color) + self._screen_normal.setColor(color) + + # TTkLog.debug(TTkString(f"color - [{mg[0]}",color).toAnsi()) + + slice = slice[en:] + elif m: + en = m.end() + y = ps = int(y) if (y:=m.group(2)) else 1 + sep = m.group(3) + x = int(x) if (x:=m.group(4)) else 1 + fn = m.group(5) + TTkLog.debug(f"{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}") + if fn in ['n']: + _ex = self._CSI_MAP.get( + fn, + lambda a,b,c: TTkLog.warn(f"Unhandled [{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}")) + _ex(self,y,x) else: - self._screen_current.pushLine(slice) - slice = slice.replace('\033','').replace('\n','\\n') - # TTkLog.debug(f"{slice=}") - self.update() - self.setWidgetCursor(pos=self._screen_current.getCursor()) - TTkLog.debug(f"wc:{self._screen_current.getCursor()}") + _ex = self._screen_current._CSI_MAP.get( + fn, + lambda a,b,c: TTkLog.warn(f"Unhandled [{mg[0]}{fn} = ps:{y=} {sep=} {x=} {fn=}")) + _ex(self._screen_current,y,x) + slice = slice[en:] + else: + slice = '\033' + slice.replace('\r','') + ssss = slice.replace('\033','').replace('\n','\\n') + TTkLog.warn(f"Unhandled Slice:{ssss}") + + # TTkLog.debug(f'Eugenio->{slice}') + if '\033' in slice: + # import time + # ssss = slice.replace('\033','').replace('\n','\\n') + # TTkLog.warn(f"WAIT FOR Unhandled Slice:{ssss}") + # time.sleep(0.5) + # self._screen_current.pushLine(slice) + # TTkLog.warn(f"DONE!!!! Unhandled Slice:{ssss}") + slice = slice.replace('\033','').replace('\n','\\n') + TTkLog.warn(f"Unhandled slice: {slice=}") + # time.sleep(1.5) + else: + self._screen_current.pushLine(slice) + slice = slice.replace('\033','').replace('\n','\\n') + # TTkLog.debug(f"{slice=}") + self.update() + self.setWidgetCursor(pos=self._screen_current.getCursor()) + TTkLog.debug(f"wc:{self._screen_current.getCursor()} {self._proc=}") def mousePressEvent(self, evt): return True diff --git a/TermTk/TTkWidgets/TTkTerminal/terminal_alt.py b/TermTk/TTkWidgets/TTkTerminal/terminal_alt.py index 30eebbb5..86ab5c1a 100644 --- a/TermTk/TTkWidgets/TTkTerminal/terminal_alt.py +++ b/TermTk/TTkWidgets/TTkTerminal/terminal_alt.py @@ -157,7 +157,6 @@ class _TTkTerminalAltScreen(): # CSI Ps ; Ps H # Cursor Position [row;column] (default = [1,1]) (CUP). def _CSI_H_CUP(self, row, col): - x,y = self._terminalCursor w,h = self._w, self._h self._terminalCursor = (min(col-1,w-1), min(row-1,h-1)) @@ -421,7 +420,9 @@ class _TTkTerminalAltScreen(): # CSI Ps ; Ps f # Horizontal and Vertical Position [row;column] (default = # [1,1]) (HVP). - def _CSI_f_HVP(self, row, col): pass + def _CSI_f_HVP(self, row, col): + w,h = self._w, self._h + self._terminalCursor = (min(col-1,w-1), min(row-1,h-1)) # CSI Ps g Tab Clear (TBC). ECMA-48 defines additional codes, but the # VT100 user manual notes that it ignores other codes. DEC's @@ -1475,7 +1476,7 @@ class _TTkTerminalAltScreen(): # 'c': _CSI_c_Pri_DA, # 'd': _CSI_d_VPA, # 'e': _CSI_e_VPR, - # 'f': _CSI_f_HVP, + 'f': _CSI_f_HVP, # 'g': _CSI_g_TBC, # 'h': _CSI_h_SM, # 'i': _CSI_i_MC, diff --git a/tests/test.pty.004.openpty.py b/tests/test.pty.004.openpty.01.py similarity index 100% rename from tests/test.pty.004.openpty.py rename to tests/test.pty.004.openpty.01.py diff --git a/tests/test.pty.004.openpty.02.py b/tests/test.pty.004.openpty.02.py new file mode 100755 index 00000000..10f9dcc7 --- /dev/null +++ b/tests/test.pty.004.openpty.02.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 + +# MIT License +# +# Copyright (c) 2023 Eugenio Parodi +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the"Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED"AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# This test is based on: +# pty β€” Pseudo-terminal utilitiesΒΆ +# https://docs.python.org/3/library/pty.html#example +# +# Using a pseudo-terminal to interact with interactive Python in a subprocess +# by Thomas Billinger +# https://gist.github.com/thomasballinger/7979808 +# +# Run interactive Bash with popen and a dedicated TTY Python +# https://stackoverflow.com/questions/41542960/run-interactive-bash-with-popen-and-a-dedicated-tty-python + +import os +import pty +import tty +import sys +import time +import subprocess +import threading +from select import select + + +sys.path.append(os.path.join(sys.path[0],'..')) +import TermTk as ttk + +class TermThread(threading.Thread): + def __init__(self): + super().__init__() + self.shell = os.environ.get('SHELL', 'sh') + self.master, self.slave = pty.openpty() + pty.spawn(self.shell, self.read_pty) + # self.p = subprocess.Popen( + # [self.shell], + # shell=True, + # # preexec_fn=os.setsid, + # # universal_newlines=True, + # stdin=self.slave, + # stdout=self.slave, + # stderr=self.slave) + self.pin = os.fdopen(self.master, 'w') + + name = os.ttyname(self.master) + ttk.TTkLog.debug(f"{self.master=} {name}") + + name = os.ttyname(self.slave) + ttk.TTkLog.debug(f"{self.slave=} {name}") + + def read_pty(self, fds): + pass + + def setTextEdit(self, te): + self.textEdit = te + + def run(self): + while self.p.poll() is None: + rs, ws, es = select([self.master], [], []) + ttk.TTkLog.debug(f"Select - {rs=}") + for r in rs: + if r is self.master: + o = os.read(self.master, 10240) + if o: + # ttk.TTkLog.debug(f'Eugenio->{o}') + # self.textEdit.append(o.decode('utf-8').replace('\r','').replace('\033[?2004h','').replace('\033[?2004l','')) + cursor = self.textEdit.textCursor() + cursor.insertText(o.decode('utf-8').replace('\r','').replace('\033[?2004h','').replace('\033[?2004l','')) + cursor.movePosition(ttk.TTkTextCursor.End) + self.textEdit.textEditView()._updateSize() + self.textEdit.textEditView().viewMoveTo(0, cursor.position().line) + + + +class TerminalView(ttk.TTkTextEditView): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.termThread = kwargs.get('termThread') + self.setReadOnly(False) + + def mousePressEvent(self, evt): + return True + + def keyEvent(self, evt): + if evt.type == ttk.TTkK.SpecialKey: + if evt.key == ttk.TTkK.Key_Enter: + ttk.TTkLog.debug(f"Key: {evt}") + self.termThread.pin.write('\n') + else: # Input char + ttk.TTkLog.debug(f"Key: {evt.key}") + self.termThread.pin.write(evt.key) + return True + +ttk.TTkLog.use_default_file_logging() +root = ttk.TTk() + +win1 = ttk.TTkWindow(parent=root, pos=(1,1), size=(70,15), title="Terminallo n.1", border=True, layout=ttk.TTkVBoxLayout(), flags = ttk.TTkK.WindowFlag.WindowMinMaxButtonsHint) +tt1 = TermThread() +te1 = ttk.TTkTextEdit(lineNumber=True, textEditView=TerminalView(termThread=tt1)) +win1.layout().addWidget(te1) +tt1.setTextEdit(te1) +tt1.start() + +win2 = ttk.TTkWindow(parent=root, pos=(10,5), size=(70,15), title="Terminallo n.2", border=True, layout=ttk.TTkVBoxLayout(), flags = ttk.TTkK.WindowFlag.WindowMinMaxButtonsHint) +tt2 = TermThread() +te2 = ttk.TTkTextEdit(lineNumber=True, textEditView=TerminalView(termThread=tt2)) +win2.layout().addWidget(te2) +tt2.setTextEdit(te2) +tt2.start() + +wlog = ttk.TTkWindow(parent=root,pos = (32,12), size=(90,20), title="Log Window", flags=ttk.TTkK.WindowFlag.WindowCloseButtonHint) +wlog.setLayout(ttk.TTkHBoxLayout()) +ttk.TTkLogViewer(parent=wlog, follow=True ) + +root.mainloop() \ No newline at end of file diff --git a/tests/test.pty.006.terminal.02.py b/tests/test.pty.006.terminal.02.py index 51fed843..419e7921 100755 --- a/tests/test.pty.006.terminal.02.py +++ b/tests/test.pty.006.terminal.02.py @@ -56,6 +56,9 @@ split.addItem(top := ttk.TTkLayout()) split.addWidget(ttk.TTkLogViewer(follow=False ), title='Log', size=20) +quitBtn = ttk.TTkButton(text="QUIT", border=True) +quitBtn.clicked.connect(ttk.TTkHelper.quit) + win1 = ttk.TTkWindow(pos=(90,5), size=(70,15), title="Terminallo n.1", border=True, layout=ttk.TTkVBoxLayout(), flags = ttk.TTkK.WindowFlag.WindowMinMaxButtonsHint) term1 = ttk.TTkTerminal(parent=win1) term1.runShell() @@ -64,7 +67,7 @@ win2 = ttk.TTkWindow(pos=(0,0), size=(150,30), title="Terminallo n.2", border=T term2 = ttk.TTkTerminal(parent=win2) term2.runShell() -top.addWidgets([win1,win2]) +top.addWidgets([quitBtn, win1, win2]) term2.setFocus()