import sys
import os
import time
import platform
from datetime import datetime
import re
from pathlib import Path
import subprocess
from collections import defaultdict
from resources.global_var import ROOT_CONFIG_FILE
from log_util import LogUtil
from io_util import IoUtil
GB_CONSTANT = 1024 ** 3
MEM_CONSTANT = 1024
RET_CONSTANT = 0
MONITOR_TIME_CONSTANT = 30
SNOP_TIME_CONSTANT = 5
memory_info = [RET_CONSTANT] * 3
target_keys = ['MemTotal', 'SwapTotal', 'MemFree']
key_indices = {key: index for index, key in enumerate(target_keys)}
class Monitor():
def __init__(self):
self.now_times = []
self.usr_cpus = []
self.sys_cpus = []
self.idle_cpus = []
self.total_mems = []
self.swap_mems = []
self.free_mems = []
self.log_path = ""
def collect_cpu_info(self):
if platform.system() != "Linux":
return RET_CONSTANT, RET_CONSTANT, RET_CONSTANT
try:
top_command = ["top", "-bn1"]
grep_command = ["grep", "%Cpu(s)"]
top_output = subprocess.check_output(top_command, universal_newlines=True)
result = subprocess.check_output(grep_command, input=top_output, universal_newlines=True).strip()
if result:
parts = result.split()
if len(parts) >= 5:
usr_cpu_str = parts[1]
sys_cpu_str = parts[3]
idle_cpu_str = parts[7]
usr_cpu_str = usr_cpu_str.split(',')[0].rstrip('%')
sys_cpu_str = sys_cpu_str.split(',')[0].rstrip('%')
idle_cpu_str = idle_cpu_str.split(',')[0].rstrip('%')
usr_cpu = float(usr_cpu_str) if usr_cpu_str.replace('.', '', 1).isdigit() else RET_CONSTANT
sys_cpu = float(sys_cpu_str) if sys_cpu_str.replace('.', '', 1).isdigit() else RET_CONSTANT
idle_cpu = float(idle_cpu_str) if idle_cpu_str.replace('.', '', 1).isdigit() else RET_CONSTANT
self.usr_cpu = usr_cpu
self.sys_cpu = sys_cpu
self.idle_cpu = idle_cpu
return self.usr_cpu, self.sys_cpu, self.idle_cpu
else:
return RET_CONSTANT, RET_CONSTANT, RET_CONSTANT
else:
return RET_CONSTANT, RET_CONSTANT, RET_CONSTANT
except subprocess.CalledProcessError:
return RET_CONSTANT, RET_CONSTANT, RET_CONSTANT
def extract_memory_value(self, line: str):
match = re.search(r'\d+', line)
return int(match.group()) * MEM_CONSTANT if match else RET_CONSTANT
def get_ret_num(self, line: str):
key = line.split(':')[0]
if key in target_keys:
value = self.extract_memory_value(line)
memory_info[key_indices[key]] = value
def get_linux_mem_info(self):
try:
with open('/proc/meminfo', 'r') as f:
for line in f:
self.get_ret_num(line)
return memory_info[0], memory_info[1], memory_info[2]
except FileNotFoundError:
return RET_CONSTANT, RET_CONSTANT, RET_CONSTANT
except Exception as e:
print(f"Error occurred while getting memory info: {e}")
return RET_CONSTANT, RET_CONSTANT, RET_CONSTANT
def collect_linux_mem_info(self):
total_memory, swap_memory, free_memory = self.get_linux_mem_info()
total_mem = round(total_memory / GB_CONSTANT, 1)
free_mem = round(free_memory / GB_CONSTANT, 1)
swap_mem = round(swap_memory / GB_CONSTANT, 1)
return total_mem, free_mem, swap_mem
def get_current_time(self):
now_time = datetime.now().strftime("%H:%M:%S")
self.now_times.append(now_time)
def get_current_cpu(self):
usr_cpu, sys_cpu, idle_cpu = self.collect_cpu_info()
self.usr_cpus.append(usr_cpu)
self.sys_cpus.append(sys_cpu)
self.idle_cpus.append(idle_cpu)
LogUtil.write_log(self.log_path, f"User Cpu%: {usr_cpu}%", "info")
LogUtil.write_log(self.log_path, f"System Cpu%: {sys_cpu}%", "info")
LogUtil.write_log(self.log_path, f"Idle CPU%: {idle_cpu}%", "info")
def get_current_memory(self):
total_mem, free_mem, swap_mem = self.collect_linux_mem_info()
self.total_mems.append(total_mem)
self.free_mems.append(free_mem)
self.swap_mems.append(swap_mem)
LogUtil.write_log(self.log_path, f"Total Memory: {total_mem}GB", "info")
LogUtil.write_log(self.log_path, f"Free Memory: {free_mem}GB", "info")
LogUtil.write_log(self.log_path, f"Swap Memory: {swap_mem}GB", "info")
def get_log_path(self):
count = 0
config_content = IoUtil.read_json_file(ROOT_CONFIG_FILE)
while not os.path.exists(ROOT_CONFIG_FILE) or not config_content.get('out_path', None) and count <= 60:
time.sleep(SNAP_TIME_CONSTANT)
count += 1
return config_content.get('out_path', None)
def get_disk_usage(self):
result = subprocess.run(['df', '-h'], stdout=subprocess.PIPE, text=True)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')[1:]
for line in lines:
columns = line.split()
if len(columns) > 5:
filesystem, size, used, available, percent, mountpoint = columns[:6]
LogUtil.write_log(self.log_path,
f"Filesystem: {filesystem}, "
f"Size: {size}, "
f"Used: {used}, "
f"Available: {available}, "
f"Use%: {percent}, "
f"Mounted on: {mountpoint}",
"info")
else:
LogUtil.write_log(self.log_path, f"Error running df command:{result.stderr}", "info")
def run(self):
if platform.system() != "Linux":
return
out_path = self.get_log_path()
self.log_path = os.path.join(out_path, "build.log")
self.get_current_time()
self.get_current_cpu()
self.get_current_memory()
self.get_disk_usage()