from datetime import datetime, timezone
from typing import List, Optional
import configparser
import logging
import logging.handlers
import os
import subprocess
import sys
import threading
import time
UNKNOWN_NODE_NAME = "unknown"
IDLE_EXIT_SECONDS = 30
g_name_space = ""
g_target_log = "./log/"
g_max_log_size = 10000000
g_backup_count = 10
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))
logger_monitor = logging.getLogger("logger_monitor")
logger_monitor.setLevel(logging.INFO)
logger_monitor.handlers.clear()
logger_monitor.addHandler(console_handler)
def log_i(msg: str) -> None:
logger_monitor.info(msg)
def log_w(msg: str) -> None:
logger_monitor.warning(msg)
def log_e(msg: str) -> None:
logger_monitor.error(msg)
class LogMonitor:
def __init__(self):
self.encode_type = "utf-8"
self.cmd_kubectl = "/usr/bin/kubectl"
self.cmd_awk = "/usr/bin/awk"
self.thread_name = "thread-log-"
self.threads = []
self.exit_flag = threading.Event()
self._idle = False
self._logged_save_paths = set()
self._pod_log_next_slot: dict[str, int] = {}
self._pod_log_collector_generation: dict[str, int] = {}
@staticmethod
def _allocate_unique_log_path(
pod_name: str, node_name: str, start_index: int
) -> tuple[str, int]:
"""Return path and index for the first unused ``{pod}_{node}_{n}.log`` under ``g_target_log``."""
candidate_log_index = start_index
while True:
file_path = os.path.join(
g_target_log, f"{pod_name}_{node_name}_{candidate_log_index}.log"
)
abs_path = os.path.abspath(os.path.normpath(file_path))
if not os.path.exists(abs_path):
return file_path, candidate_log_index
candidate_log_index += 1
def setup_rotating_logger(self, pod_name: str, log_file: str) -> Optional[logging.Logger]:
"""
Configure a logger with a rotation function
Args:
log_file: Path to the log file
Returns:
The configured logger object or None (if creation fails)
"""
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
try:
os.makedirs(log_dir, exist_ok=True)
except OSError as e:
log_e(f"Unable to create log directory {log_dir}: {e}")
return None
logger = logging.getLogger(pod_name)
logger.setLevel(logging.INFO)
logger.handlers.clear()
handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=g_max_log_size,
backupCount=g_backup_count,
encoding=self.encode_type
)
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def shell_get_pod_node(self, pod_name: str) -> str:
"""
Get the K8s node name where the specified pod is running.
:param pod_name: Name of the pod
:return: Node name, or "unknown" if it cannot be determined
"""
try:
kubectl_cmd = subprocess.Popen(
[
self.cmd_kubectl,
'get', 'pod', pod_name,
'-n', g_name_space,
'-o', 'jsonpath={.spec.nodeName}'
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
output, _ = kubectl_cmd.communicate()
if kubectl_cmd.returncode != 0:
return UNKNOWN_NODE_NAME
node_name = output.decode(self.encode_type).strip()
return node_name if node_name else UNKNOWN_NODE_NAME
except Exception as e:
log_e(f"shell_get_pod_node Exception for {pod_name}: {e}")
return UNKNOWN_NODE_NAME
def check_pod_is_running(self, pod_name: str) -> Optional[bool]:
"""
Probe pod phase via kubectl.
:return: ``True`` if phase is Running; ``False`` if get succeeded but not Running;
``None`` if ``kubectl get pod`` failed (caller should end this collector thread).
"""
kubectl_cmd = subprocess.Popen(
[
self.cmd_kubectl,
'get', 'pod', pod_name,
'-n', g_name_space,
'-o', 'jsonpath={.status.phase}'
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
output, err = kubectl_cmd.communicate()
if kubectl_cmd.returncode != 0:
err_msg = err.decode(self.encode_type, errors="ignore").strip()
log_w(
f"{pod_name}: kubectl get pod phase failed "
f"(exit={kubectl_cmd.returncode}): {err_msg}"
)
return None
status = output.decode(self.encode_type).strip()
return status == "Running"
def shell_get_pod(self) -> Optional[List[str]]:
"""
Run the kubectl command to obtain the pod list.
Returns:
A list of pod names or None (if an error occurs)
"""
try:
kubectl_cmd = subprocess.Popen(
[self.cmd_kubectl, 'get', 'pods', '-n', g_name_space, '-o', 'wide'],
stdout=subprocess.PIPE
)
awk_cmd = subprocess.Popen(
[self.cmd_awk, 'NR>1 {print $1}'],
stdin=kubectl_cmd.stdout,
stdout=subprocess.PIPE
)
output, _ = awk_cmd.communicate()
return output.decode(self.encode_type).strip().splitlines()
except Exception as e:
log_e(f"shell_get_pod Exception: {e}")
return None
def shell_pull_log(self, pod_name: str, file_path: str, interval: float = 0.2) -> bool:
"""
Execute the kubectl command to obtain the log.
"""
b_write_flag = False
abs_path = os.path.abspath(os.path.normpath(file_path))
logger = self.setup_rotating_logger(pod_name, abs_path)
if logger is None:
return b_write_flag
process = None
try:
process = subprocess.Popen(
[self.cmd_kubectl, 'logs', '-f', '-n', g_name_space, pod_name],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True
)
size = g_max_log_size / 1024 / 1024
if abs_path not in self._logged_save_paths:
self._logged_save_paths.add(abs_path)
log_i(
f"{pod_name}: logs save to: {abs_path} "
f"(max {size:.1f}MB, keep {g_backup_count} backups)"
)
while not self.exit_flag.is_set():
line = process.stdout.readline()
if not line:
if process.poll() is not None:
log_i(f"{pod_name} : cmd has exited.")
break
time.sleep(interval)
continue
b_write_flag = True
log_line = line.rstrip('\n')
logger.info(log_line)
except Exception as e:
log_e(f"{pod_name} :Exception: {e}")
finally:
if process and process.poll() is None:
process.terminate()
log_i(f"{pod_name} :The thread has exited.")
return b_write_flag
def pull_log_and_save(self, pod_name: str, interval: float = 3) -> None:
"""
Collect logs from a specified pod and save them to a file.
:param pod_name: Name of the pod to collect logs from
"""
generation_floor = self._pod_log_collector_generation.get(pod_name, 0)
next_slot = self._pod_log_next_slot.get(pod_name, 0)
index = max(generation_floor, next_slot)
node_name = self.shell_get_pod_node(pod_name)
try:
while not self.exit_flag.is_set():
pod_running_state = self.check_pod_is_running(pod_name)
if pod_running_state is None:
log_w(f"{pod_name}: kubectl get pod failed, exiting collector thread.")
return
if not pod_running_state:
log_w(f"{pod_name} :The pod is not in the 'Running' state, waiting...")
time.sleep(interval)
continue
fetched_node_name = self.shell_get_pod_node(pod_name)
if fetched_node_name != node_name:
log_i(
f"{pod_name}: node_name refresh {node_name!r} -> {fetched_node_name!r}"
)
node_name = fetched_node_name
file_path, allocated_log_index = self._allocate_unique_log_path(
pod_name, node_name, index
)
if allocated_log_index != index:
log_i(
f"{pod_name}: log file slot bumped {index} -> {allocated_log_index} "
"(target path already exists)."
)
if self.shell_pull_log(pod_name, file_path):
next_after = allocated_log_index + 1
self._pod_log_next_slot[pod_name] = max(
self._pod_log_next_slot.get(pod_name, 0),
next_after,
)
index = self._pod_log_next_slot[pod_name]
log_i(
f"{pod_name}: Log stream ended; pausing {interval}s before "
"re-checking pod and reopening logs if still Running."
)
time.sleep(interval)
else:
try:
os.remove(file_path)
except OSError:
pass
log_w(
f"{pod_name}: Failed to pull logs; "
f"pausing {interval}s before retry."
)
time.sleep(interval)
except Exception as e:
log_e(f"{pod_name} :Exception: {e}")
finally:
self._pod_log_collector_generation[pod_name] = (
self._pod_log_collector_generation.get(pod_name, 0) + 1
)
def monitor_stop(self, file_path: str, interval: float = 1) -> None:
"""
Periodically check whether the specified file exists in the directory; if it exists, exit the program.
:param file_path: Path of the file to be searched
:param interval: Check interval (in seconds)
"""
while True:
if os.path.exists(file_path):
log_i(f"The file {file_path} exists, so the program will exit.")
self.exit_flag.set()
break
flag = False
for thread in self.threads:
if thread.is_alive():
flag = True
break
if not flag:
log_e("All thread have terminated abnormally, so the program will exit.")
self.exit_flag.set()
break
time.sleep(interval)
def start_log_thread(self) -> bool:
list_line = self.shell_get_pod()
if list_line is None:
log_e("Exiting: failed to get pod list from kubectl.")
return False
if not list_line:
self._idle = True
return True
self._idle = False
dead_log_thread_names = [
t.name
for t in self.threads
if (not t.is_alive()) and t.name.startswith(self.thread_name)
]
if dead_log_thread_names:
log_i(
f"Pruned {len(dead_log_thread_names)} finished log collector thread(s): "
f"{dead_log_thread_names}."
)
self.threads = [t for t in self.threads if t.is_alive()]
thread_names = [
thread.name
for thread in self.threads
if thread.name.startswith(self.thread_name)
]
list_line = [
pod_name
for pod_name in list_line
if f"{self.thread_name}{pod_name}" not in thread_names
]
if len(list_line) == 0:
return True
for pod_name in list_line:
log_i(f"pod_name: {pod_name}")
thread = threading.Thread(
target=self.pull_log_and_save,
args=(pod_name,),
name=f"{self.thread_name}{pod_name}",
daemon=True
)
self.threads.append(thread)
thread.start()
log_i(f" {len(list_line)} threads have been started.")
return True
def do(self, interval: float = 5):
if not self.start_log_thread():
return
stop_file = f"stop_log_{g_name_space}"
threading.Thread(
target=self.monitor_stop,
args=(stop_file,),
name="thread-monitor",
daemon=True
).start()
idle_since = time.monotonic() if self._idle else None
while not self.exit_flag.is_set():
if not self.start_log_thread():
break
if self._idle:
if idle_since is None:
idle_since = time.monotonic()
log_w(
f"No pods in namespace '{g_name_space}'. "
f"Will exit after {IDLE_EXIT_SECONDS}s idle."
)
elif time.monotonic() - idle_since >= IDLE_EXIT_SECONDS:
log_i(
f"No pods in namespace for {IDLE_EXIT_SECONDS}s, "
"exiting gracefully."
)
self.exit_flag.set()
break
else:
idle_since = None
time.sleep(interval)
for thread in self.threads:
thread.join()
log_i("All tasks have been terminated")
def read_config(config_file: str) -> None:
"""
Read and apply the contents of the configuration file.
:param config_file: Path to the configuration file
"""
global g_name_space, g_target_log, g_max_log_size, g_backup_count
script_dir = os.path.dirname(os.path.abspath(__file__))
cfg_path = os.path.normpath(
config_file if os.path.isabs(config_file)
else os.path.join(script_dir, config_file)
)
config = configparser.ConfigParser()
if not config.read(cfg_path):
log_e(f"The configuration file {cfg_path} does not exist or cannot be read.")
sys.exit(1)
log_section = "LogSetting"
for sec_name in config.sections():
log_i(f"[{sec_name}]")
for key, value in config[sec_name].items():
log_i(f"{key} = {value}")
if log_section not in config:
log_e(
f"The [{log_section}] section is missing in {cfg_path}; "
"add it and required keys (see the template in the same directory)."
)
sys.exit(1)
try:
g_max_log_size = config[log_section].getint("max_log_size", g_max_log_size)
g_backup_count = config[log_section].getint("backup_count", g_backup_count)
except ValueError as e:
log_e(f"Invalid integer in [{log_section}] (max_log_size or backup_count): {e}")
sys.exit(1)
g_name_space = (config[log_section].get("name_space", g_name_space) or "").strip()
out_raw = (
(config[log_section].get("out_path", g_target_log) or "").strip() or g_target_log
)
if os.path.isabs(out_raw):
out_base = os.path.normpath(out_raw)
else:
out_base = os.path.normpath(os.path.join(script_dir, out_raw))
if g_max_log_size <= 0:
log_e(f"max_log_size must be positive, got {g_max_log_size}.")
sys.exit(1)
if g_backup_count < 0:
log_e(f"backup_count must be non-negative, got {g_backup_count}.")
sys.exit(1)
session_dir = os.path.join(
out_base, datetime.now(timezone.utc).astimezone().strftime('%Y%m%d_%H%M%S')
)
g_target_log = os.path.abspath(session_dir)
log_i(
f"Read configuration: [{log_section}] succeeded. "
f"Pod logs directory (absolute): {g_target_log}"
)
if __name__ == "__main__":
read_config("log_config.ini")
log_i(
f"Use the command [touch {os.getcwd()}/stop_log_{g_name_space}] "
"to stop the background process."
)
LogMonitor().do()