Browse Source

Added basic gpm driver

pull/296/head
Eugenio Parodi 🌶️ 1 year ago
parent
commit
83207346ae
  1. 10
      .vscode/launch.json
  2. 17
      TermTk/TTkCore/drivers/__init__.py
  3. 2
      TermTk/TTkCore/drivers/unix.py
  4. 301
      TermTk/TTkCore/drivers/unix_gpm.py
  5. 2
      TermTk/TTkCore/drivers/unix_thread.py
  6. 36
      tests/t.generic/test.ctypes.01.gpm.01.py
  7. 242
      tests/t.generic/test.ctypes.01.gpm.02.py
  8. 250
      tests/t.generic/test.ctypes.01.gpm.03.thread.py
  9. 263
      tests/t.generic/test.ctypes.01.gpm.04.thread.queue.py
  10. 270
      tests/t.generic/test.ctypes.01.gpm.05.thread.queue.generator.py
  11. 264
      tests/t.generic/test.ctypes.01.gpm.06.getch_rewrite.py
  12. 70
      tests/t.generic/test.ctypes.02.fd.conversion.py
  13. 11
      tests/t.input/test.input.py

10
.vscode/launch.json vendored

@ -20,6 +20,16 @@
"console": "integratedTerminal",
"justMyCode": true,
"args": ["tools/webExporter/js/ttkproxy.js"]
},{
"name": "Python: Current File with Env",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true,
"env": {
"TERMTK_GPM": "1"
},
},{
"name": "Python: Test Player",
"type": "debugpy",

17
TermTk/TTkCore/drivers/__init__.py

@ -4,16 +4,25 @@ import platform
if importlib.util.find_spec('pyodideProxy'):
from .pyodide import *
from .term_pyodide import *
elif platform.system() == 'Linux':
from .unix import *
import os
if os.environ.get("TERMTK_FORCESERIAL",False):
from .term_unix_serial import *
if os.environ.get("TERMTK_GPM",False):
from .unix_gpm import *
# from .term_gpm import *
from .term_unix import *
else:
from .term_unix import *
from .unix import *
if os.environ.get("TERMTK_FORCESERIAL",False):
from .term_unix_serial import *
else:
from .term_unix import *
elif platform.system() == 'Darwin':
from .unix import *
from .term_unix import *
elif platform.system() == 'Windows':
from .windows import *
from .term_windows import *

2
TermTk/TTkCore/drivers/unix.py

@ -51,7 +51,7 @@ class TTkInputDriver():
def read(self):
rm = re.compile('(\033?[^\033]+)')
while self._readPipe[0] not in (list := select( [sys.stdin, self._readPipe[0]], [], [] )[0]):
while self._readPipe[0] not in (_rlist := select( [sys.stdin, self._readPipe[0]], [], [] )[0]):
# Read all the full input
_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, _fl | os.O_NONBLOCK) # Set the input as NONBLOCK to read the full sequence

301
TermTk/TTkCore/drivers/unix_gpm.py

@ -0,0 +1,301 @@
# MIT License
#
# Copyright (c) 2024 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ['TTkSignalDriver','TTkInputDriver']
import sys, os, re
import signal
from select import select
import ctypes
try: import fcntl, termios, tty
except Exception as e:
print(f'ERROR: {e}')
exit(1)
from TermTk.TTkCore.signal import pyTTkSignal, pyTTkSlot
'''
#define GPM_MAGIC 0x47706D4C /* "GpmL" */
typedef struct Gpm_Connect {
unsigned short eventMask, defaultMask;
unsigned short minMod, maxMod;
int pid;
int vc;
} Gpm_Connect;
'''
class _Gpm_Connect(ctypes.Structure):
_fields_ = [
("eventMask", ctypes.c_ushort),
("defaultMask", ctypes.c_ushort),
("minMod", ctypes.c_ushort),
("maxMod", ctypes.c_ushort),
("pid", ctypes.c_int),
("vc", ctypes.c_int)]
'''
enum Gpm_Etype {
GPM_MOVE=1,
GPM_DRAG=2, /* exactly one of the bare ones is active at a time */
GPM_DOWN=4,
GPM_UP= 8,
#define GPM_BARE_EVENTS(type) ((type)&(0x0f|GPM_ENTER|GPM_LEAVE))
GPM_SINGLE=16, /* at most one in three is set */
GPM_DOUBLE=32,
GPM_TRIPLE=64, /* WARNING: I depend on the values */
GPM_MFLAG=128, /* motion during click? */
GPM_HARD=256, /* if set in the defaultMask, force an already
used event to pass over to another handler */
GPM_ENTER=512, /* enter event, user in Roi's */
GPM_LEAVE=1024 /* leave event, used in Roi's */
};
enum Gpm_Margin {GPM_TOP=1, GPM_BOT=2, GPM_LFT=4, GPM_RGT=8};
typedef struct Gpm_Event {
unsigned char buttons, modifiers; /* try to be a multiple of 4 */
unsigned short vc;
short dx, dy, x, y; /* displacement x,y for this event, and absolute x,y */
enum Gpm_Etype type;
/* clicks e.g. double click are determined by time-based processing */
int clicks;
enum Gpm_Margin margin;
/* wdx/y: displacement of wheels in this event. Absolute values are not
* required, because wheel movement is typically used for scrolling
* or selecting fields, not for cursor positioning. The application
* can determine when the end of file or form is reached, and not
* go any further.
* A single mouse will use wdy, "vertical scroll" wheel. */
short wdx, wdy;
} Gpm_Event;
'''
class _Gpm_Event(ctypes.Structure):
_fields_ = [
("buttons", ctypes.c_ubyte),
("modifiers", ctypes.c_ubyte),
("vc", ctypes.c_short),
("dx", ctypes.c_short),
("dy", ctypes.c_short),
("x", ctypes.c_short),
("y", ctypes.c_short),
("type", ctypes.c_int),
("clicks", ctypes.c_int),
("margin", ctypes.c_int),
("wdx", ctypes.c_short),
("wdy", ctypes.c_short)]
_GPM_HANDLER_FUNC = ctypes.CFUNCTYPE(
ctypes.c_int,
ctypes.POINTER(_Gpm_Event),
ctypes.POINTER(ctypes.c_void_p))
class TTkInputDriver():
__slots__ = ('_readPipe', '_attr',
'_libgpm', '_libc', '_cstdin')
def __init__(self):
self._libgpm = ctypes.CDLL('libgpm.so.2')
self._libc = ctypes.cdll.LoadLibrary('libc.so.6')
self._cstdin = ctypes.c_void_p.in_dll(self._libc, 'stdin')
self._readPipe = os.pipe()
self._attr = termios.tcgetattr(sys.stdin)
tty.setcbreak(sys.stdin)
def close(self):
termios.tcsetattr(sys.stdin, termios.TCSANOW, self._attr)
os.write(self._readPipe[1], b'quit')
def cont(self):
tty.setcbreak(sys.stdin)
def _gpm_handler(self, event:_Gpm_Event) -> str:
code = 0x00
state = 'M'
x = event.x
y = event.y
# wdx = ec.wdx
wdy = event.wdy # mouse wheel
# Types:
# from: <https://github.com/telmich/gpm.git>/src/headers/gpm.h
# https://github.com/telmich/gpm/blob/master/src/headers/gpm.h
# MOVE = 0x0001
# DRAG = 0x0002
# DOWN = 0x0004
# UP = 0x0008
#
# SINGLE = 0x0010
# DOUBLE = 0x0020
# TRIPLE = 0x0040
# MFLAG = 0x0080
# HARD = 0x0100
# ENTER = 0x0200
# LEAVE = 0x0400
# # exactly one of the bare ones is active at a time
# BARE_EVENTS(type) ((type)&(0x0f|ENTER|LEAVE))
etype = event.type
if etype & 0x0008: # UP
state = 'm'
# Buttons:
# from: <https://github.com/telmich/gpm.git>/src/headers/gpm.h
# https://github.com/telmich/gpm/blob/master/src/headers/gpm.h
# DOWN 0x20
# UP 0x10
# FOURTH 0x08
# LEFT 0x04
# MIDDLE 0x02
# RIGHT 0x01
# NONE 0x00
buttons = event.buttons
if wdy == 1: # Wheel UP(1)
code |= 0x40
elif wdy == -1: # Wheel DOWN(-1)
code |= 0x41
elif etype & (0x0004|0x0008): # DOWN/UP
if buttons & 0x04: # LEFT
code |= 0x00
elif buttons & 0x01: # RIGHT
code |= 0x02
elif buttons & 0x02: # MIDDLE
code |= 0x01
elif etype & (0x0002): # MOVE
if buttons & 0x04: # LEFT
code |= 0x20
elif buttons & 0x01: # RIGHT
code |= 0x22
elif buttons & 0x02: # MIDDLE
code |= 0x21
elif etype & (0x0001): # MOVE
code |= 0x23
# Modifiers:
# From: /usr/include/linux/keyboard.h
# SHIFT 0x01 << 0x00 = 0x0001
# CTRL 0x01 << 0x02 = 0x0004
# ALT 0x01 << 0x03 = 0x0008
# ALTGR 0x01 << 0x01 = 0x0002
# SHIFTL 0x01 << 0x04 = 0x0010
# KANASHIFT 0x01 << 0x04 = 0x0010
# SHIFTR 0x01 << 0x05 = 0x0020
# CTRLL 0x01 << 0x06 = 0x0040
# CTRLR 0x01 << 0x07 = 0x0080
# CAPSSHIFT 0x01 << 0x08 = 0x0100
modifiers = event.modifiers
if modifiers & 0x0001: # SHIFT
code |= 0x27
if modifiers & 0x0004: # CTRL
code |= 0x10
if modifiers & (0x0008|0x0002): # ALT/ALTGR
code |= 0x08
return f"\033[<{code};{x};{y}{state}"
def read(self):
rm = re.compile('(\033?[^\033]+)')
_conn = _Gpm_Connect()
_conn.eventMask = ~0 # Want to know about all the events
_conn.defaultMask = 0 # don't handle anything by default
_conn.minMod = 0 # want everything
_conn.maxMod = ~0 # all modifiers included
if (_gpm_fd := self._libgpm.Gpm_Open(ctypes.pointer(_conn), 0)) == -1:
raise Exception("Cannot connect to the mouse server")
if _gpm_fd < 0:
self._libgpm.Gpm_Close()
raise Exception("Xterm GPM driver not supported")
_ev = _Gpm_Event()
with os.fdopen(_gpm_fd, "r") as gpm_file_obj:
while self._readPipe[0] not in (_rlist := select( [sys.stdin, gpm_file_obj, self._readPipe[0]], [], [] )[0]):
if gpm_file_obj in _rlist:
self._libgpm.Gpm_GetEvent(ctypes.pointer(_ev))
yield self._gpm_handler(_ev)
if sys.stdin in _rlist:
# Read all the full input
_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, _fl | os.O_NONBLOCK) # Set the input as NONBLOCK to read the full sequence
stdinRead = sys.stdin.read()
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, _fl)
# Split all the ansi sequences
# or yield any separate input char
if stdinRead == '\033':
yield '\033'
continue
for sr in rm.findall(stdinRead):
if '\033' == sr[0]:
yield sr
else:
for ch in sr:
yield ch
self._libgpm.Gpm_Close()
class TTkSignalDriver():
sigStop = pyTTkSignal()
sigCont = pyTTkSignal()
sigInt = pyTTkSignal()
@staticmethod
def init():
# Register events
signal.signal(signal.SIGTSTP, TTkSignalDriver._SIGSTOP) # Ctrl-Z
signal.signal(signal.SIGCONT, TTkSignalDriver._SIGCONT) # Resume
signal.signal(signal.SIGINT, TTkSignalDriver._SIGINT) # Ctrl-C
def exit():
signal.signal(signal.SIGINT, signal.SIG_DFL)
def _SIGSTOP(signum, frame): TTkSignalDriver.sigStop.emit()
def _SIGCONT(signum, frame): TTkSignalDriver.sigCont.emit()
def _SIGINT( signum, frame): TTkSignalDriver.sigInt.emit()
def _main():
inputDriver = TTkInputDriver()
for stdinRead in inputDriver.read():
out = stdinRead.replace('\033','<ESC>')
print(f"Input: {out}")
if stdinRead == 'q':
print('Break')
break
inputDriver.close()
if __name__ == "__main__":
_main()

2
TermTk/TTkCore/drivers/unix_thread.py

@ -72,7 +72,7 @@ class ReadInput():
def close(self):
os.write(self._readPipe[1], b'quit')
def _read(self):
def _read_old(self):
_fn = sys.stdin.fileno()
_attr = termios.tcgetattr(_fn)
tty.setcbreak(_fn)

36
tests/t.generic/test.ctypes.01.gpm.01.py

@ -32,14 +32,12 @@ from ctypes import CDLL, c_void_p, cdll, CFUNCTYPE
from ctypes import Structure, Union, pointer, POINTER, cast
from ctypes import c_short, c_int, c_char, c_ushort, c_ubyte, c_void_p
libgpm = CDLL('libgpm.so.2')
libgpm = CDLL('libgpm.so.2') # libgpm.so.2
libc = cdll.LoadLibrary('libc.so.6')
libc = cdll.LoadLibrary('libc.so.6') # libc.so.6
cstdout = c_void_p.in_dll(libc, 'stdout')
cstdin = c_void_p.in_dll(libc, 'stdin')
gpm_handler = c_void_p.in_dll(libgpm, 'gpm_handler')
'''
#define GPM_MAGIC 0x47706D4C /* "GpmL" */
typedef struct Gpm_Connect {
@ -136,12 +134,18 @@ def my_handler(event:Gpm_Event, data):
dy = ec.dy
x = ec.x
y = ec.y
type = ec.type
clicks = ec.clicks
margin = ec.margin
wdx = ec.wdx
wdy = ec.wdy
print(f"{buttons=}, {modifiers=}, {vc=}, {dx=}, {dy=}, {x=}, {y=}, {type=}, {clicks=}, {margin=}, {wdx=}, {wdy=}")
types = []
for t in (tt:={1:"Move",2:"Drag",4:"Down",8:"Up",
16:"Single",32:"Double",64:"Triple",
128:"MFlag",256:"HARD",512:"ENTER",1024:"LEAVE"}):
if t&ec.type:
types.append(tt[t])
margin = {1:"Top",2:"Bottom",4:"Left",8:"Right"}.get(ec.margin,f"UNDEF ({ec.margin})!!!")
print(f"{buttons=}, {modifiers=}, {vc=}, {dx=}, {dy=}, {x=}, {y=}, {types=}, {clicks=}, {margin=}, {wdx=}, {wdy=}")
return 0
'''
@ -173,27 +177,29 @@ def main():
conn.minMod = 0 # want everything
conn.maxMod = ~0 # all modifiers included
gpm_handler = c_void_p.in_dll(libgpm, 'gpm_handler')
gpm_fd = c_void_p.in_dll(libgpm, 'gpm_fd')
print("Open Connection")
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {HANDLER_FUNC(my_handler)=}")
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
# RDFM;
# This is one of the rare cases where "Readuing the Fucking Manual" was the only way to solve this issue
# Not just that but a basic knowledge of c casting annd function pointers
# https://docs.python.org/3/library/ctypes.html#type-conversions
gpm_handler.value = cast(HANDLER_FUNC(my_handler),c_void_p).value
print(f"{libgpm.gpm_handler=} {gpm_handler=}")
if (gpm_fd := libgpm.Gpm_Open(pointer(conn), 0)) == -1:
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
if (_gpm_fd := libgpm.Gpm_Open(pointer(conn), 0)) == -1:
print("Cannot connect to mouse server\n")
gpm_fd.value = _gpm_fd
print("Starting Loop")
while c := libgpm.Gpm_Getc(cstdin):
print(f"Key: {c=:04X} ")
# event = Gpm_Event()
# while libgpm.Gpm_GetEvent(pointer(event)) > 0:
# print(event)
# print(event)
# while c := libgpm.Gpm_Getchar():
# print(f"Key: {c=:04X} ")
libgpm.Gpm_Close()

242
tests/t.generic/test.ctypes.01.gpm.02.py

@ -0,0 +1,242 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Example from:
# https://www.linuxjournal.com/article/4600
# https://stackoverflow.com/questions/3794309/python-ctypes-python-file-object-c-file
import sys
import termios, tty
from ctypes import ( CDLL, c_void_p, cdll, CFUNCTYPE )
from ctypes import ( Structure, Union, pointer, POINTER, cast )
from ctypes import ( c_short, c_int, c_char, c_ushort, c_ubyte, c_void_p )
libgpm = CDLL('libgpm.so.2') # libgpm.so.2
libc = cdll.LoadLibrary('libc.so.6') # libc.so.6
cstdout = c_void_p.in_dll(libc, 'stdout')
cstdin = c_void_p.in_dll(libc, 'stdin')
'''
#define GPM_MAGIC 0x47706D4C /* "GpmL" */
typedef struct Gpm_Connect {
unsigned short eventMask, defaultMask;
unsigned short minMod, maxMod;
int pid;
int vc;
} Gpm_Connect;
'''
class Gpm_Connect(Structure):
_fields_ = [
("eventMask", c_ushort),
("defaultMask", c_ushort),
("minMod", c_ushort),
("maxMod", c_ushort),
("pid", c_int),
("vc", c_int)]
'''
enum Gpm_Etype {
GPM_MOVE=1,
GPM_DRAG=2, /* exactly one of the bare ones is active at a time */
GPM_DOWN=4,
GPM_UP= 8,
#define GPM_BARE_EVENTS(type) ((type)&(0x0f|GPM_ENTER|GPM_LEAVE))
GPM_SINGLE=16, /* at most one in three is set */
GPM_DOUBLE=32,
GPM_TRIPLE=64, /* WARNING: I depend on the values */
GPM_MFLAG=128, /* motion during click? */
GPM_HARD=256, /* if set in the defaultMask, force an already
used event to pass over to another handler */
GPM_ENTER=512, /* enter event, user in Roi's */
GPM_LEAVE=1024 /* leave event, used in Roi's */
};
enum Gpm_Margin {GPM_TOP=1, GPM_BOT=2, GPM_LFT=4, GPM_RGT=8};
typedef struct Gpm_Event {
unsigned char buttons, modifiers; /* try to be a multiple of 4 */
unsigned short vc;
short dx, dy, x, y; /* displacement x,y for this event, and absolute x,y */
enum Gpm_Etype type;
/* clicks e.g. double click are determined by time-based processing */
int clicks;
enum Gpm_Margin margin;
/* wdx/y: displacement of wheels in this event. Absolute values are not
* required, because wheel movement is typically used for scrolling
* or selecting fields, not for cursor positioning. The application
* can determine when the end of file or form is reached, and not
* go any further.
* A single mouse will use wdy, "vertical scroll" wheel. */
short wdx, wdy;
} Gpm_Event;
'''
class Gpm_Event(Structure):
_fields_ = [
("buttons", c_ubyte),
("modifiers", c_ubyte),
("vc", c_short),
("dx", c_short),
("dy", c_short),
("x", c_short),
("y", c_short),
("type", c_int),
("clicks", c_int),
("margin", c_int),
("wdx", c_short),
("wdy", c_short)]
'''
int my_handler(Gpm_Event *event, void *data)
{
printf("Event Type : %d at x=%d y=%d\n", event->type, event->x, event->y);
return 0;
}
'''
# CFUNCTYPE(c_int, POINTER(Gpm_Event), c_void_p)
HANDLER_FUNC = CFUNCTYPE(c_int, POINTER(Gpm_Event), POINTER(c_void_p))
# gpm_handler = POINTER(HANDLER_FUNC).in_dll(libgpm, 'gpm_handler')
def my_handler(event:Gpm_Event, data):
# print(f"{event=} {data=} {dir(event)=} {event.contents=}")
ec = event.contents
buttons = ec.buttons
modifiers = ec.modifiers
vc = ec.vc
dx = ec.dx
dy = ec.dy
x = ec.x
y = ec.y
clicks = ec.clicks
wdx = ec.wdx
wdy = ec.wdy
types = []
for t in (tt:={1:"Move",2:"Drag",4:"Down",8:"Up",
16:"Single",32:"Double",64:"Triple",
128:"MFlag",256:"HARD",512:"ENTER",1024:"LEAVE"}):
if t&ec.type:
types.append(tt[t])
margin = {1:"Top",2:"Bottom",4:"Left",8:"Right"}.get(ec.margin,f"UNDEF ({ec.margin})!!!")
print(f"{buttons=}, {modifiers=}, {vc=}, {dx=}, {dy=}, {x=}, {y=}, {types=}, {clicks=}, {margin=}, {wdx=}, {wdy=}")
return 0
'''
int main()
{ Gpm_Connect conn;
int c;
conn.eventMask = ~0; /* Want to know about all the events */
conn.defaultMask = 0; /* don't handle anything by default */
conn.minMod = 0; /* want everything */
conn.maxMod = ~0; /* all modifiers included */
if(Gpm_Open(&conn, 0) == -1)
printf("Cannot connect to mouse server\n");
gpm_handler = my_handler;
while((c = Gpm_Getc(stdin)) != EOF)
printf("%c", c);
Gpm_Close();
return 0;
}
'''
def setEcho(val: bool):
# Set/Unset Terminal Input Echo
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: l |= termios.ECHO
else: l &= ~termios.ECHO
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def CRNL(val: bool):
#Translate carriage return to newline on input (unless IGNCR is set).
# '\n' CTRL-J
# '\r' CTRL-M (Enter)
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: i |= termios.ICRNL
else: i &= ~termios.ICRNL
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def main():
conn = Gpm_Connect()
c = 0
conn.eventMask = ~0 # Want to know about all the events
conn.defaultMask = 0 # don't handle anything by default
conn.minMod = 0 # want everything
conn.maxMod = ~0 # all modifiers included
gpm_handler = c_void_p.in_dll(libgpm, 'gpm_handler')
gpm_fd = c_int.in_dll(libgpm, 'gpm_fd')
print("Open Connection")
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
# RDFM;
# This is one of the rare cases where "Readuing the Fucking Manual" was the only way to solve this issue
# Not just that but a basic knowledge of c casting annd function pointers
# https://docs.python.org/3/library/ctypes.html#type-conversions
gpm_handler.value = cast(HANDLER_FUNC(my_handler),c_void_p).value
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
if (_gpm_fd := libgpm.Gpm_Open(pointer(conn), 0)) == -1:
print("Cannot connect to mouse server\n")
gpm_fd.value = _gpm_fd
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
print("Starting Loop")
print(f"{sys.stdin=} {cstdin=}")
# setEcho(False)
old_settings = termios.tcgetattr(sys.stdin)
# new_settings = termios.tcgetattr(sys.stdin)
# new_settings[3] &= ~termios.ICANON
# new_settings[3] &= ~termios.ICRNL
# new_settings[3] &= ~termios.ECHO
# termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)
tty.setcbreak(sys.stdin)
while (c:=libgpm.Gpm_Getc(cstdin)) and c != ord('q'):
print(f"Key: {c=:04X} ")
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
# setEcho(True)
# while c := libgpm.Gpm_Getchar():
# print(f"Key: {c=:04X} ")
libgpm.Gpm_Close()
if __name__ == "__main__":
main()

250
tests/t.generic/test.ctypes.01.gpm.03.thread.py

@ -0,0 +1,250 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Example from:
# https://www.linuxjournal.com/article/4600
# https://stackoverflow.com/questions/3794309/python-ctypes-python-file-object-c-file
import sys
import termios, tty
import threading, queue
from ctypes import ( CDLL, c_void_p, cdll, CFUNCTYPE )
from ctypes import ( Structure, Union, pointer, POINTER, cast )
from ctypes import ( c_short, c_int, c_char, c_ushort, c_ubyte, c_void_p )
libgpm = CDLL('libgpm.so.2') # libgpm.so.2
libc = cdll.LoadLibrary('libc.so.6') # libc.so.6
cstdout = c_void_p.in_dll(libc, 'stdout')
cstdin = c_void_p.in_dll(libc, 'stdin')
'''
#define GPM_MAGIC 0x47706D4C /* "GpmL" */
typedef struct Gpm_Connect {
unsigned short eventMask, defaultMask;
unsigned short minMod, maxMod;
int pid;
int vc;
} Gpm_Connect;
'''
class Gpm_Connect(Structure):
_fields_ = [
("eventMask", c_ushort),
("defaultMask", c_ushort),
("minMod", c_ushort),
("maxMod", c_ushort),
("pid", c_int),
("vc", c_int)]
'''
enum Gpm_Etype {
GPM_MOVE=1,
GPM_DRAG=2, /* exactly one of the bare ones is active at a time */
GPM_DOWN=4,
GPM_UP= 8,
#define GPM_BARE_EVENTS(type) ((type)&(0x0f|GPM_ENTER|GPM_LEAVE))
GPM_SINGLE=16, /* at most one in three is set */
GPM_DOUBLE=32,
GPM_TRIPLE=64, /* WARNING: I depend on the values */
GPM_MFLAG=128, /* motion during click? */
GPM_HARD=256, /* if set in the defaultMask, force an already
used event to pass over to another handler */
GPM_ENTER=512, /* enter event, user in Roi's */
GPM_LEAVE=1024 /* leave event, used in Roi's */
};
enum Gpm_Margin {GPM_TOP=1, GPM_BOT=2, GPM_LFT=4, GPM_RGT=8};
typedef struct Gpm_Event {
unsigned char buttons, modifiers; /* try to be a multiple of 4 */
unsigned short vc;
short dx, dy, x, y; /* displacement x,y for this event, and absolute x,y */
enum Gpm_Etype type;
/* clicks e.g. double click are determined by time-based processing */
int clicks;
enum Gpm_Margin margin;
/* wdx/y: displacement of wheels in this event. Absolute values are not
* required, because wheel movement is typically used for scrolling
* or selecting fields, not for cursor positioning. The application
* can determine when the end of file or form is reached, and not
* go any further.
* A single mouse will use wdy, "vertical scroll" wheel. */
short wdx, wdy;
} Gpm_Event;
'''
class Gpm_Event(Structure):
_fields_ = [
("buttons", c_ubyte),
("modifiers", c_ubyte),
("vc", c_short),
("dx", c_short),
("dy", c_short),
("x", c_short),
("y", c_short),
("type", c_int),
("clicks", c_int),
("margin", c_int),
("wdx", c_short),
("wdy", c_short)]
'''
int my_handler(Gpm_Event *event, void *data)
{
printf("Event Type : %d at x=%d y=%d\n", event->type, event->x, event->y);
return 0;
}
'''
# CFUNCTYPE(c_int, POINTER(Gpm_Event), c_void_p)
HANDLER_FUNC = CFUNCTYPE(c_int, POINTER(Gpm_Event), POINTER(c_void_p))
# gpm_handler = POINTER(HANDLER_FUNC).in_dll(libgpm, 'gpm_handler')
def my_handler(event:Gpm_Event, data):
# print(f"{event=} {data=} {dir(event)=} {event.contents=}")
ec = event.contents
buttons = ec.buttons
modifiers = ec.modifiers
vc = ec.vc
dx = ec.dx
dy = ec.dy
x = ec.x
y = ec.y
clicks = ec.clicks
wdx = ec.wdx
wdy = ec.wdy
types = []
for t in (tt:={1:"Move",2:"Drag",4:"Down",8:"Up",
16:"Single",32:"Double",64:"Triple",
128:"MFlag",256:"HARD",512:"ENTER",1024:"LEAVE"}):
if t&ec.type:
types.append(tt[t])
margin = {1:"Top",2:"Bottom",4:"Left",8:"Right"}.get(ec.margin,f"UNDEF ({ec.margin})!!!")
print(f"{buttons=}, {modifiers=}, {vc=}, {dx=}, {dy=}, {x=}, {y=}, {types=}, {clicks=}, {margin=}, {wdx=}, {wdy=}")
return 0
'''
int main()
{ Gpm_Connect conn;
int c;
conn.eventMask = ~0; /* Want to know about all the events */
conn.defaultMask = 0; /* don't handle anything by default */
conn.minMod = 0; /* want everything */
conn.maxMod = ~0; /* all modifiers included */
if(Gpm_Open(&conn, 0) == -1)
printf("Cannot connect to mouse server\n");
gpm_handler = my_handler;
while((c = Gpm_Getc(stdin)) != EOF)
printf("%c", c);
Gpm_Close();
return 0;
}
'''
def setEcho(val: bool):
# Set/Unset Terminal Input Echo
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: l |= termios.ECHO
else: l &= ~termios.ECHO
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def CRNL(val: bool):
#Translate carriage return to newline on input (unless IGNCR is set).
# '\n' CTRL-J
# '\r' CTRL-M (Enter)
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: i |= termios.ICRNL
else: i &= ~termios.ICRNL
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def main():
conn = Gpm_Connect()
c = 0
conn.eventMask = ~0 # Want to know about all the events
conn.defaultMask = 0 # don't handle anything by default
conn.minMod = 0 # want everything
conn.maxMod = ~0 # all modifiers included
gpm_handler = c_void_p.in_dll(libgpm, 'gpm_handler')
gpm_fd = c_int.in_dll(libgpm, 'gpm_fd')
print("Open Connection")
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
# RDFM;
# This is one of the rare cases where "Readuing the Fucking Manual" was the only way to solve this issue
# Not just that but a basic knowledge of c casting annd function pointers
# https://docs.python.org/3/library/ctypes.html#type-conversions
gpm_handler.value = cast(HANDLER_FUNC(my_handler),c_void_p).value
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
if (_gpm_fd := libgpm.Gpm_Open(pointer(conn), 0)) == -1:
print("Cannot connect to mouse server\n")
gpm_fd.value = _gpm_fd
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
print("Starting Loop")
print(f"{sys.stdin=} {cstdin=}")
# setEcho(False)
old_settings = termios.tcgetattr(sys.stdin)
# new_settings = termios.tcgetattr(sys.stdin)
# new_settings[3] &= ~termios.ICANON
# new_settings[3] &= ~termios.ICRNL
# new_settings[3] &= ~termios.ECHO
# termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)
tty.setcbreak(sys.stdin)
def _processGpm():
print(f"Thread Started")
while (c:=libgpm.Gpm_Getc(cstdin)) and c != ord('q'):
print(f"Key: {c=:04X} - '{chr(c)}'")
t = threading.Thread(target=_processGpm)
t.start()
t.join()
print(f"Thread Joined")
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
# setEcho(True)
# while c := libgpm.Gpm_Getchar():
# print(f"Key: {c=:04X} ")
libgpm.Gpm_Close()
if __name__ == "__main__":
main()

263
tests/t.generic/test.ctypes.01.gpm.04.thread.queue.py

@ -0,0 +1,263 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Example from:
# https://www.linuxjournal.com/article/4600
# https://stackoverflow.com/questions/3794309/python-ctypes-python-file-object-c-file
import sys
import termios, tty
import threading, queue
from ctypes import ( CDLL, c_void_p, cdll, CFUNCTYPE )
from ctypes import ( Structure, Union, pointer, POINTER, cast )
from ctypes import ( c_short, c_int, c_char, c_ushort, c_ubyte, c_void_p )
libgpm = CDLL('libgpm.so.2') # libgpm.so.2
libc = cdll.LoadLibrary('libc.so.6') # libc.so.6
cstdout = c_void_p.in_dll(libc, 'stdout')
cstdin = c_void_p.in_dll(libc, 'stdin')
gpmQueue = queue.Queue()
'''
#define GPM_MAGIC 0x47706D4C /* "GpmL" */
typedef struct Gpm_Connect {
unsigned short eventMask, defaultMask;
unsigned short minMod, maxMod;
int pid;
int vc;
} Gpm_Connect;
'''
class Gpm_Connect(Structure):
_fields_ = [
("eventMask", c_ushort),
("defaultMask", c_ushort),
("minMod", c_ushort),
("maxMod", c_ushort),
("pid", c_int),
("vc", c_int)]
'''
enum Gpm_Etype {
GPM_MOVE=1,
GPM_DRAG=2, /* exactly one of the bare ones is active at a time */
GPM_DOWN=4,
GPM_UP= 8,
#define GPM_BARE_EVENTS(type) ((type)&(0x0f|GPM_ENTER|GPM_LEAVE))
GPM_SINGLE=16, /* at most one in three is set */
GPM_DOUBLE=32,
GPM_TRIPLE=64, /* WARNING: I depend on the values */
GPM_MFLAG=128, /* motion during click? */
GPM_HARD=256, /* if set in the defaultMask, force an already
used event to pass over to another handler */
GPM_ENTER=512, /* enter event, user in Roi's */
GPM_LEAVE=1024 /* leave event, used in Roi's */
};
enum Gpm_Margin {GPM_TOP=1, GPM_BOT=2, GPM_LFT=4, GPM_RGT=8};
typedef struct Gpm_Event {
unsigned char buttons, modifiers; /* try to be a multiple of 4 */
unsigned short vc;
short dx, dy, x, y; /* displacement x,y for this event, and absolute x,y */
enum Gpm_Etype type;
/* clicks e.g. double click are determined by time-based processing */
int clicks;
enum Gpm_Margin margin;
/* wdx/y: displacement of wheels in this event. Absolute values are not
* required, because wheel movement is typically used for scrolling
* or selecting fields, not for cursor positioning. The application
* can determine when the end of file or form is reached, and not
* go any further.
* A single mouse will use wdy, "vertical scroll" wheel. */
short wdx, wdy;
} Gpm_Event;
'''
class Gpm_Event(Structure):
_fields_ = [
("buttons", c_ubyte),
("modifiers", c_ubyte),
("vc", c_short),
("dx", c_short),
("dy", c_short),
("x", c_short),
("y", c_short),
("type", c_int),
("clicks", c_int),
("margin", c_int),
("wdx", c_short),
("wdy", c_short)]
'''
int my_handler(Gpm_Event *event, void *data)
{
printf("Event Type : %d at x=%d y=%d\n", event->type, event->x, event->y);
return 0;
}
'''
# CFUNCTYPE(c_int, POINTER(Gpm_Event), c_void_p)
HANDLER_FUNC = CFUNCTYPE(c_int, POINTER(Gpm_Event), POINTER(c_void_p))
# gpm_handler = POINTER(HANDLER_FUNC).in_dll(libgpm, 'gpm_handler')
def my_handler(event:Gpm_Event, data):
# print(f"{event=} {data=} {dir(event)=} {event.contents=}")
ec = event.contents
buttons = ec.buttons
modifiers = ec.modifiers
vc = ec.vc
dx = ec.dx
dy = ec.dy
x = ec.x
y = ec.y
clicks = ec.clicks
wdx = ec.wdx
wdy = ec.wdy
types = []
for t in (tt:={1:"Move",2:"Drag",4:"Down",8:"Up",
16:"Single",32:"Double",64:"Triple",
128:"MFlag",256:"HARD",512:"ENTER",1024:"LEAVE"}):
if t&ec.type:
types.append(tt[t])
margin = {1:"Top",2:"Bottom",4:"Left",8:"Right"}.get(ec.margin,f"UNDEF ({ec.margin})!!!")
# print(f"{buttons=}, {modifiers=}, {vc=}, {dx=}, {dy=}, {x=}, {y=}, {types=}, {clicks=}, {margin=}, {wdx=}, {wdy=}")
gpmQueue.put(f"{buttons=}, {modifiers=}, {vc=}, {dx=}, {dy=}, {x=}, {y=}, {types=}, {clicks=}, {margin=}, {wdx=}, {wdy=}")
return 0
'''
int main()
{ Gpm_Connect conn;
int c;
conn.eventMask = ~0; /* Want to know about all the events */
conn.defaultMask = 0; /* don't handle anything by default */
conn.minMod = 0; /* want everything */
conn.maxMod = ~0; /* all modifiers included */
if(Gpm_Open(&conn, 0) == -1)
printf("Cannot connect to mouse server\n");
gpm_handler = my_handler;
while((c = Gpm_Getc(stdin)) != EOF)
printf("%c", c);
Gpm_Close();
return 0;
}
'''
def setEcho(val: bool):
# Set/Unset Terminal Input Echo
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: l |= termios.ECHO
else: l &= ~termios.ECHO
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def CRNL(val: bool):
#Translate carriage return to newline on input (unless IGNCR is set).
# '\n' CTRL-J
# '\r' CTRL-M (Enter)
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: i |= termios.ICRNL
else: i &= ~termios.ICRNL
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def main():
conn = Gpm_Connect()
c = 0
conn.eventMask = ~0 # Want to know about all the events
conn.defaultMask = 0 # don't handle anything by default
conn.minMod = 0 # want everything
conn.maxMod = ~0 # all modifiers included
gpm_handler = c_void_p.in_dll(libgpm, 'gpm_handler')
gpm_fd = c_int.in_dll(libgpm, 'gpm_fd')
print("Open Connection")
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
# RDFM;
# This is one of the rare cases where "Readuing the Fucking Manual" was the only way to solve this issue
# Not just that but a basic knowledge of c casting annd function pointers
# https://docs.python.org/3/library/ctypes.html#type-conversions
gpm_handler.value = cast(HANDLER_FUNC(my_handler),c_void_p).value
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
if (_gpm_fd := libgpm.Gpm_Open(pointer(conn), 0)) == -1:
print("Cannot connect to mouse server\n")
gpm_fd.value = _gpm_fd
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
print("Starting Loop")
print(f"{sys.stdin=} {cstdin=}")
# setEcho(False)
old_settings = termios.tcgetattr(sys.stdin)
# new_settings = termios.tcgetattr(sys.stdin)
# new_settings[3] &= ~termios.ICANON
# new_settings[3] &= ~termios.ICRNL
# new_settings[3] &= ~termios.ECHO
# termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)
tty.setcbreak(sys.stdin)
def _processGpm():
print(f"Thread Started")
while (c:=libgpm.Gpm_Getc(cstdin)) and c != ord('q'):
print(f"Key: {c=:04X} - '{chr(c)}'")
gpmQueue.put(chr(c))
gpmQueue.put(None)
t = threading.Thread(target=_processGpm)
t.start()
while True:
item = gpmQueue.get()
if item is None:
break
print(f"{item=}")
gpmQueue.task_done()
t.join()
print(f"Thread Joined")
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
# setEcho(True)
# while c := libgpm.Gpm_Getchar():
# print(f"Key: {c=:04X} ")
libgpm.Gpm_Close()
if __name__ == "__main__":
main()

270
tests/t.generic/test.ctypes.01.gpm.05.thread.queue.generator.py

@ -0,0 +1,270 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Example from:
# https://www.linuxjournal.com/article/4600
# https://stackoverflow.com/questions/3794309/python-ctypes-python-file-object-c-file
import sys
import termios, tty
import threading, queue
from ctypes import ( CDLL, c_void_p, cdll, CFUNCTYPE )
from ctypes import ( Structure, Union, pointer, POINTER, cast )
from ctypes import ( c_short, c_int, c_char, c_ushort, c_ubyte, c_void_p )
libgpm = CDLL('libgpm.so.2') # libgpm.so.2
libc = cdll.LoadLibrary('libc.so.6') # libc.so.6
cstdout = c_void_p.in_dll(libc, 'stdout')
cstdin = c_void_p.in_dll(libc, 'stdin')
gpmQueue = queue.Queue()
'''
#define GPM_MAGIC 0x47706D4C /* "GpmL" */
typedef struct Gpm_Connect {
unsigned short eventMask, defaultMask;
unsigned short minMod, maxMod;
int pid;
int vc;
} Gpm_Connect;
'''
class Gpm_Connect(Structure):
_fields_ = [
("eventMask", c_ushort),
("defaultMask", c_ushort),
("minMod", c_ushort),
("maxMod", c_ushort),
("pid", c_int),
("vc", c_int)]
'''
enum Gpm_Etype {
GPM_MOVE=1,
GPM_DRAG=2, /* exactly one of the bare ones is active at a time */
GPM_DOWN=4,
GPM_UP= 8,
#define GPM_BARE_EVENTS(type) ((type)&(0x0f|GPM_ENTER|GPM_LEAVE))
GPM_SINGLE=16, /* at most one in three is set */
GPM_DOUBLE=32,
GPM_TRIPLE=64, /* WARNING: I depend on the values */
GPM_MFLAG=128, /* motion during click? */
GPM_HARD=256, /* if set in the defaultMask, force an already
used event to pass over to another handler */
GPM_ENTER=512, /* enter event, user in Roi's */
GPM_LEAVE=1024 /* leave event, used in Roi's */
};
enum Gpm_Margin {GPM_TOP=1, GPM_BOT=2, GPM_LFT=4, GPM_RGT=8};
typedef struct Gpm_Event {
unsigned char buttons, modifiers; /* try to be a multiple of 4 */
unsigned short vc;
short dx, dy, x, y; /* displacement x,y for this event, and absolute x,y */
enum Gpm_Etype type;
/* clicks e.g. double click are determined by time-based processing */
int clicks;
enum Gpm_Margin margin;
/* wdx/y: displacement of wheels in this event. Absolute values are not
* required, because wheel movement is typically used for scrolling
* or selecting fields, not for cursor positioning. The application
* can determine when the end of file or form is reached, and not
* go any further.
* A single mouse will use wdy, "vertical scroll" wheel. */
short wdx, wdy;
} Gpm_Event;
'''
class Gpm_Event(Structure):
_fields_ = [
("buttons", c_ubyte),
("modifiers", c_ubyte),
("vc", c_short),
("dx", c_short),
("dy", c_short),
("x", c_short),
("y", c_short),
("type", c_int),
("clicks", c_int),
("margin", c_int),
("wdx", c_short),
("wdy", c_short)]
'''
int my_handler(Gpm_Event *event, void *data)
{
printf("Event Type : %d at x=%d y=%d\n", event->type, event->x, event->y);
return 0;
}
'''
# CFUNCTYPE(c_int, POINTER(Gpm_Event), c_void_p)
HANDLER_FUNC = CFUNCTYPE(c_int, POINTER(Gpm_Event), POINTER(c_void_p))
# gpm_handler = POINTER(HANDLER_FUNC).in_dll(libgpm, 'gpm_handler')
def my_handler(event:Gpm_Event, data):
# print(f"{event=} {data=} {dir(event)=} {event.contents=}")
ec = event.contents
buttons = ec.buttons
modifiers = ec.modifiers
vc = ec.vc
dx = ec.dx
dy = ec.dy
x = ec.x
y = ec.y
clicks = ec.clicks
wdx = ec.wdx
wdy = ec.wdy
types = []
for t in (tt:={1:"Move",2:"Drag",4:"Down",8:"Up",
16:"Single",32:"Double",64:"Triple",
128:"MFlag",256:"HARD",512:"ENTER",1024:"LEAVE"}):
if t&ec.type:
types.append(tt[t])
margin = {1:"Top",2:"Bottom",4:"Left",8:"Right"}.get(ec.margin,f"UNDEF ({ec.margin})!!!")
# print(f"{buttons=}, {modifiers=}, {vc=}, {dx=}, {dy=}, {x=}, {y=}, {types=}, {clicks=}, {margin=}, {wdx=}, {wdy=}")
gpmQueue.put(f"{buttons=}, {modifiers=}, {vc=}, {dx=}, {dy=}, {x=}, {y=}, {types=}, {clicks=}, {margin=}, {wdx=}, {wdy=}")
return 0
'''
int main()
{ Gpm_Connect conn;
int c;
conn.eventMask = ~0; /* Want to know about all the events */
conn.defaultMask = 0; /* don't handle anything by default */
conn.minMod = 0; /* want everything */
conn.maxMod = ~0; /* all modifiers included */
if(Gpm_Open(&conn, 0) == -1)
printf("Cannot connect to mouse server\n");
gpm_handler = my_handler;
while((c = Gpm_Getc(stdin)) != EOF)
printf("%c", c);
Gpm_Close();
return 0;
}
'''
def setEcho(val: bool):
# Set/Unset Terminal Input Echo
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: l |= termios.ECHO
else: l &= ~termios.ECHO
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def CRNL(val: bool):
#Translate carriage return to newline on input (unless IGNCR is set).
# '\n' CTRL-J
# '\r' CTRL-M (Enter)
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: i |= termios.ICRNL
else: i &= ~termios.ICRNL
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def main():
conn = Gpm_Connect()
c = 0
conn.eventMask = ~0 # Want to know about all the events
conn.defaultMask = 0 # don't handle anything by default
conn.minMod = 0 # want everything
conn.maxMod = ~0 # all modifiers included
gpm_handler = c_void_p.in_dll(libgpm, 'gpm_handler')
gpm_fd = c_int.in_dll(libgpm, 'gpm_fd')
print("Open Connection")
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
# RDFM;
# This is one of the rare cases where "Readuing the Fucking Manual" was the only way to solve this issue
# Not just that but a basic knowledge of c casting annd function pointers
# https://docs.python.org/3/library/ctypes.html#type-conversions
gpm_handler.value = cast(HANDLER_FUNC(my_handler),c_void_p).value
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
if (_gpm_fd := libgpm.Gpm_Open(pointer(conn), 0)) == -1:
print("Cannot connect to mouse server\n")
gpm_fd.value = _gpm_fd
print(f"{libgpm.gpm_handler=} {gpm_handler=} {gpm_handler.value=} {gpm_fd=}")
print("Starting Loop")
print(f"{sys.stdin=} {cstdin=}")
# setEcho(False)
old_settings = termios.tcgetattr(sys.stdin)
# new_settings = termios.tcgetattr(sys.stdin)
# new_settings[3] &= ~termios.ICANON
# new_settings[3] &= ~termios.ICRNL
# new_settings[3] &= ~termios.ECHO
# termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)
tty.setcbreak(sys.stdin)
def _processGpm():
print(f"Thread Started")
while (c:=libgpm.Gpm_Getc(cstdin)):
print(f"Key: {c=:04X} - '{chr(c)}'")
gpmQueue.put(chr(c))
if c == ord('q'):
break
gpmQueue.put(None)
def _q_to_gen():
t = threading.Thread(target=_processGpm)
t.start()
while True:
item = gpmQueue.get()
if item is None:
break
print(f"1: {item=}")
gpmQueue.task_done()
yield item
t.join()
print(f"Thread Joined")
for item in _q_to_gen():
print(f"2: {item=}")
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
# setEcho(True)
# while c := libgpm.Gpm_Getchar():
# print(f"Key: {c=:04X} ")
print(f"GPM Close()")
libgpm.Gpm_Close()
if __name__ == "__main__":
main()

264
tests/t.generic/test.ctypes.01.gpm.06.getch_rewrite.py

@ -0,0 +1,264 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Example from:
# https://www.linuxjournal.com/article/4600
# https://stackoverflow.com/questions/3794309/python-ctypes-python-file-object-c-file
import sys, os
import termios, tty
import threading, queue
try: import fcntl, termios, tty
except Exception as e:
print(f'ERROR: {e}')
exit(1)
from select import select
from ctypes import ( CDLL, c_void_p, cdll, CFUNCTYPE )
from ctypes import ( Structure, Union, pointer, POINTER, cast )
from ctypes import ( c_short, c_int, c_char, c_ushort, c_ubyte, c_void_p )
libgpm = CDLL('libgpm.so.2') # libgpm.so.2
libc = cdll.LoadLibrary('libc.so.6') # libc.so.6
cstdout = c_void_p.in_dll(libc, 'stdout')
cstdin = c_void_p.in_dll(libc, 'stdin')
gpmQueue = queue.Queue()
'''
#define GPM_MAGIC 0x47706D4C /* "GpmL" */
typedef struct Gpm_Connect {
unsigned short eventMask, defaultMask;
unsigned short minMod, maxMod;
int pid;
int vc;
} Gpm_Connect;
'''
class Gpm_Connect(Structure):
_fields_ = [
("eventMask", c_ushort),
("defaultMask", c_ushort),
("minMod", c_ushort),
("maxMod", c_ushort),
("pid", c_int),
("vc", c_int)]
'''
enum Gpm_Etype {
GPM_MOVE=1,
GPM_DRAG=2, /* exactly one of the bare ones is active at a time */
GPM_DOWN=4,
GPM_UP= 8,
#define GPM_BARE_EVENTS(type) ((type)&(0x0f|GPM_ENTER|GPM_LEAVE))
GPM_SINGLE=16, /* at most one in three is set */
GPM_DOUBLE=32,
GPM_TRIPLE=64, /* WARNING: I depend on the values */
GPM_MFLAG=128, /* motion during click? */
GPM_HARD=256, /* if set in the defaultMask, force an already
used event to pass over to another handler */
GPM_ENTER=512, /* enter event, user in Roi's */
GPM_LEAVE=1024 /* leave event, used in Roi's */
};
enum Gpm_Margin {GPM_TOP=1, GPM_BOT=2, GPM_LFT=4, GPM_RGT=8};
typedef struct Gpm_Event {
unsigned char buttons, modifiers; /* try to be a multiple of 4 */
unsigned short vc;
short dx, dy, x, y; /* displacement x,y for this event, and absolute x,y */
enum Gpm_Etype type;
/* clicks e.g. double click are determined by time-based processing */
int clicks;
enum Gpm_Margin margin;
/* wdx/y: displacement of wheels in this event. Absolute values are not
* required, because wheel movement is typically used for scrolling
* or selecting fields, not for cursor positioning. The application
* can determine when the end of file or form is reached, and not
* go any further.
* A single mouse will use wdy, "vertical scroll" wheel. */
short wdx, wdy;
} Gpm_Event;
'''
class Gpm_Event(Structure):
_fields_ = [
("buttons", c_ubyte),
("modifiers", c_ubyte),
("vc", c_short),
("dx", c_short),
("dy", c_short),
("x", c_short),
("y", c_short),
("type", c_int),
("clicks", c_int),
("margin", c_int),
("wdx", c_short),
("wdy", c_short)]
'''
int my_handler(Gpm_Event *event, void *data)
{
printf("Event Type : %d at x=%d y=%d\n", event->type, event->x, event->y);
return 0;
}
'''
# CFUNCTYPE(c_int, POINTER(Gpm_Event), c_void_p)
HANDLER_FUNC = CFUNCTYPE(c_int, POINTER(Gpm_Event), POINTER(c_void_p))
# gpm_handler = POINTER(HANDLER_FUNC).in_dll(libgpm, 'gpm_handler')
def my_handler(event:Gpm_Event, data):
# print(f"{event=} {data=} {dir(event)=} {event.contents=}")
# ec = event.contents
ec = event
buttons = ec.buttons
modifiers = ec.modifiers
vc = ec.vc
dx = ec.dx
dy = ec.dy
x = ec.x
y = ec.y
clicks = ec.clicks
wdx = ec.wdx
wdy = ec.wdy
types = []
for t in (tt:={1:"Move",2:"Drag",4:"Down",8:"Up",
16:"Single",32:"Double",64:"Triple",
128:"MFlag",256:"HARD",512:"ENTER",1024:"LEAVE"}):
if t&ec.type:
types.append(tt[t])
margin = {1:"Top",2:"Bottom",4:"Left",8:"Right"}.get(ec.margin,f"UNDEF ({ec.margin})!!!")
print(f"{buttons=}, {modifiers=}, {vc=}, {dx=}, {dy=}, {x=}, {y=}, {types=}, {clicks=}, {margin=}, {wdx=}, {wdy=}")
return 0
'''
int main()
{ Gpm_Connect conn;
int c;
conn.eventMask = ~0; /* Want to know about all the events */
conn.defaultMask = 0; /* don't handle anything by default */
conn.minMod = 0; /* want everything */
conn.maxMod = ~0; /* all modifiers included */
if(Gpm_Open(&conn, 0) == -1)
printf("Cannot connect to mouse server\n");
gpm_handler = my_handler;
while((c = Gpm_Getc(stdin)) != EOF)
printf("%c", c);
Gpm_Close();
return 0;
}
'''
def setEcho(val: bool):
# Set/Unset Terminal Input Echo
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: l |= termios.ECHO
else: l &= ~termios.ECHO
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def CRNL(val: bool):
#Translate carriage return to newline on input (unless IGNCR is set).
# '\n' CTRL-J
# '\r' CTRL-M (Enter)
(i,o,c,l,isp,osp,cc) = termios.tcgetattr(sys.stdin.fileno())
if val: i |= termios.ICRNL
else: i &= ~termios.ICRNL
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, [i,o,c,l,isp,osp,cc])
def main():
conn = Gpm_Connect()
c = 0
conn.eventMask = ~0 # Want to know about all the events
conn.defaultMask = 0 # don't handle anything by default
conn.minMod = 0 # want everything
conn.maxMod = ~0 # all modifiers included
print("Open Connection")
if (_gpm_fd := libgpm.Gpm_Open(pointer(conn), 0)) == -1:
print("Cannot connect to mouse server\n")
print("Starting Loop")
print(f"{sys.stdin.fileno()=} {cstdin=}, {_gpm_fd=}")
# setEcho(False)
old_settings = termios.tcgetattr(sys.stdin)
# new_settings = termios.tcgetattr(sys.stdin)
# new_settings[3] &= ~termios.ICANON
# new_settings[3] &= ~termios.ICRNL
# new_settings[3] &= ~termios.ECHO
# termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)
# tty.setcbreak(sys.stdin)
_fn = sys.stdin.fileno()
tty.setcbreak(_fn)
# Convert file descriptor to Python file object
if _gpm_fd <= 0:
print("GPM XTerm Daemon not supported")
else:
with os.fdopen(_gpm_fd, "r") as gpm_file_obj:
print(f"File obj: {gpm_file_obj=} {gpm_file_obj.fileno()=} ")
_ev = Gpm_Event()
while True:
rlist, _, _ = select( [sys.stdin, gpm_file_obj], [], [] )
if gpm_file_obj in rlist:
libgpm.Gpm_GetEvent(pointer(_ev))
my_handler(_ev, None)
if sys.stdin in rlist:
if (stdinRead := sys.stdin.read(1)) == "\033": # Check if the stream start with an escape sequence
_fl = fcntl.fcntl(_fn, fcntl.F_GETFL)
fcntl.fcntl(_fn, fcntl.F_SETFL, _fl | os.O_NONBLOCK) # Set the input as NONBLOCK to read the full sequence
stdinRead += sys.stdin.read(20) # Check if the stream start with an escape sequence
if stdinRead.startswith("\033[<"): # Clear the buffer if this is a mouse code
sys.stdin.read(0x40)
fcntl.fcntl(_fn, fcntl.F_SETFL, _fl)
out = stdinRead.replace('\033','<ESC>')
print(f"{out=}")
termios.tcsetattr(sys.stdin, termios.TCSANOW, old_settings)
# setEcho(True)
# while c := libgpm.Gpm_Getchar():
# print(f"Key: {c=:04X} ")
print(f"GPM Close()")
libgpm.Gpm_Close()
if __name__ == "__main__":
main()

70
tests/t.generic/test.ctypes.02.fd.conversion.py

@ -0,0 +1,70 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2021 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Example from:
# https://www.linuxjournal.com/article/4600
# https://stackoverflow.com/questions/3794309/python-ctypes-python-file-object-c-file
import os
import sys
import ctypes
libc = ctypes.cdll.LoadLibrary('libc.so.6') # libc.so.6
# libc = ctypes.CDLL(ctypes.util.find_library("c"))
cstdout = ctypes.c_void_p.in_dll(libc, 'stdout')
cstdin = ctypes.c_void_p.in_dll(libc, 'stdin')
print(f"{cstdout=} {cstdin=}")
print(f"{sys.stdout.fileno()=} {sys.stdin.fileno()=}")
print(f"{sys.stdout=} {sys.stdin=}")
# Get the file descriptor of stdin
# fd = sys.stdin.fileno()
# fd = sys.stdout.fileno()
fd = sys.stderr.fileno()
# Define the prototype of fdopen
libc.fdopen.argtypes = [ctypes.c_int, ctypes.c_char_p]
libc.fdopen.restype = ctypes.c_void_p # FILE * is a void pointer
# Convert to FILE *
mode = b"r" # File mode (read)
file_ptr = libc.fdopen(fd, mode)
print(f"FILE * pointer: {file_ptr}")
# Define the prototype of fileno
libc.fileno.argtypes = [ctypes.c_void_p]
libc.fileno.restype = ctypes.c_int
# Convert FILE * back to file descriptor
fd_back = libc.fileno(file_ptr)
print(f"Back to file descriptor: {fd_back}")
# Convert file descriptor to Python file object
with os.fdopen(fd_back, "r") as file_obj:
print(f"File obj: {file_obj=} {file_obj.fileno()=} ")

11
tests/t.input/test.input.py

@ -26,8 +26,7 @@ import sys, os
import logging
sys.path.append(os.path.join(sys.path[0],'../..'))
from TermTk import TTkLog, TTkK, TTkTerm
from TermTk.TTkCore.TTkTerm.input import TTkInput
from TermTk import TTkLog, TTkK, TTkTerm, TTkInput
def message_handler(mode, context, message):
log = logging.debug
@ -45,10 +44,10 @@ TTkLog.installMessageHandler(message_handler)
TTkLog.info("Retrieve Keyboard, Mouse press/drag/wheel Events")
TTkLog.info("Press q or <ESC> to exit")
TTkTerm.push(TTkTerm.Mouse.ON)
TTkTerm.push(TTkTerm.Mouse.DIRECT_ON)
TTkTerm.push(TTkTerm.SET_BRACKETED_PM)
TTkTerm.setEcho(False)
# TTkTerm.push(TTkTerm.Mouse.ON)
# TTkTerm.push(TTkTerm.Mouse.DIRECT_ON)
# TTkTerm.push(TTkTerm.SET_BRACKETED_PM)
# TTkTerm.setEcho(False)
def winCallback(width, height):
TTkLog.info(f"Resize: w:{width}, h:{height}")

Loading…
Cancel
Save