From 669c9bfebd520032e82cac805bf09a5e29c5d6e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Barcicki?= Date: Mon, 29 Dec 2025 11:30:36 +0100 Subject: [PATCH] [#85496] Use sockets for communication --- README.md | 5 +- common.py | 6 +- sargraph.py | 163 +++++---- watch.py | 971 +++++++++++++++++++++++++++------------------------- 4 files changed, 605 insertions(+), 540 deletions(-) diff --git a/README.md b/README.md index 18a1ca5..96c2835 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # sargraph -Copyright (c) 2019-2023 [Antmicro](https://www.antmicro.com) +Copyright (c) 2019-2026 [Antmicro](https://www.antmicro.com) This is a simple python tool that uses "sysstat" ("sar") to save information on CPU, RAM and disk usage. The process runs in background and can be controlled with a set of sargraph sub-commands. @@ -10,7 +10,7 @@ Supported plot formats are PNG, SVG and ASCII, they are determined by filename e # Install requirements -The sargraph requires `gnuplot`, `sysstat` (`sar`), `python3`, `coreutils` and `screen` to operate. +The sargraph requires `gnuplot`, `sysstat` (`sar`), `python3` and `coreutils` to operate. In Debian you can install them with: ``` @@ -22,7 +22,6 @@ apt-get install -qqy --no-install-recommends \ gnuplot-nox \ python3 \ python3-pip \ - screen \ sysstat # install Python dependencies diff --git a/common.py b/common.py index 5eccb80..3a01b62 100644 --- a/common.py +++ b/common.py @@ -39,7 +39,11 @@ def run_or_fail(*argv, **kwargs): # Check if a process is running def pid_running(pid): - return os.path.exists(f"/proc/{pid}") + return file_exists(f"/proc/{pid}") + +# Check whether path exists +def file_exists(filename: str): + return os.path.exists(filename) # Convert a string to float, also when the separator is a comma diff --git a/sargraph.py b/sargraph.py index fdb5c01..f7ac1a8 100755 --- a/sargraph.py +++ b/sargraph.py @@ -11,12 +11,13 @@ import time import graph import watch +import warnings from common import * # Declare and parse command line flags parser = argparse.ArgumentParser() -parser.add_argument('session', metavar='SESSION-NAME', type=str, nargs='?', default=None, help='sargraph session name') +parser.add_argument('session', metavar='SESSION-NAME', type=str, nargs='?', help='sargraph session name') parser.add_argument('command', metavar='COMMAND', type=str, nargs='*', help='send command') parser.add_argument('-f', metavar='DEVICE-NAME', type=str, nargs='?', default=None, dest='fsdev', help='observe a chosen filesystem') parser.add_argument('-m', metavar='MOUNT-DIR', type=str, nargs='?', default=None, dest='fspath', help='observe a chosen filesystem') @@ -26,26 +27,19 @@ parser.add_argument('-t', metavar='TMPFS-COLOR', type=str, nargs='?', defa parser.add_argument('-c', metavar='CACHE-COLOR', type=str, nargs='?', default='#ee7af0', dest='cache', help='set cache plot color' ) parser.add_argument('-u', metavar='UDP', type=str, nargs='?', default=None, dest='udp', help='set udp server address') parser.add_argument('-C', metavar='UDP_COOKIE', type=str, nargs='?', default=None, dest='udp_cookie', help='set udp message cookie') -parser.add_argument('-p', action='store_true', dest='psutil', help='use psutil instead of sar') +parser.add_argument('-p', action='store_true', dest='psutil', help='use psutil instead of sar') args = parser.parse_args() -def send(sid, msg): - p = subprocess.Popen(["screen", "-S", sid, "-X", "stuff", f"{msg}\n"]) - while p.poll() is None: - time.sleep(0.1) - -# Check if sar is available -if not is_darwin(): - p = run_or_fail("sar", "-V", stdout=subprocess.PIPE) +def send(session: str, message: str): + sock, socket_path = watch.get_socket(session) + if not file_exists(socket_path): + fail(f"Session '{session}' does not exist") -# Check if screen is available -p = run_or_fail("screen", "-v", stdout=subprocess.PIPE) -version = scan("Screen version (\\d+)", int, p.stdout.readline().decode()) -if version is None: - fail("'screen' tool returned unknown output") + sock.connect(socket_path) + sock.send(message.encode("utf-8")) + sock.close() -# If the script was run with no parameters, run in background and gather data -if args.session is None: +def create_session(): # Find requested disk device if args.fspath: args.fspath = os.path.realpath(args.fspath) @@ -53,69 +47,98 @@ if args.session is None: while args.fsdev is None: args.fsdev = scan(f"^(/dev/\\S+)\\s+{re.escape(args.fspath)}\\s+", str, f.readline()) if not args.fsdev: - fail(f"no device is mounted on {args.fspath}") + fail(f"No device is mounted on {args.fspath}") - watch.watch(args.name, args.fsdev, args.iface, args.tmpfs, args.cache, args.psutil, args.udp, args.udp_cookie) + params = (args.session, args.fsdev, args.iface, args.tmpfs, args.cache, args.udp, args.udp_cookie) + if is_darwin() or args.psutil: + watcher = watch.PsUtilWatcher(*params) + else: + watcher = watch.SarWatcher(*params) + + watcher.start() sys.exit(0) -# Now handle the commands +# Check if sar is available +if not is_darwin(): + p = run_or_fail("sar", "-V", stdout=subprocess.PIPE) -# Check if a command was provided -if len(args.command) <= 0: - fail("command not provided") +if args.name != "data": + warnings.warn("'-o' is deprecated, session name is default output base name") -# Get session name and command name -sid = args.session -cmd = args.command +# Check if a command was provided, if that session exists, yell at user for lack of commands, else spawn +if len(args.command) == 0: + sock, socket_path = watch.get_socket(args.session) + if file_exists(socket_path): + fail("Command not provided") + + else: + print(f"Starting sargraph session '{args.session}'") + create_session() + +if args.command[0] == "start": + sock, socket_path = watch.get_socket(args.session) + if file_exists(socket_path): + fail("Session with this name already exists") + + # Start watcher process + p = subprocess.Popen( + args=[sys.executable, os.path.realpath(__file__), args.session, *sys.argv[3:]], + stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + start_new_session=True + ) + + # Spinloop to see whether the subprocess even starts + attempts = 5 + while attempts: + time.sleep(0.1) + if file_exists(socket_path): + print(f"Session '{args.session}' started") + sys.exit(0) + attempts -= 1 + + fail("Session did not start") + +elif args.command[0] == "stop": + _, socket_path = watch.get_socket(args.session) + + print(f"Terminating sargraph session '{args.session}'") + if len(args.command) < 2: + send(args.session, "command:q:") + else: + send(args.session, f"command:q:{args.command[1]}") -if cmd[0] == "start": - print(f"Starting sargraph session '{sid}'") + # Spinloop to see whether the subprocess even dies + attempts = 5 + while attempts: + time.sleep(0.5) + if not file_exists(socket_path): + print(f"Session '{args.session}' killed") + sys.exit(0) + attempts -= 1 + + fail("Session did not respond") - # Spawn watcher process, *sys.argv[3:] is all arguments after 'chart start' + '-o [log name]' if not given - if "-o" not in sys.argv: - sys.argv += ["-o", sid] - p = subprocess.Popen(["screen", "-Logfile", f"{sid}.log", "-dmSL", sid, os.path.realpath(__file__), *sys.argv[3:]]) - while p.poll() is None: - time.sleep(0.1) - gpid = 0 - j = 0 - time.sleep(1) - print(f"Session '{sid}' started") -elif cmd[0] == "stop": - print(f"Terminating sargraph session '{sid}'") - - try: - gpid = int(os.popen(f"screen -ls | grep '.{sid}' | tr -d ' \t' | cut -f 1 -d '.'").read()) - except: - print("Warning: cannot find pid.") - gpid = -1 - if len(cmd) < 2: - send(sid, "command:q:") - else: - send(sid, f"command:q:{cmd[1]}") - if gpid == -1: - print("Waiting 3 seconds.") - time.sleep(3) - else: - while pid_running(gpid): - time.sleep(0.25) -elif cmd[0] == "label": +elif args.command[0] == "label": # Check if the label name was provided - if len(cmd) < 2: + if len(args.command) < 2: fail("label command requires an additional parameter") - print(f"Adding label '{cmd[1]}' to sargraph session '{sid}'.") - send(sid, f"label:{cmd[1]}") -elif cmd[0] == 'save': - print(f"Saving graph from session '{sid}'.") - if len(cmd) < 2: - send(sid, "command:s:") + + print(f"Adding label '{args.command[1]}' to sargraph session '{args.session}'.") + send(args.session, f"label:{args.command[1]}") + + +elif args.command[0] == 'save': + print(f"Saving graph from session '{args.session}'.") + if len(args.command) < 2: + send(args.session, "command:s:") else: - send(sid, f"command:s:{cmd[1]}") -elif cmd[0] == 'plot': - if len(cmd) < 2: - graph.graph(sid, args.tmpfs, args.cache) + send(args.session, f"command:s:{args.command[1]}") + +elif args.command[0] == 'plot': + if len(args.command) < 2: + graph.graph(args.session, args.tmpfs, args.cache) else: - graph.graph(sid, args.tmpfs, args.cache, cmd[1]) + graph.graph(args.session, args.tmpfs, args.cache, args.command[1]) else: - fail(f"unknown command '{cmd[0]}'") + fail(f"unknown command '{args.command[0]}'") diff --git a/watch.py b/watch.py index 69488fe..0a628ae 100644 --- a/watch.py +++ b/watch.py @@ -7,29 +7,25 @@ import datetime -import fcntl import os -import re import select import signal import subprocess -import sys import time import psutil import sched import platform import logging -from threading import Thread, Lock -import threading +import socket +import abc +import traceback +from threading import Thread from logging.handlers import DatagramHandler import graph from common import * -die = 0 - - # Initialize summary variables SAMPLE_NUMBER = 0 TOTAL_RAM = 0 @@ -56,11 +52,6 @@ FS_SAR_INDEX = None IFACE_NAME = None IFACE_SAR_INDEX = None -# Handle SIGTERM -def kill_handler(a, b): - global die - die = 1 - class UDPHandler(DatagramHandler): def emit(self, msg): try: @@ -70,10 +61,6 @@ class UDPHandler(DatagramHandler): except Exception as e: pass - -logger = logging.getLogger("sargraph") -logger.setLevel(logging.INFO) - # Read a single table from sar output def read_table(psar): # Find the header @@ -114,489 +101,541 @@ def read_iface_stats(iface): tx = scan(r"(\d+)", int, f.readline()) return rx, tx +def get_socket(session): + # TODO: when using on other platforms make sure this path exist; namely windows + path = f"/tmp/sargraph-{session}.sock" + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + return sock, path -# Initialize 'data.txt' where the data is dumped -def initialize(session, machine): - global TOTAL_RAM - global TOTAL_GPU_RAM - with open("/proc/meminfo") as f: - TOTAL_RAM = int(scan("MemTotal:\s+(\d+)", float, f.read())) +class Watcher(abc.ABC): - uname = machine.split(" ")[0:2] - uname = f"{uname[0]} {uname[1]}" + def __init__(self, session, fsdev, iface, tmpfs_color, other_cache_color, udp=None, udp_cookie=None): + super().__init__() - cpus = int(machine.split(" CPU)")[0].split("(")[-1]) + self.session = session - cpu_name = "unknown" + self.fsdev = fsdev + self.iface = iface + self.tmpfs_color = tmpfs_color + self.other_cache_color = other_cache_color - with open("/proc/cpuinfo") as f: - for line in f: - if "model name" in line: - cpu_name = line.replace("\n", "").split(": ")[1] - break - header = [ - f"# sargraph version: {SARGRAPH_VERSION}", - f"pid: {os.getpid()}", - f"machine: {uname}", - f"cpu count: {cpus}", - f"cpu: {cpu_name}" - ] - try: - pgpu = subprocess.run( - 'nvidia-smi --query-gpu=name,driver_version,memory.total --format=csv,noheader,nounits'.split(' '), - capture_output=True - ) - if pgpu.returncode == 0: - gpuname, gpudriver, memory_total = pgpu.stdout.decode('utf-8').rsplit(', ', 2) - header.extend([ - f"gpu: {gpuname}", - f"gpu driver: {gpudriver}" - ]) - TOTAL_GPU_RAM = int(memory_total) - except Exception as e: - print(e) - pass + self.logger = logging.getLogger("sargraph") + self.logger.setLevel(logging.INFO) - logger.info(", ".join(header)) + file_handler = logging.FileHandler(f"{session}.txt") + file_handler.setFormatter(logging.Formatter("%(message)s")) + self.logger.addHandler(file_handler) -def initialize_darwin(session): - global TOTAL_RAM - global TOTAL_GPU_RAM + self.socket, self.socket_path = get_socket(self.session) - TOTAL_RAM = int(psutil.virtual_memory().total / 1024) + # Was a graph alreay produced by save command from sargraph? + self.dont_plot = False - cpus = psutil.cpu_count(logical=True) + # Should we die? + self.die = False - cpu_name = platform.processor() or "unknown" - - header = [ - f"# psutil version: {psutil.__version__}", - f"pid: {os.getpid()}", - f"machine: {platform.system()}", - f"cpu count: {cpus}", - f"cpu: {cpu_name}" - ] - logger.info(", ".join(header)) + if udp is not None: + spl = udp.rsplit(':', 1) + udp_handler = UDPHandler(spl[0], int(spl[1])) + if udp_cookie is None: + udp_handler.setFormatter(logging.Formatter("%(message)s\n")) + else: + udp_handler.setFormatter(logging.Formatter(f"[{udp_cookie}] %(message)s\n")) + self.logger.addHandler(udp_handler) + # Initialize 'data.txt' where the data is dumped + @abc.abstractmethod + def initialize(self, machine): + pass -# Add a summary comment to 'data.txt' -def summarize(session): - # Is there anything to be summarized? - if SAMPLE_NUMBER == 0: - return + # Implement the watcher method + @abc.abstractmethod + def watch(self): + pass - average_load = TOTAL_LOAD / float(SAMPLE_NUMBER) - max_used_ram = MAX_USED_RAM * 1024.0 - total_ram = TOTAL_RAM * 1024.0 - max_used_fs = MAX_USED_FS * 1024.0 * 1024.0 - total_fs = TOTAL_FS * 1024 * 1024 - max_tx = MAX_TX / 128 # kB/s to Mb/s - max_rx = MAX_RX / 128 # kB/s to Mb/s - total_tx = END_TX-START_TX - total_rx = END_RX-START_RX - - sdt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d %H:%M:%S') - edt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d %H:%M:%S') - delta_t = (edt - sdt).total_seconds() - - summary = [ - f"# total ram: {total_ram:.2f} B", - f"total disk space: {total_fs:.2f} B", - f"max ram used: {max_used_ram:.2f} B", - f"max disk used: {max_used_fs:.2f} B", - f"average load: {average_load:.2f} %", - f"observed disk: {FS_NAME}", - f"max received: {max_rx:.2f} Mb/s", - f"max sent: {max_tx:.2f} Mb/s", - f"observed network: {IFACE_NAME}", - f"duration: {delta_t} seconds", - f"total received: {total_rx} b", - f"total sent: {total_tx} b" - ] - - if TOTAL_GPU_RAM != 0: - summary.extend([ - f"total gpu ram: {TOTAL_GPU_RAM * 1024 * 1024:.2f} B", # default units are MiB - f"max gpu ram used: {MAX_USED_GPU_RAM * 1024 * 1024:.2f} B", # default units are MiB - f"average gpu load: {TOTAL_GPU_LOAD / SAMPLE_NUMBER:.2f} %" - ]) - - logger.info(", ".join([str(i) for i in summary])) - -def get_meminfo(scheduler): - global MAX_USED_RAM - scheduler.enter(0.1, 1, get_meminfo, (scheduler,)) - now = datetime.datetime.now() - date = now.strftime("%Y-%m-%d") - daytime = now.strftime("%H:%M:%S.%f") - ram_data = psutil.virtual_memory() - used = (ram_data.total - ram_data.free) - if used // 1024 > MAX_USED_RAM: - MAX_USED_RAM = used // 1024 - if is_darwin(): - line = [ - date + "-" + daytime, - 100 * ram_data.free / ram_data.total, - 0, - 100 * used / ram_data.total, - 0 - ] - else: - line = [ - date + "-" + daytime, - 100 * ram_data.free / ram_data.total, - 100 * ram_data.cached / ram_data.total, - 100 * ram_data.used / ram_data.total, - 100 * ram_data.shared / ram_data.total + def kill_handler(self, *_): + self.die = True + + def recv_data(self) -> str: + data = self.socket.recv(1 << 10) # 1024 bytes should be enough + return data.decode("utf-8").replace("\n", "").strip() + + # Add a summary comment to 'data.txt' + def summarize(self): + # Is there anything to be summarized? + if SAMPLE_NUMBER == 0: + return + + average_load = TOTAL_LOAD / float(SAMPLE_NUMBER) + max_used_ram = MAX_USED_RAM * 1024.0 + total_ram = TOTAL_RAM * 1024.0 + max_used_fs = MAX_USED_FS * 1024.0 * 1024.0 + total_fs = TOTAL_FS * 1024 * 1024 + max_tx = MAX_TX / 128 # kB/s to Mb/s + max_rx = MAX_RX / 128 # kB/s to Mb/s + total_tx = END_TX-START_TX + total_rx = END_RX-START_RX + + sdt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d %H:%M:%S') + edt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d %H:%M:%S') + delta_t = (edt - sdt).total_seconds() + + summary = [ + f"# total ram: {total_ram:.2f} B", + f"total disk space: {total_fs:.2f} B", + f"max ram used: {max_used_ram:.2f} B", + f"max disk used: {max_used_fs:.2f} B", + f"average load: {average_load:.2f} %", + f"observed disk: {FS_NAME}", + f"max received: {max_rx:.2f} Mb/s", + f"max sent: {max_tx:.2f} Mb/s", + f"observed network: {IFACE_NAME}", + f"duration: {delta_t} seconds", + f"total received: {total_rx} b", + f"total sent: {total_tx} b" ] - msg = " ".join(["psu"]+[str(i) for i in line]) - logger.info(msg) + if TOTAL_GPU_RAM != 0: + summary.extend([ + f"total gpu ram: {TOTAL_GPU_RAM * 1024 * 1024:.2f} B", # default units are MiB + f"max gpu ram used: {MAX_USED_GPU_RAM * 1024 * 1024:.2f} B", # default units are MiB + f"average gpu load: {TOTAL_GPU_LOAD / SAMPLE_NUMBER:.2f} %" + ]) -def watch(session, fsdev, iface, tmpfs_color, other_cache_color, use_psutil, udp=None, udp_cookie=None): - file_handler = logging.FileHandler(f"{session}.txt") - file_handler.setFormatter(logging.Formatter("%(message)s")) - logger.addHandler(file_handler) + self.logger.info(", ".join([str(i) for i in summary])) - if udp is not None: - spl = udp.rsplit(':', 1) - udp_handler = UDPHandler(spl[0], int(spl[1])) - if udp_cookie is None: - udp_handler.setFormatter(logging.Formatter("%(message)s\n")) - else: - udp_handler.setFormatter(logging.Formatter(f"[{udp_cookie}] %(message)s\n")) - logger.addHandler(udp_handler) - - if is_darwin() or use_psutil: - return watch_psutil(session, fsdev, iface, tmpfs_color, other_cache_color) - return watch_sar(session, fsdev, iface, tmpfs_color, other_cache_color) - -# Run sar and gather data from it -def watch_sar(session, fsdev, iface, tmpfs_color, other_cache_color): - global SAMPLE_NUMBER - global START_DATE - global END_DATE - global TOTAL_LOAD - global MAX_USED_RAM - global MAX_USED_FS - global MAX_RX - global MAX_TX - global TOTAL_FS - global START_RX - global START_TX - global END_RX - global END_TX - global TOTAL_RAM - global FS_SAR_INDEX - global FS_NAME - global IFACE_NAME - global IFACE_SAR_INDEX - global TOTAL_GPU_LOAD - global TOTAL_GPU_RAM - global MAX_USED_GPU_RAM - - global die - - # Was a graph alreay produced by save command from sargraph? - dont_plot = False - - my_env = os.environ - my_env["S_TIME_FORMAT"] = "ISO" - - psar = run_or_fail("sar", "-F", "-u", "-n", "DEV", "1", stdout=subprocess.PIPE, env=my_env) - - s = sched.scheduler(time.time, time.sleep) - mem_ev = s.enter(0, 1, get_meminfo, (s,)) - thread = Thread(target = s.run) - thread.start() - - # subprocess for GPU data fetching in the background - try: - pgpu = subprocess.Popen( - 'nvidia-smi --query-gpu=utilization.gpu,memory.used --format=csv,noheader,nounits -l 1'.split(' '), - stdout=subprocess.PIPE, - env=my_env - ) - except: - pgpu = None - - machine = psar.stdout.readline().decode() - initialize(session, machine) - psar.stdout.readline() - - signal.signal(signal.SIGTERM, kill_handler) - - # Make stdin nonblocking to continue working when no command is sent - flags = fcntl.fcntl(sys.stdin, fcntl.F_GETFL) - fcntl.fcntl(sys.stdin, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - # Gather data from sar output - curr_gpu_util = 0 - curr_gpu_mem = 0 - - while 1: - # Await sar output or a command sent from command handler in sargraph.py - readlist = [psar.stdout, sys.stdin] - if pgpu: - readlist.append(pgpu.stdout) - rlist, _, _ = select.select(readlist, [], [], 0.25) + def get_meminfo(self, scheduler): + global MAX_USED_RAM + scheduler.enter(0.1, 1, self.get_meminfo, (scheduler,)) now = datetime.datetime.now() - if sys.stdin in rlist: - if handle_command(session, s, dont_plot, tmpfs_color, other_cache_color, now): - break - if psar.stdout not in rlist: - continue - date = now.strftime("%Y-%m-%d") - daytime = now.strftime("%H:%M:%S") + daytime = now.strftime("%H:%M:%S.%f") + ram_data = psutil.virtual_memory() + used = (ram_data.total - ram_data.free) + if used // 1024 > MAX_USED_RAM: + MAX_USED_RAM = used // 1024 + if is_darwin(): + line = [ + date + "-" + daytime, + 100 * ram_data.free / ram_data.total, + 0, + 100 * used / ram_data.total, + 0 + ] + else: + line = [ + date + "-" + daytime, + 100 * ram_data.free / ram_data.total, + 100 * ram_data.cached / ram_data.total, + 100 * ram_data.used / ram_data.total, + 100 * ram_data.shared / ram_data.total + ] + msg = " ".join(["psu"]+[str(i) for i in line]) + self.logger.info(msg) + + def start(self): + self.socket.bind(self.socket_path) + signal.signal(signal.SIGTERM, self.kill_handler) - # Read and process CPU data try: - cpu_data = read_table(psar) - if START_DATE == "": - START_DATE = date + " " + daytime - TOTAL_LOAD += stof(cpu_data["%user"][0]) - SAMPLE_NUMBER += 1 - - if TOTAL_RAM == 0: - TOTAL_RAM = psutil.virtual_memory().total // 1024 - - # Read and process network data - net_data = read_table(psar) - if IFACE_SAR_INDEX is None: - if iface: - IFACE_SAR_INDEX = net_data['IFACE'].index(iface) - else: - maxj, maxv = 0, 0 - for j, used in enumerate(net_data['IFACE']): - v = stof(net_data['rxkB/s'][j]) - if maxv < v: - maxj, maxv = j, v - IFACE_SAR_INDEX = maxj - if IFACE_NAME is None: - IFACE_NAME = net_data['IFACE'][IFACE_SAR_INDEX] - if START_RX <= 0 or START_TX <= 0: - START_RX, START_TX = read_iface_stats(IFACE_NAME) - END_RX, END_TX = read_iface_stats(IFACE_NAME) - if MAX_RX < stof(net_data['rxkB/s'][IFACE_SAR_INDEX]): - MAX_RX = stof(net_data['rxkB/s'][IFACE_SAR_INDEX]) - if MAX_TX < stof(net_data['txkB/s'][IFACE_SAR_INDEX]): - MAX_TX = stof(net_data['txkB/s'][IFACE_SAR_INDEX]) - - # Read and process FS data - fs_data = read_table(psar) - if FS_SAR_INDEX is None: - if fsdev: - FS_SAR_INDEX = fs_data['FILESYSTEM'].index(fsdev) + self.watch() + except Exception as e: + # make sure we prepend '#' to every line, to make reading file work + self.logger.error("# Exception while watching!") + for line in traceback.format_exception(type(e), e, e.__traceback__): + self.logger.error(f"# {line}") + + try: # clean up after ourselves + os.unlink(self.socket_path) + except OSError: + pass + + return + + def handle_command(self, label_line: str, s: sched.scheduler, now: datetime.datetime): + if label_line.startswith("command:"): + label_line = label_line[len("command:"):] + if label_line.startswith("q:"): + label_line = label_line[len("q:"):] + + list(map(s.cancel, s.queue)) + self.summarize() + if label_line == "none": + pass + elif label_line: + graph.graph(self.session, self.tmpfs_color, self.other_cache_color, label_line) + elif not self.dont_plot: + graph.graph(self.session, self.tmpfs_color, self.other_cache_color) + self.dont_plot = True + self.die = 1 + return True + elif label_line.startswith("s:"): + label_line = label_line[len("s:"):] + + self.dont_plot = True + + if label_line != "none": + self.summarize() + if not label_line: + graph.graph(self.session, self.tmpfs_color, self.other_cache_color) else: - maxj, maxv = 0, 0 - for j, free in enumerate(fs_data['MBfsfree']): - v = stof(fs_data['MBfsfree'][j]) + stof(fs_data['MBfsused'][j]) - # Skip shared memory device - if fs_data["FILESYSTEM"][j] == "/dev/shm": - continue - if maxv < v: - maxj, maxv = j, v - FS_SAR_INDEX = maxj - if FS_NAME is None: - FS_NAME = fs_data["FILESYSTEM"][FS_SAR_INDEX] - if TOTAL_FS == 0: - TOTAL_FS = (stof(fs_data['MBfsused'][FS_SAR_INDEX]) + stof(fs_data['MBfsfree'][FS_SAR_INDEX])) - if MAX_USED_FS < int(fs_data['MBfsused'][FS_SAR_INDEX]): - MAX_USED_FS = int(fs_data['MBfsused'][FS_SAR_INDEX]) - - END_DATE = date + " " + daytime - timestamp = date + "-" + daytime - except ValueError as e: - print("Sar process has exited - quitting sargraph") - break + graph.graph(self.session, self.tmpfs_color, self.other_cache_color, label_line) + elif label_line.startswith('label:'): + label_line = label_line[len('label:'):] + with open(f"{self.session}.txt", "a") as f: + timestamp = now.strftime("%Y-%m-%d-%H:%M:%S") + print(f"# {timestamp} label: {label_line}", file=f) + return False + +class SarWatcher(Watcher): + + def initialize(self, machine): + global TOTAL_RAM + global TOTAL_GPU_RAM + + with open("/proc/meminfo") as f: + TOTAL_RAM = int(scan("MemTotal:\s+(\d+)", float, f.read())) + + uname = machine.split(" ")[0:2] + uname = f"{uname[0]} {uname[1]}" + + cpus = int(machine.split(" CPU)")[0].split("(")[-1]) + + cpu_name = "unknown" + + with open("/proc/cpuinfo") as f: + for line in f: + if "model name" in line: + cpu_name = line.replace("\n", "").split(": ")[1] + break + header = [ + f"# sargraph version: {SARGRAPH_VERSION}", + f"pid: {os.getpid()}", + f"machine: {uname}", + f"cpu count: {cpus}", + f"cpu: {cpu_name}" + ] + try: + pgpu = subprocess.run( + 'nvidia-smi --query-gpu=name,driver_version,memory.total --format=csv,noheader,nounits'.split(' '), + capture_output=True + ) + if pgpu.returncode == 0: + gpuname, gpudriver, memory_total = pgpu.stdout.decode('utf-8').rsplit(', ', 2) + header.extend([ + f"gpu: {gpuname}", + f"gpu driver: {gpudriver}" + ]) + TOTAL_GPU_RAM = int(memory_total) + except Exception as e: + print(e) + pass - if pgpu and pgpu.stdout in rlist: - line = pgpu.stdout.readline().decode('utf-8') - if pgpu.poll() is not None: - print("nvidia-smi stopped working, reason:") - print(line) - print(f"Error code: {pgpu.returncode}") - print("Closing the GPU statistics collection") - pgpu = None - else: - try: - curr_gpu_util, curr_gpu_mem = [ - int(val.strip()) for val in line.split(', ') - ] - if MAX_USED_GPU_RAM < curr_gpu_mem: - MAX_USED_GPU_RAM = curr_gpu_mem - TOTAL_GPU_LOAD += curr_gpu_util - except ValueError: - print(f"nvidia-smi error readout: {line}") - if "Unknown Error" in line: - # No valid readouts from now on, let's terminate current nvidia-smi session - pgpu.terminate() + self.logger.info(", ".join(header)) + + def watch(self): + global SAMPLE_NUMBER + global START_DATE + global END_DATE + global TOTAL_LOAD + global MAX_USED_RAM + global MAX_USED_FS + global MAX_RX + global MAX_TX + global TOTAL_FS + global START_RX + global START_TX + global END_RX + global END_TX + global TOTAL_RAM + global FS_SAR_INDEX + global FS_NAME + global IFACE_NAME + global IFACE_SAR_INDEX + global TOTAL_GPU_LOAD + global TOTAL_GPU_RAM + global MAX_USED_GPU_RAM + + my_env = os.environ + my_env["S_TIME_FORMAT"] = "ISO" + + psar = run_or_fail("sar", "-F", "-u", "-n", "DEV", "1", stdout=subprocess.PIPE, env=my_env) + + s = sched.scheduler(time.time, time.sleep) + mem_ev = s.enter(0, 1, self.get_meminfo, (s,)) + thread = Thread(target = s.run) + thread.start() + + # subprocess for GPU data fetching in the background + try: + pgpu = subprocess.Popen( + 'nvidia-smi --query-gpu=utilization.gpu,memory.used --format=csv,noheader,nounits -l 1'.split(' '), + stdout=subprocess.PIPE, + env=my_env + ) + except: + pgpu = None + + machine = psar.stdout.readline().decode() + self.initialize(machine) + psar.stdout.readline() + + # Gather data from sar output + curr_gpu_util = 0 + curr_gpu_mem = 0 + + socket_fd = self.socket.fileno() + + while 1: + # Await sar output or a command sent from command handler in sargraph.py + readlist = [psar.stdout, socket_fd] + if pgpu: + readlist.append(pgpu.stdout) + rlist, _, _ = select.select(readlist, [], [], 0.25) + now = datetime.datetime.now() + + if psar.stdout not in rlist: + continue + + if socket_fd in rlist: + data = self.recv_data() + now = datetime.datetime.now() + if self.handle_command(data, s, now): + break + + + date = now.strftime("%Y-%m-%d") + daytime = now.strftime("%H:%M:%S") + + # Read and process CPU data + try: + cpu_data = read_table(psar) + if START_DATE == "": + START_DATE = date + " " + daytime + TOTAL_LOAD += stof(cpu_data["%user"][0]) + SAMPLE_NUMBER += 1 + + if TOTAL_RAM == 0: + TOTAL_RAM = psutil.virtual_memory().total // 1024 + + # Read and process network data + net_data = read_table(psar) + if IFACE_SAR_INDEX is None: + if self.iface: + IFACE_SAR_INDEX = net_data['IFACE'].index(self.iface) + else: + maxj, maxv = 0, 0 + for j, used in enumerate(net_data['IFACE']): + v = stof(net_data['rxkB/s'][j]) + if maxv < v: + maxj, maxv = j, v + IFACE_SAR_INDEX = maxj + if IFACE_NAME is None: + IFACE_NAME = net_data['IFACE'][IFACE_SAR_INDEX] + if START_RX <= 0 or START_TX <= 0: + START_RX, START_TX = read_iface_stats(IFACE_NAME) + END_RX, END_TX = read_iface_stats(IFACE_NAME) + if MAX_RX < stof(net_data['rxkB/s'][IFACE_SAR_INDEX]): + MAX_RX = stof(net_data['rxkB/s'][IFACE_SAR_INDEX]) + if MAX_TX < stof(net_data['txkB/s'][IFACE_SAR_INDEX]): + MAX_TX = stof(net_data['txkB/s'][IFACE_SAR_INDEX]) + + # Read and process FS data + fs_data = read_table(psar) + if FS_SAR_INDEX is None: + if self.fsdev: + FS_SAR_INDEX = fs_data['FILESYSTEM'].index(self.fsdev) + else: + maxj, maxv = 0, 0 + for j, free in enumerate(fs_data['MBfsfree']): + v = stof(fs_data['MBfsfree'][j]) + stof(fs_data['MBfsused'][j]) + # Skip shared memory device + if fs_data["FILESYSTEM"][j] == "/dev/shm": + continue + if maxv < v: + maxj, maxv = j, v + FS_SAR_INDEX = maxj + if FS_NAME is None: + FS_NAME = fs_data["FILESYSTEM"][FS_SAR_INDEX] + if TOTAL_FS == 0: + TOTAL_FS = (stof(fs_data['MBfsused'][FS_SAR_INDEX]) + stof(fs_data['MBfsfree'][FS_SAR_INDEX])) + if MAX_USED_FS < int(fs_data['MBfsused'][FS_SAR_INDEX]): + MAX_USED_FS = int(fs_data['MBfsused'][FS_SAR_INDEX]) + + END_DATE = date + " " + daytime + timestamp = date + "-" + daytime + except ValueError as e: + print("Sar process has exited - quitting sargraph") + break + + if pgpu and pgpu.stdout in rlist: + line = pgpu.stdout.readline().decode('utf-8') + if pgpu.poll() is not None: + print("nvidia-smi stopped working, reason:") + print(line) + print(f"Error code: {pgpu.returncode}") + print("Closing the GPU statistics collection") pgpu = None + else: + try: + curr_gpu_util, curr_gpu_mem = [ + int(val.strip()) for val in line.split(', ') + ] + if MAX_USED_GPU_RAM < curr_gpu_mem: + MAX_USED_GPU_RAM = curr_gpu_mem + TOTAL_GPU_LOAD += curr_gpu_util + except ValueError: + print(f"nvidia-smi error readout: {line}") + if "Unknown Error" in line: + # No valid readouts from now on, let's terminate current nvidia-smi session + pgpu.terminate() + pgpu = None + + line = [ + timestamp, + cpu_data['%user'][0], + fs_data['%fsused'][FS_SAR_INDEX], + stof(net_data['rxkB/s'][IFACE_SAR_INDEX])/128, # kB/s to Mb/s + stof(net_data['txkB/s'][IFACE_SAR_INDEX])/128, # kB/s to Mb/s + ] + if pgpu and TOTAL_GPU_RAM != 0: + line.extend([ + f'{curr_gpu_util:.2f}', + f'{curr_gpu_mem / TOTAL_GPU_RAM * 100.0:.2f}' + ]) + self.logger.info(" ".join(["sar"]+[str(i) for i in line])) + + if self.die: + break + + list(map(s.cancel, s.queue)) + thread.join() + + # This runs if we were stopped by SIGTERM and no plot was made so far + if not self.dont_plot: + self.summarize() + graph.graph(self.session, self.tmpfs_color, self.other_cache_color) + +class PsUtilWatcher(Watcher): + + def initialize(self, _ = None): + global TOTAL_RAM + global TOTAL_GPU_RAM + + TOTAL_RAM = int(psutil.virtual_memory().total / 1024) + + cpus = psutil.cpu_count(logical=True) + + cpu_name = platform.processor() or "unknown" + + header = [ + f"# psutil version: {psutil.__version__}", + f"pid: {os.getpid()}", + f"machine: {platform.system()}", + f"cpu count: {cpus}", + f"cpu: {cpu_name}" + ] + self.logger.info(", ".join(header)) + + # sar is not available on macOS. This function creates the sar behavior, but use psutil instead. + def psutil_sar_simulation(self, scheduler: sched.scheduler): + global START_DATE + global TOTAL_LOAD + global SAMPLE_NUMBER + global TOTAL_RAM + global START_RX + global START_TX + global END_TX + global END_RX + global MAX_RX + global MAX_TX + global IFACE_NAME + global TOTAL_FS + global MAX_USED_FS + global FS_NAME + global END_DATE + + scheduler.enter(1, 1, self.psutil_sar_simulation, (scheduler,)) + now = datetime.datetime.now() + date = now.strftime("%Y-%m-%d") + daytime = now.strftime("%H:%M:%S") + if START_DATE == "": + START_DATE = date + " " + daytime + cpu_used = psutil.cpu_percent() + TOTAL_LOAD += cpu_used + SAMPLE_NUMBER += 1 + if TOTAL_RAM == 0: + TOTAL_RAM = psutil.virtual_memory().total // 1024 + IFACE_NAME = "all" + net_stats = psutil.net_io_counters() + if START_RX <= 0 or START_TX <= 0: + START_RX, START_TX = net_stats.bytes_recv, net_stats.bytes_sent + END_RX, END_TX = net_stats.bytes_recv, net_stats.bytes_sent + curr_rx, curr_tx = (net_stats.bytes_recv - END_RX) / (1024 * 8), (net_stats.bytes_sent - END_TX) / (1024 * 8) + END_RX, END_TX = net_stats.bytes_recv, net_stats.bytes_sent + if MAX_RX < curr_rx: + MAX_RX = curr_rx + if MAX_TX < curr_tx: + MAX_TX = curr_tx + # apfs implements lvm, so it's a better option for visualizing the place in the container (which is shared by all partitions). + if is_darwin(): + FS_NAME = "apfs container" + disk_stats = psutil.disk_usage('/') + else: + largest_partition = max( + psutil.disk_partitions(all=False), + key=lambda p: psutil.disk_usage(p.mountpoint).total + ) + disk_stats = psutil.disk_usage(largest_partition.mountpoint) + FS_NAME = largest_partition.device + + curr_used = (disk_stats.total - disk_stats.free) / (1024 * 1024) + if TOTAL_FS == 0: + TOTAL_FS = disk_stats.total / (1024 * 1024) + if MAX_USED_FS < curr_used: + MAX_USED_FS = curr_used + END_DATE = date + " " + daytime + timestamp = date + "-" + daytime line = [ timestamp, - cpu_data['%user'][0], - fs_data['%fsused'][FS_SAR_INDEX], - stof(net_data['rxkB/s'][IFACE_SAR_INDEX])/128, # kB/s to Mb/s - stof(net_data['txkB/s'][IFACE_SAR_INDEX])/128, # kB/s to Mb/s + cpu_used, + ((disk_stats.total - disk_stats.free) / disk_stats.total) * 100, + curr_rx / 128, + curr_tx / 128, ] - if pgpu and TOTAL_GPU_RAM != 0: - line.extend([ - f'{curr_gpu_util:.2f}', - f'{curr_gpu_mem / TOTAL_GPU_RAM * 100.0:.2f}' - ]) - logger.info(" ".join(["sar"]+[str(i) for i in line])) - if die: - break - - list(map(s.cancel, s.queue)) - thread.join() + self.logger.info(" ".join(["sar"]+[str(i) for i in line])) - # This runs if we were stopped by SIGTERM and no plot was made so far - if not dont_plot: - summarize(session) - graph.graph(session, tmpfs_color, other_cache_color) + def watch(self): + # Was a graph already produced by save command from sargraph? + dont_plot = False -def watch_psutil(session, fsdev, iface, tmpfs_color, other_cache_color): - # Was a graph already produced by save command from sargraph? - dont_plot = False + s = sched.scheduler(time.time, time.sleep) + sar_ev = s.enter(0, 1, self.psutil_sar_simulation, (s,)) + mem_ev = s.enter(0, 1, self.get_meminfo, (s,)) + thread = Thread(target = s.run) + thread.start() - s = sched.scheduler(time.time, time.sleep) - sar_ev = s.enter(0, 1, psutil_sar_simulation, (s,)) - mem_ev = s.enter(0, 1, get_meminfo, (s,)) - thread = Thread(target = s.run) - thread.start() + self.initialize() + socket_fd = self.socket.fileno() + while 1: + # Await sar output or a command sent from command handler in sargraph.py + readlist = [socket_fd] + rlist, _, _ = select.select(readlist, [], [], 0.25) - initialize_darwin(session) - signal.signal(signal.SIGTERM, kill_handler) + if socket_fd in rlist: + data = self.recv_data() + now = datetime.datetime.now() + if self.handle_command(data, s, now): + break - # Make stdin nonblocking to continue working when no command is sent - flags = fcntl.fcntl(sys.stdin, fcntl.F_GETFL) - fcntl.fcntl(sys.stdin, fcntl.F_SETFL, flags | os.O_NONBLOCK) + list(map(s.cancel, s.queue)) + thread.join() - - while 1: - # Await sar output or a command sent from command handler in sargraph.py - readlist = [sys.stdin] - rlist, _, _ = select.select(readlist, [], [], 0.25) - now = datetime.datetime.now() - if handle_command(session, s, dont_plot, tmpfs_color, other_cache_color, now): - break - list(map(s.cancel, s.queue)) - thread.join() - - # This runs if we were stopped by SIGTERM and no plot was made so far - if not dont_plot: - summarize(session) - graph.graph(session, tmpfs_color, other_cache_color) - -def handle_command(session, s, dont_plot, tmpfs_color, other_cache_color, now): - global die - label_line = sys.stdin.readline().replace("\n", "") - if label_line.startswith("command:"): - label_line = label_line[len("command:"):] - if label_line.startswith("q:"): - label_line = label_line[len("q:"):] - - list(map(s.cancel, s.queue)) - summarize(session) - if label_line == "none": - pass - elif label_line: - graph.graph(session, tmpfs_color, other_cache_color, label_line) - elif not dont_plot: - graph.graph(session, tmpfs_color, other_cache_color) - dont_plot = True - die = 1 - return True - elif label_line.startswith("s:"): - label_line = label_line[len("s:"):] - - dont_plot = True - - if label_line != "none": - summarize(session) - if not label_line: - graph.graph(session, tmpfs_color, other_cache_color) - else: - graph.graph(session, tmpfs_color, other_cache_color, label_line) - elif label_line.startswith('label:'): - label_line = label_line[len('label:'):] - with open(f"{session}.txt", "a") as f: - timestamp = now.strftime("%Y-%m-%d-%H:%M:%S") - print(f"# {timestamp} label: {label_line}", file=f) - return False - -# sar is not available on macOS. This function creates the sar behavior, but use psutil instead. -def psutil_sar_simulation(scheduler): - global START_DATE - global TOTAL_LOAD - global SAMPLE_NUMBER - global TOTAL_RAM - global START_RX - global START_TX - global END_TX - global END_RX - global MAX_RX - global MAX_TX - global IFACE_NAME - global TOTAL_FS - global MAX_USED_FS - global FS_NAME - global END_DATE - - scheduler.enter(1, 1, psutil_sar_simulation, (scheduler,)) - now = datetime.datetime.now() - date = now.strftime("%Y-%m-%d") - daytime = now.strftime("%H:%M:%S") - if START_DATE == "": - START_DATE = date + " " + daytime - cpu_used = psutil.cpu_percent() - TOTAL_LOAD += cpu_used - SAMPLE_NUMBER += 1 - if TOTAL_RAM == 0: - TOTAL_RAM = psutil.virtual_memory().total // 1024 - IFACE_NAME = "all" - net_stats = psutil.net_io_counters() - if START_RX <= 0 or START_TX <= 0: - START_RX, START_TX = net_stats.bytes_recv, net_stats.bytes_sent - END_RX, END_TX = net_stats.bytes_recv, net_stats.bytes_sent - curr_rx, curr_tx = (net_stats.bytes_recv - END_RX) / (1024 * 8), (net_stats.bytes_sent - END_TX) / (1024 * 8) - END_RX, END_TX = net_stats.bytes_recv, net_stats.bytes_sent - if MAX_RX < curr_rx: - MAX_RX = curr_rx - if MAX_TX < curr_tx: - MAX_TX = curr_tx - # apfs implements lvm, so it's a better option for visualizing the place in the container (which is shared by all partitions). - if is_darwin(): - FS_NAME = "apfs container" - disk_stats = psutil.disk_usage('/') - else: - largest_partition = max( - psutil.disk_partitions(all=False), - key=lambda p: psutil.disk_usage(p.mountpoint).total - ) - disk_stats = psutil.disk_usage(largest_partition.mountpoint) - FS_NAME = largest_partition.device - - curr_used = (disk_stats.total - disk_stats.free) / (1024 * 1024) - if TOTAL_FS == 0: - TOTAL_FS = disk_stats.total / (1024 * 1024) - if MAX_USED_FS < curr_used: - MAX_USED_FS = curr_used - END_DATE = date + " " + daytime - timestamp = date + "-" + daytime - - line = [ - timestamp, - cpu_used, - ((disk_stats.total - disk_stats.free) / disk_stats.total) * 100, - curr_rx / 128, - curr_tx / 128, - ] - - logger.info(" ".join(["sar"]+[str(i) for i in line])) + # This runs if we were stopped by SIGTERM and no plot was made so far + if not dont_plot: + self.summarize() + graph.graph(self.session, self.tmpfs_color, self.other_cache_color) \ No newline at end of file