12 changed files with 717 additions and 1 deletions
@ -0,0 +1,15 @@
|
||||
# Generate the new schema: |
||||
using [datamodel-code-generator](https://github.com/koxudaxi/datamodel-code-generator) |
||||
```bash |
||||
pip install datamodel-code-generator |
||||
``` |
||||
ß |
||||
```bash |
||||
rm libs/DAP/debug_adapter_protocol.py |
||||
datamodel-codegen --input libs/DAP/debugAdapterProtocol.json --input-file-type jsonschema --output libs/DAP/debug_adapter_protocol.py --output-model-type pydantic_v2.BaseModel |
||||
``` |
||||
or |
||||
```bash |
||||
rm libs/DAP/debug_adapter_protocol.py |
||||
datamodel-codegen --url https://microsoft.github.io/debug-adapter-protocol/debugAdapterProtocol.json --output libs/DAP/debug_adapter_protocol.py --output-model-type pydantic_v2.BaseModel |
||||
``` |
||||
@ -0,0 +1,29 @@
|
||||
import asyncio |
||||
from dap_python import DebugAdapterClient |
||||
|
||||
async def main(): |
||||
# Create a Debug Adapter Client |
||||
client = DebugAdapterClient() |
||||
|
||||
# Connect to the debug server |
||||
await client.connect('localhost', 5678) |
||||
|
||||
# Initialize the debug session |
||||
await client.initialize() |
||||
|
||||
# Set a breakpoint |
||||
await client.set_breakpoints('example.py', [10]) |
||||
|
||||
# Launch the debug session |
||||
await client.launch({ |
||||
'program': 'example.py' |
||||
}) |
||||
|
||||
# Continue execution |
||||
await client.continue_() |
||||
|
||||
# Wait for the debug session to end |
||||
await client.wait_for_termination() |
||||
|
||||
# Run the main function |
||||
asyncio.run(main()) |
||||
@ -0,0 +1,72 @@
|
||||
import asyncio |
||||
import websockets |
||||
import json |
||||
|
||||
|
||||
from websockets.sync.client import connect |
||||
|
||||
def hello(): |
||||
print('peppo 1') |
||||
with connect("ws://localhost:12345") as websocket: |
||||
print('peppo 2') |
||||
websocket.send("Hello world!") |
||||
message = websocket.recv() |
||||
print(f"Received: {message}") |
||||
|
||||
hello() |
||||
|
||||
|
||||
async def dap_client(): |
||||
print('pippo1') |
||||
async with websockets.connect("ws://localhost:12345") as websocket: |
||||
# Initialize the debug session |
||||
print('pippo2') |
||||
initialize_request = { |
||||
"seq": 1, |
||||
"type": "request", |
||||
"command": "initialize", |
||||
"arguments": {} |
||||
} |
||||
await websocket.send(json.dumps(initialize_request)) |
||||
response = await websocket.recv() |
||||
print("Initialize response:", response) |
||||
|
||||
# Set a breakpoint |
||||
set_breakpoints_request = { |
||||
"seq": 2, |
||||
"type": "request", |
||||
"command": "setBreakpoints", |
||||
"arguments": { |
||||
"source": {"path": "example.py"}, |
||||
"breakpoints": [{"line": 10}] |
||||
} |
||||
} |
||||
await websocket.send(json.dumps(set_breakpoints_request)) |
||||
response = await websocket.recv() |
||||
print("Set breakpoints response:", response) |
||||
|
||||
# Launch the debug session |
||||
launch_request = { |
||||
"seq": 3, |
||||
"type": "request", |
||||
"command": "launch", |
||||
"arguments": { |
||||
"program": "example.py" |
||||
} |
||||
} |
||||
await websocket.send(json.dumps(launch_request)) |
||||
response = await websocket.recv() |
||||
print("Launch response:", response) |
||||
|
||||
# Continue execution |
||||
continue_request = { |
||||
"seq": 4, |
||||
"type": "request", |
||||
"command": "continue", |
||||
"arguments": {} |
||||
} |
||||
await websocket.send(json.dumps(continue_request)) |
||||
response = await websocket.recv() |
||||
print("Continue response:", response) |
||||
|
||||
asyncio.run(dap_client()) |
||||
@ -0,0 +1,20 @@
|
||||
# example.py |
||||
import debugpy |
||||
|
||||
# Start the debug server |
||||
debugpy.listen(("localhost", 64321)) |
||||
print("Waiting for debugger to attach...") |
||||
debugpy.wait_for_client() |
||||
print("Attached!!!") |
||||
|
||||
def add(a, b): |
||||
return a + b |
||||
|
||||
def main(): |
||||
x = 10 |
||||
y = 20 |
||||
result = add(x, y) |
||||
print(f"The result is {result}") |
||||
|
||||
if __name__ == "__main__": |
||||
main() |
||||
@ -0,0 +1,20 @@
|
||||
import asyncio |
||||
from dap import AsyncServer |
||||
|
||||
async def main(): |
||||
print("Eugenio") |
||||
server = AsyncServer("debugpy", port=64321) |
||||
# server = AsyncServer("debugpy", port=53430) |
||||
# server = AsyncServer("debugpy", port=12345) |
||||
print("Parodi") |
||||
try: |
||||
await server.start() |
||||
print(f"a - {server}") |
||||
except asyncio.CancelledError: |
||||
print(f"b - {server}") |
||||
await server.stop() |
||||
print(f"c - {server}") |
||||
|
||||
if __name__ == "__main__": |
||||
asyncio.run(main()) |
||||
print('END') |
||||
@ -0,0 +1,13 @@
|
||||
import asyncio |
||||
from dap import AsyncServer |
||||
|
||||
async def debug_session(): |
||||
server = AsyncServer("debugpy", port=12345) |
||||
await server.start() |
||||
client = server.client |
||||
|
||||
client.launch() |
||||
client.disconnect() |
||||
server.stop() |
||||
|
||||
asyncio.run(debug_session()) |
||||
@ -0,0 +1,84 @@
|
||||
import asyncio |
||||
import json |
||||
import re |
||||
|
||||
async def send_request(writer, request): |
||||
content = json.dumps(request) |
||||
header = f"Content-Length: {len(content)}\r\n\r\n" |
||||
message = header + content |
||||
print("Sending:") |
||||
print(" header:", header) |
||||
print(" content:", content) |
||||
writer.write(message.encode('utf-8')) |
||||
await writer.drain() |
||||
|
||||
|
||||
header_re = re.compile(r'^Content-Length: (\d+)[\r\n]*', re.MULTILINE) |
||||
|
||||
async def receive_response(reader): |
||||
response = await reader.read(4096) |
||||
response_string = response.decode('utf-8') |
||||
while m:=header_re.search(response_string): |
||||
header = m.group(0) |
||||
header_length = len(header) |
||||
content_length = int(m.group(1)) |
||||
content = response_string[header_length:header_length+content_length] |
||||
response_string = response_string[header_length+content_length:] |
||||
print("Received:") |
||||
print(" header length:", header_length) |
||||
# print(" content:", content) |
||||
print(" json:", json.loads(content)) |
||||
return {} |
||||
|
||||
async def main(): |
||||
host = 'localhost' |
||||
port = 64321 |
||||
|
||||
reader, writer = await asyncio.open_connection(host, port) |
||||
print("Connected to the debug adapter server.") |
||||
|
||||
# Example request: Initialize |
||||
initialize_request = { |
||||
'seq': 1, |
||||
'type': 'request', |
||||
'command': 'initialize', |
||||
'arguments': { |
||||
'clientID': 'example-client', |
||||
'adapterID': 'debugpy', |
||||
'pathFormat': 'path', |
||||
|
||||
"locale": "en", |
||||
|
||||
'linesStartAt1': True, |
||||
'columnsStartAt1': True, |
||||
|
||||
'supportsVariableType': True, |
||||
'supportsVariablePaging': True, |
||||
'supportsRunInTerminalRequest': True, |
||||
"supportsRunInTerminalRequest": True, |
||||
"supportsProgressReporting": True, |
||||
"supportsInvalidatedEvent": True, |
||||
"supportsMemoryReferences": True, |
||||
"supportsArgsCanBeInterpretedByShell": True, |
||||
"supportsMemoryEvent": True, |
||||
"supportsStartDebuggingRequest": True, |
||||
"supportsANSIStyling": True |
||||
} |
||||
} |
||||
|
||||
await send_request(writer, initialize_request) |
||||
response = await receive_response(reader) |
||||
print("Response 1:", response) |
||||
|
||||
response = await receive_response(reader) |
||||
print("Response 2:", response) |
||||
|
||||
response = await receive_response(reader) |
||||
print("Response 3:", response) |
||||
|
||||
# Close the connection |
||||
writer.close() |
||||
await writer.wait_closed() |
||||
|
||||
if __name__ == "__main__": |
||||
asyncio.run(main()) |
||||
@ -0,0 +1,120 @@
|
||||
# run the test which open the debubpy on the port 64321 |
||||
# tests/t.generic/test.generic.013.DAP.Debugpy.05.client.01.py |
||||
|
||||
import asyncio |
||||
import json |
||||
import re |
||||
|
||||
async def send_request(writer, request): |
||||
content = json.dumps(request) |
||||
header = f"Content-Length: {len(content)}\r\n\r\n" |
||||
message = header + content |
||||
print("Sending:") |
||||
print(" header:", header) |
||||
print(" content:", content) |
||||
writer.write(message.encode('utf-8')) |
||||
await writer.drain() |
||||
|
||||
header_re = re.compile(r'^Content-Length: (\d+)[\r\n]*', re.MULTILINE) |
||||
|
||||
async def receive_response(reader): |
||||
response = await reader.read(4096) |
||||
response_string = response.decode('utf-8') |
||||
ret = [] |
||||
while m:=header_re.search(response_string): |
||||
header = m.group(0) |
||||
header_length = len(header) |
||||
content_length = int(m.group(1)) |
||||
content = response_string[header_length:header_length+content_length] |
||||
response_string = response_string[header_length+content_length:] |
||||
print("Received:") |
||||
print(" header length:", header_length) |
||||
# print(" content:", content) |
||||
print(" json:", jcontent:=json.loads(content)) |
||||
ret.append(jcontent) |
||||
return ret |
||||
|
||||
|
||||
def sm_initialize(): |
||||
return { |
||||
'seq': 1, |
||||
'type': 'request', |
||||
'command': 'initialize', |
||||
'arguments': { |
||||
'clientID': 'example-client', |
||||
'adapterID': 'debugpy', |
||||
'pathFormat': 'path', |
||||
|
||||
"locale": "en", |
||||
|
||||
'linesStartAt1': True, |
||||
'columnsStartAt1': True, |
||||
|
||||
'supportsVariableType': True, |
||||
'supportsVariablePaging': True, |
||||
'supportsRunInTerminalRequest': True, |
||||
"supportsRunInTerminalRequest": True, |
||||
"supportsProgressReporting": True, |
||||
"supportsInvalidatedEvent": True, |
||||
"supportsMemoryReferences": True, |
||||
"supportsArgsCanBeInterpretedByShell": True, |
||||
"supportsMemoryEvent": True, |
||||
"supportsStartDebuggingRequest": True, |
||||
"supportsANSIStyling": True |
||||
} |
||||
} |
||||
|
||||
def sm_attach(): |
||||
return { |
||||
"command":"attach", |
||||
"arguments":{ |
||||
"name":"Python Debugger: Attach", |
||||
"type":"debugpy", |
||||
"request":"attach", |
||||
"connect":{ |
||||
"host":"localhost", |
||||
"port":64321}, |
||||
"__configurationTarget":6, |
||||
"clientOS":"unix", |
||||
"debugOptions":["RedirectOutput","ShowReturnValue"], |
||||
"justMyCode":True, |
||||
"showReturnValue":True, |
||||
"workspaceFolder":"/Users/epd02/github/Varie/pyTermTk.003", |
||||
# "__sessionId":"7a6b4b23-74ac-41b3-8f89-4b13e9061b5d" |
||||
"__sessionId":"aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" |
||||
}, |
||||
"type":"request", |
||||
"seq": 2 |
||||
} |
||||
|
||||
async def sm_loop(reader, writer): |
||||
while response := await receive_response(reader): |
||||
for content in response: |
||||
if 'type' not in content: return |
||||
if content['type'] == 'event': |
||||
if content['event'] == 'output': |
||||
print("Response:", content['body']) |
||||
if content['event'] == 'debugpyWaitingForServer': |
||||
print("Terminated") |
||||
return |
||||
if content['type'] == 'response': |
||||
if content['command'] == 'initialize': |
||||
await send_request(writer, sm_attach()) |
||||
|
||||
async def main(): |
||||
host = 'localhost' |
||||
port = 64321 |
||||
|
||||
reader, writer = await asyncio.open_connection(host, port) |
||||
print("Connected to the debug adapter server.") |
||||
|
||||
await send_request(writer, sm_initialize()) |
||||
|
||||
await sm_loop(reader, writer) |
||||
|
||||
# Close the connection |
||||
writer.close() |
||||
await writer.wait_closed() |
||||
|
||||
if __name__ == "__main__": |
||||
asyncio.run(main()) |
||||
@ -0,0 +1,148 @@
|
||||
# run the test which open the debubpy on the port 64321 |
||||
# tests/t.generic/test.generic.013.DAP.Debugpy.05.client.01.py |
||||
|
||||
import asyncio |
||||
import json |
||||
import re |
||||
import uuid |
||||
|
||||
|
||||
class ClientDAP(): |
||||
__slots__ = ('_host', '_port', |
||||
'_seq', |
||||
'_reader', '_writer') |
||||
_reader:asyncio.StreamReader |
||||
_writer:asyncio.StreamWriter |
||||
def __init__(self, host:str, port:int) -> None: |
||||
self._host = host |
||||
self._port = port |
||||
self._seq = 1 |
||||
self._reader = None |
||||
self._writer = None |
||||
|
||||
async def open(self) -> None: |
||||
self._reader, self._writer = await asyncio.open_connection(self._host, self._port) |
||||
|
||||
async def close(self) -> None: |
||||
self._writer.close() |
||||
await self._writer.wait_closed() |
||||
|
||||
async def send_request(self, request:dict) -> None: |
||||
request = request.copy() | {'seq': self._seq} |
||||
self._seq += 1 |
||||
content = json.dumps(request) |
||||
header = f"Content-Length: {len(content)}\r\n\r\n" |
||||
message = header + content |
||||
print("Sending:") |
||||
print(" header:", header) |
||||
print(" content:", content) |
||||
self._writer.write(message.encode('utf-8')) |
||||
await self._writer.drain() |
||||
|
||||
_header_re = re.compile(r'^Content-Length: (\d+)[\r\n]*', re.MULTILINE) |
||||
|
||||
async def receive_response(self) -> list[dict]: |
||||
response = await self._reader.read(4096) |
||||
response_string = response.decode('utf-8') |
||||
ret = [] |
||||
while m:=self._header_re.search(response_string): |
||||
header = m.group(0) |
||||
header_length = len(header) |
||||
content_length = int(m.group(1)) |
||||
content = response_string[header_length:header_length+content_length] |
||||
response_string = response_string[header_length+content_length:] |
||||
print("Received:") |
||||
print(" header length:", header_length) |
||||
# print(" content:", content) |
||||
print(" json:", jcontent:=json.loads(content)) |
||||
ret.append(jcontent) |
||||
return ret |
||||
|
||||
|
||||
def sm_configuration_done() -> dict: |
||||
return { |
||||
'type': 'request', |
||||
'command': 'configurationDone', |
||||
'arguments': {} |
||||
} |
||||
|
||||
def sm_initialize() -> dict: |
||||
return { |
||||
'type': 'request', |
||||
'command': 'initialize', |
||||
'arguments': { |
||||
'clientID': 'example-client', |
||||
'adapterID': 'debugpy', |
||||
'pathFormat': 'path', |
||||
|
||||
"locale": "en", |
||||
|
||||
'linesStartAt1': True, |
||||
'columnsStartAt1': True, |
||||
|
||||
'supportsVariableType': True, |
||||
'supportsVariablePaging': True, |
||||
'supportsRunInTerminalRequest': True, |
||||
"supportsRunInTerminalRequest": True, |
||||
"supportsProgressReporting": True, |
||||
"supportsInvalidatedEvent": True, |
||||
"supportsMemoryReferences": True, |
||||
"supportsArgsCanBeInterpretedByShell": True, |
||||
"supportsMemoryEvent": True, |
||||
"supportsStartDebuggingRequest": True, |
||||
"supportsANSIStyling": True |
||||
} |
||||
} |
||||
|
||||
def sm_attach() -> dict: |
||||
return { |
||||
"command":"attach", |
||||
"type":"request", |
||||
"arguments":{ |
||||
"name":"Python Debugger: Attach", |
||||
"type":"debugpy", |
||||
"request":"attach", |
||||
"connect":{ |
||||
"host":"localhost", |
||||
"port":64321}, |
||||
"__configurationTarget":6, |
||||
"clientOS":"unix", |
||||
"debugOptions":["RedirectOutput","ShowReturnValue"], |
||||
"justMyCode":True, |
||||
"showReturnValue":True, |
||||
"workspaceFolder":"/Users/epd02/github/Varie/pyTermTk.003", |
||||
# "__sessionId":"7a6b4b23-74ac-41b3-8f89-4b13e9061b5d" |
||||
"__sessionId":str(uuid.uuid4()) |
||||
}, |
||||
} |
||||
|
||||
async def sm_loop(client:ClientDAP): |
||||
while response := await client.receive_response(): |
||||
for content in response: |
||||
if 'type' not in content: return |
||||
elif content['type'] == 'event': |
||||
if content['event'] == 'output': |
||||
print("Response:", content['body']) |
||||
elif content['event'] == 'debugpyWaitingForServer': |
||||
await client.send_request(sm_configuration_done()) |
||||
elif content['event'] == 'terminated': |
||||
print("Terminated") |
||||
return |
||||
elif content['type'] == 'response': |
||||
if content['command'] == 'initialize': |
||||
await client.send_request(sm_attach()) |
||||
|
||||
async def main(): |
||||
host = 'localhost' |
||||
port = 64321 |
||||
|
||||
client = ClientDAP(host, port) |
||||
|
||||
await client.open() |
||||
await client.send_request(sm_initialize()) |
||||
await sm_loop(client) |
||||
await client.close() |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
asyncio.run(main()) |
||||
@ -0,0 +1,153 @@
|
||||
# run the test which open the debubpy on the port 64321 |
||||
# tests/t.generic/test.generic.013.DAP.Debugpy.05.client.01.py |
||||
|
||||
import asyncio |
||||
import json |
||||
import re |
||||
import uuid |
||||
import os, sys |
||||
|
||||
sys.path.append(os.path.join(sys.path[0],'../../libs')) |
||||
import DAP as dap |
||||
|
||||
class ClientDAP(): |
||||
__slots__ = ('_host', '_port', |
||||
'_seq', |
||||
'_reader', '_writer') |
||||
_reader:asyncio.StreamReader |
||||
_writer:asyncio.StreamWriter |
||||
def __init__(self, host:str, port:int) -> None: |
||||
self._host = host |
||||
self._port = port |
||||
self._seq = 1 |
||||
self._reader = None |
||||
self._writer = None |
||||
|
||||
async def open(self) -> None: |
||||
self._reader, self._writer = await asyncio.open_connection(self._host, self._port) |
||||
|
||||
async def close(self) -> None: |
||||
self._writer.close() |
||||
await self._writer.wait_closed() |
||||
|
||||
async def send_request(self, request) -> None: |
||||
request.seq = self._seq |
||||
self._seq += 1 |
||||
content = request.json() |
||||
header = f"Content-Length: {len(content)}\r\n\r\n" |
||||
message = header + content |
||||
print("Sending:") |
||||
print(" header:", header) |
||||
print(" content:", content) |
||||
self._writer.write(message.encode('utf-8')) |
||||
await self._writer.drain() |
||||
|
||||
_header_re = re.compile(r'^Content-Length: (\d+)[\r\n]*', re.MULTILINE) |
||||
|
||||
async def receive_response(self) -> list[dict]: |
||||
response = await self._reader.read(4096) |
||||
response_string = response.decode('utf-8') |
||||
ret = [] |
||||
while m:=self._header_re.search(response_string): |
||||
header = m.group(0) |
||||
header_length = len(header) |
||||
content_length = int(m.group(1)) |
||||
content = response_string[header_length:header_length+content_length] |
||||
response_string = response_string[header_length+content_length:] |
||||
print("Received:") |
||||
print(" header length:", header_length) |
||||
# print(" content:", content) |
||||
print(" json:", jcontent:=json.loads(content)) |
||||
ret.append(jcontent) |
||||
return ret |
||||
|
||||
|
||||
def sm_configuration_done() -> dap.ConfigurationDoneRequest: |
||||
return dap.ConfigurationDoneRequest.parse_obj({ |
||||
'seq':0, |
||||
'type':'request', |
||||
'command':'configurationDone' |
||||
}) |
||||
|
||||
def sm_initialize() -> dap.InitializeRequest: |
||||
return dap.InitializeRequest.parse_obj({ |
||||
'seq': 0, |
||||
'type': 'request', |
||||
'command': 'initialize', |
||||
'arguments': { |
||||
'clientID': 'example-client', |
||||
'adapterID': 'debugpy', |
||||
'pathFormat': 'path', |
||||
|
||||
"locale": "en", |
||||
|
||||
'linesStartAt1': True, |
||||
'columnsStartAt1': True, |
||||
|
||||
'supportsVariableType': True, |
||||
'supportsVariablePaging': True, |
||||
'supportsRunInTerminalRequest': True, |
||||
"supportsRunInTerminalRequest": True, |
||||
"supportsProgressReporting": True, |
||||
"supportsInvalidatedEvent": True, |
||||
"supportsMemoryReferences": True, |
||||
"supportsArgsCanBeInterpretedByShell": True, |
||||
"supportsMemoryEvent": True, |
||||
"supportsStartDebuggingRequest": True, |
||||
"supportsANSIStyling": True |
||||
} |
||||
}) |
||||
|
||||
def sm_attach() -> dap.AttachRequest: |
||||
return dap.AttachRequest.parse_obj({ |
||||
'seq': 0, |
||||
"command":"attach", |
||||
"type":"request", |
||||
"arguments":{ |
||||
"name":"Python Debugger: Attach", |
||||
"type":"debugpy", |
||||
"request":"attach", |
||||
"connect":{ |
||||
"host":"localhost", |
||||
"port":64321}, |
||||
"__configurationTarget":6, |
||||
"clientOS":"unix", |
||||
"debugOptions":["RedirectOutput","ShowReturnValue"], |
||||
"justMyCode":True, |
||||
"showReturnValue":True, |
||||
"workspaceFolder":"/Users/epd02/github/Varie/pyTermTk.003", |
||||
# "__sessionId":"7a6b4b23-74ac-41b3-8f89-4b13e9061b5d" |
||||
"__sessionId":str(uuid.uuid4()) |
||||
}, |
||||
}) |
||||
|
||||
async def sm_loop(client:ClientDAP): |
||||
while response := await client.receive_response(): |
||||
for content in response: |
||||
if 'type' not in content: return |
||||
elif content['type'] == 'event': |
||||
if content['event'] == 'output': |
||||
print("Response:", content['body']) |
||||
elif content['event'] == 'debugpyWaitingForServer': |
||||
await client.send_request(sm_configuration_done()) |
||||
elif content['event'] == 'terminated': |
||||
print("Terminated") |
||||
return |
||||
elif content['type'] == 'response': |
||||
if content['command'] == 'initialize': |
||||
await client.send_request(sm_attach()) |
||||
|
||||
async def main(): |
||||
host = 'localhost' |
||||
port = 64321 |
||||
|
||||
client = ClientDAP(host, port) |
||||
|
||||
await client.open() |
||||
await client.send_request(sm_initialize()) |
||||
await sm_loop(client) |
||||
await client.close() |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
asyncio.run(main()) |
||||
@ -0,0 +1,35 @@
|
||||
import sys |
||||
import tty |
||||
import termios |
||||
|
||||
def enable_mouse_tracking(): |
||||
# Enable SGR mouse mode (1006) and extended mouse mode (1015) |
||||
sys.stdout.write("\033[?1003h\033[?1015h\033[?1006h") |
||||
sys.stdout.flush() |
||||
|
||||
def disable_mouse_tracking(): |
||||
sys.stdout.write("\033[?1003l\033[?1015l\033[?1006l") |
||||
sys.stdout.flush() |
||||
|
||||
def read_mouse_events(): |
||||
fd = sys.stdin.fileno() |
||||
old_settings = termios.tcgetattr(fd) |
||||
try: |
||||
tty.setcbreak(fd) |
||||
event = "" |
||||
while True: |
||||
event += sys.stdin.read(1) |
||||
if event[-1]=='\033': |
||||
print(f"Mouse event: {event[:-1].replace('\033','<ESC>')}") |
||||
event = '\033' |
||||
finally: |
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) |
||||
disable_mouse_tracking() |
||||
|
||||
if __name__ == "__main__": |
||||
enable_mouse_tracking() |
||||
try: |
||||
read_mouse_events() |
||||
except KeyboardInterrupt: |
||||
disable_mouse_tracking() |
||||
print("\nMouse tracking disabled.") |
||||
Loading…
Reference in new issue