"""oGRAC deployment orchestrator."""
import getpass
import grp
import os
import pwd
import shlex
import shutil
import sys
import subprocess
import tempfile
import time
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, CUR_DIR)
from nofile_utils import update_limits_conf_user_nofile
from config import get_config, cfg
from log_config import get_logger
from utils import (
exec_popen, run_cmd, run_as_user, run_python_as_user,
ensure_dir, safe_remove, copy_tree, chown_recursive,
read_version, CommandError,
)
from config_validation_runner import validate_config_params_or_raise
LOG = get_logger("deploy")
# Per-action module sequences. Each list names module directories, in the
# order their appctl.sh is invoked for the corresponding deployment action
# (the OgracDeploy methods iterate these lists and stop at the first failure).
# NOTE(review): the *_UPGRADE_ORDER and ROLLBACK_ORDER lists are not referenced
# in the visible part of this file - presumably consumed elsewhere; confirm.
PRE_INSTALL_ORDER = ["ograc", "cms", "dss"]
INSTALL_ORDER = ["cms", "dss", "ograc", "og_om", "ograc_exporter"]
START_ORDER = ["cms", "dss", "ograc", "og_om", "ograc_exporter"]
STOP_ORDER = ["cms", "dss", "ograc", "og_om", "ograc_exporter"]
UNINSTALL_ORDER = ["og_om", "ograc_exporter", "ograc", "dss", "cms"]
BACKUP_ORDER = ["ograc", "dss", "cms", "og_om"]
RESTORE_ORDER = ["og_om", "cms", "dss", "ograc"]
CHECK_STATUS_ORDER = ["ograc", "cms", "dss", "og_om", "ograc_exporter"]
PRE_UPGRADE_ORDER = ["og_om", "ograc_exporter", "cms", "ograc"]
UPGRADE_ORDER = ["og_om", "ograc_exporter", "cms", "ograc"]
POST_UPGRADE_ORDER = ["og_om", "ograc_exporter", "cms", "ograc"]
ROLLBACK_ORDER = ["cms", "ograc", "og_om", "ograc_exporter"]
INIT_CONTAINER_ORDER = ["cms", "ograc"]
class OgracDeploy:
"""oGRAC deployment orchestrator."""
def __init__(self):
    """Load the deployment configuration and cache its main sections.

    Also exports ``DEPLOY_PKG_DIR`` (the parent of this file's directory)
    into the process environment so that spawned commands inherit it.
    """
    config = get_config()
    self.cfg = config
    self.paths, self.deploy = config.paths, config.deploy
    self._sync_deploy_attrs()
    package_dir = os.path.dirname(CUR_DIR)
    os.environ["DEPLOY_PKG_DIR"] = package_dir
def _sync_deploy_attrs(self):
self.ograc_user = self.deploy.ograc_user
self.ograc_group = self.deploy.ograc_group
self.ograc_common_group = self.deploy.ograc_common_group
self.ogmgr_user = self.deploy.ogmgr_user
self.deploy_mode = self.deploy.deploy_mode
self.ograc_in_container = self.deploy.ograc_in_container
self.node_id = self.deploy.node_id
def _call_module(self, module, action, *extra_args):
    """Invoke a submodule's ``appctl.sh`` with the given action.

    The script is looked up under the installed action directory first and
    then next to this file. A missing script is not treated as an error:
    the module is skipped and 0 is returned.

    Args:
        module: Module directory name (for example ``"cms"``).
        action: Action passed as the first argument to ``appctl.sh``.
        *extra_args: Additional arguments. Empty values are dropped, which
            is what the previously unquoted command line did implicitly.

    Returns:
        The script's exit code, or 0 when no script exists.
    """
    appctl = os.path.join(self.paths.action_dir, module, "appctl.sh")
    if not os.path.exists(appctl):
        appctl = os.path.join(CUR_DIR, module, "appctl.sh")
    if not os.path.exists(appctl):
        LOG.warning("Module %s appctl.sh not found, skipping", module)
        return 0

    # Quote every word so paths or arguments containing spaces or shell
    # metacharacters reach the script unchanged.
    argv = ["sh", appctl, str(action)]
    argv.extend(text for text in (str(a) for a in extra_args) if text)
    cmd = " ".join(shlex.quote(word) for word in argv)

    LOG.info("Calling %s %s", module, action)
    ret, stdout, stderr = exec_popen(
        cmd, timeout=self.cfg.timeout(action) or 1800)
    if ret != 0:
        details = "\n".join(part for part in (stdout.strip(), stderr.strip()) if part)
        LOG.error("%s %s failed: %s", module, action, details)
    else:
        LOG.info("%s %s success", module, action)
    return ret
def _prompt_sys_password_and_write_file(self):
    """Prompt twice for the ograc password and store it in a temp file.

    Returns:
        Path of a file under /tmp holding the password, chmod 0600 and
        owned by ``ograc_user:ograc_group``.

    Raises:
        RuntimeError: stdin is not a terminal, the password is empty, the
            two entries differ, or the file owner cannot be set.
    """
    if not sys.stdin.isatty():
        LOG.error("password set error, please run sh appctl.sh install config_params_lun.json")
        raise RuntimeError("password should be set in interactive terminal.")

    first_entry = getpass.getpass("please input ograc password: ")
    if not first_entry:
        raise RuntimeError("password cannot be empty.")
    second_entry = getpass.getpass("please confirm the password: ")
    if first_entry != second_entry:
        raise RuntimeError("the password entered twice does not match, please re-execute the installation.")

    handle, secret_path = tempfile.mkstemp(prefix="ograc_sys_pwd.", dir="/tmp")
    try:
        os.write(handle, first_entry.encode("utf-8"))
        os.close(handle)
        handle = None  # marks the descriptor as already closed for cleanup
        os.chmod(secret_path, 0o600)
        try:
            owner_uid = pwd.getpwnam(self.ograc_user).pw_uid
            owner_gid = grp.getgrnam(self.ograc_group).gr_gid
            os.chown(secret_path, owner_uid, owner_gid)
        except (KeyError, OSError) as err:
            os.unlink(secret_path)
            raise RuntimeError(
                f"failed to set password file owner ({self.ograc_user}:{self.ograc_group}): {err}"
            ) from err
        return secret_path
    except Exception:
        # Never leave a password file or an open descriptor behind.
        if handle is not None:
            try:
                os.close(handle)
            except OSError:
                pass
        if os.path.isfile(secret_path):
            try:
                os.unlink(secret_path)
            except OSError:
                pass
        raise
def pre_install(self, install_type="override", config_file=""):
    """Validate the configuration and run every module's pre_install step.

    Outside containers (``ograc_in_container`` not "1"/"2") the work is
    delegated to ``pre_install.py``; inside containers the validated config
    file is copied to ``deploy_param.json`` next to this file. A staged
    ``deploy_param.json`` is then moved into the package ``config``
    directory and ``install_type`` is recorded in the deploy parameters.

    Returns:
        0 on success, 1 when validation, ``pre_install.py`` or any module
        fails.
    """
    LOG.info("Begin pre_install, install_type=%s", install_type)
    try:
        config_file = validate_config_params_or_raise(CUR_DIR, config_file, logger=LOG)
    except RuntimeError as error:
        LOG.error(str(error))
        return 1

    staged_param = os.path.join(CUR_DIR, "deploy_param.json")
    if self.ograc_in_container in ("1", "2"):
        if os.path.exists(config_file):
            copy_tree(config_file, staged_param)
    else:
        script = os.path.join(CUR_DIR, "pre_install.py")
        command = " ".join([
            "python3",
            shlex.quote(script),
            shlex.quote(install_type),
            shlex.quote(config_file),
        ])
        ret, stdout, stderr = exec_popen(
            command, timeout=self.cfg.timeout("pre_install"))
        if ret != 0:
            LOG.error("pre_install.py failed: %s", stderr)
            return 1

    if os.path.exists(staged_param):
        config_dir = os.path.join(os.path.dirname(CUR_DIR), "config")
        os.rename(staged_param, os.path.join(config_dir, "deploy_param.json"))
    self.deploy.write_param("install_type", install_type)

    self._reload_config()
    self._init_user_and_group()
    self._correct_files_mod()

    for name in PRE_INSTALL_ORDER:
        if self._call_module(name, "pre_install", install_type) != 0:
            LOG.error("pre_install %s failed", name)
            return 1
        LOG.info("pre_install %s success", name)
    LOG.info("pre_install completed successfully")
    return 0
def install(self, install_type="override", config_file=""):
    """Install the oGRAC package and every module in ``INSTALL_ORDER``.

    The effective install type is read from the deploy parameters; the
    ``install_type`` argument is only logged and ``config_file`` is not
    used here. When stdin is a terminal the system password is prompted
    for and handed to the module scripts through the
    ``OGRAC_SYS_PASSWORD_FILE`` environment variable; the file and the
    variable are removed again whatever the outcome.

    Returns:
        0 on success, 1 when the package cannot be installed or any
        module's install step fails.
    """
    LOG.info("Begin install, install_type=%s", install_type)
    self._reload_config()
    config_install_type = self.deploy.get("install_type", "override")
    mes_ssl_switch = self.deploy.mes_ssl_switch
    if config_install_type == "override":
        self._create_common_dirs()
        self._mount_fs()
        if mes_ssl_switch:
            self._copy_certificate()

    # _install_ograc_package reports a missing tarball through its return
    # value; continuing would install modules without the product files.
    if self._install_ograc_package() != 0:
        LOG.error("Install oGRAC package failed")
        return 1
    self._copy_resources()
    self._fix_common_permissions()

    sys_password_file = None
    if "ograc" in INSTALL_ORDER and sys.stdin.isatty():
        sys_password_file = self._prompt_sys_password_and_write_file()
    if sys_password_file:
        os.environ["OGRAC_SYS_PASSWORD_FILE"] = sys_password_file
    try:
        for module in INSTALL_ORDER:
            LOG.info("Installing %s", module)
            ret = self._call_module(module, "install")
            if ret != 0:
                LOG.error("Install %s failed", module)
                return 1
            LOG.info("Install %s success", module)
    finally:
        # The password must not outlive the install step.
        if sys_password_file and os.path.isfile(sys_password_file):
            try:
                os.unlink(sys_password_file)
            except OSError:
                pass
        os.environ.pop("OGRAC_SYS_PASSWORD_FILE", None)

    self._config_security_limits()
    self._show_version()
    LOG.info("install completed successfully")
    return 0
def start(self, start_mode=""):
    """Start every module in ``START_ORDER``, then the daemon and timers.

    Returns:
        0 on success, 1 as soon as one module fails to start.
    """
    LOG.info("Begin start")
    self._init_limits_config()
    for name in START_ORDER:
        LOG.info("Starting %s", name)
        if self._call_module(name, "start", start_mode) != 0:
            LOG.error("Start %s failed", name)
            return 1
        LOG.info("Start %s success", name)
    self._start_daemon()
    self._start_systemd_timers()
    LOG.info("start completed successfully")
    return 0
def stop(self):
    """Stop the timers, the daemon and every module in ``STOP_ORDER``.

    The ``stop_enable`` flag file is created (or left untouched when it
    already exists) before anything is stopped.

    Returns:
        0 on success, 1 as soon as one module fails to stop.
    """
    LOG.info("Begin stop")
    flag_file = self.paths.stop_enable
    ensure_dir(os.path.dirname(flag_file))
    with open(self.paths.stop_enable, 'a'):
        pass
    if self.ograc_in_container == "0":
        self._stop_systemd_timers()
    self._stop_daemon()
    self._kill_user_processes("cms_start2.sh -start")
    for name in STOP_ORDER:
        LOG.info("Stopping %s", name)
        if self._call_module(name, "stop") != 0:
            LOG.error("Stop %s failed", name)
            return 1
        LOG.info("Stop %s success", name)
    LOG.info("stop completed successfully")
    return 0
def uninstall(self, uninstall_type="override", force_type=""):
    """Uninstall every module in ``UNINSTALL_ORDER``.

    In ``override`` mode the file systems are unmounted and the instance
    resources are cleaned up once all modules are gone.

    Returns:
        0 on success, 1 as soon as one module fails to uninstall.
    """
    LOG.info("Begin uninstall, type=%s", uninstall_type)
    self._reload_config()
    self.deploy.write_param("uninstall_type", uninstall_type)
    self._clear_security_limits()
    self._clear_residual_files()

    # The same argument list is handed to every module.
    module_args = (uninstall_type, force_type) if force_type else (uninstall_type,)
    for name in UNINSTALL_ORDER:
        LOG.info("Uninstalling %s", name)
        if self._call_module(name, "uninstall", *module_args) != 0:
            LOG.error("Uninstall %s failed", name)
            return 1
        LOG.info("Uninstall %s success", name)

    if uninstall_type == "override":
        self._umount_fs()
        self._cleanup_override()
    LOG.info("uninstall completed successfully")
    return 0
def check_status(self):
    """Report whether modules, daemon and timers are running.

    Returns:
        0 when everything checked is online, 1 when everything is offline,
        2 for a mixed state.
    """
    LOG.info("Begin check_status")
    states = []
    for name in CHECK_STATUS_ORDER:
        online = self._call_module(name, "check_status") == 0
        if online:
            LOG.info("%s is online", name)
        else:
            LOG.error("%s is offline", name)
        states.append(online)

    states.append(bool(self._is_daemon_running()))
    # systemd timers are only checked when ograc_in_container is "0".
    if self.ograc_in_container == "0":
        states.append(bool(self._check_systemd_timers_ready()))

    if all(states):
        LOG.info("All processes are online")
        return 0
    if not any(states):
        LOG.error("All processes are offline")
        return 1
    LOG.info("Partial online")
    return 2
def backup(self):
    """Create a timestamped backup directory and back up each module.

    The ``files`` entry in the backup root is re-pointed (as a symlink) at
    the new directory, ``deploy_param.json`` is copied into it, and after
    all modules succeeded only the newest five backups are kept.

    Returns:
        0 on success, 1 as soon as one module's backup fails.
    """
    LOG.info("Begin backup")
    root = self.paths.backup_dir
    stamp = time.strftime("%Y%m%d%H%M%S")
    target_dir = os.path.join(root, stamp)
    latest_link = os.path.join(root, "files")

    ensure_dir(target_dir, mode=0o750)
    chown_recursive(root, self.ograc_user, self.ograc_group)

    # islink() also catches a dangling link, which exists() reports as absent.
    if os.path.islink(latest_link) or os.path.exists(latest_link):
        os.remove(latest_link)
    os.symlink(target_dir, latest_link)
    LOG.info("Backup dir: %s, symlink: files -> %s", target_dir, stamp)

    param_file = os.path.join(self.paths.config_dir, "deploy_param.json")
    if os.path.isfile(param_file):
        shutil.copy2(param_file, target_dir)
        LOG.info("Backed up deploy_param.json")

    for name in BACKUP_ORDER:
        if self._call_module(name, "backup") != 0:
            LOG.error("Backup %s failed", name)
            return 1

    self._cleanup_old_backups(root, max_keep=5)
    LOG.info("backup completed successfully")
    return 0
def _cleanup_old_backups(self, backup_root, max_keep=5):
if not os.path.isdir(backup_root):
return
dirs = sorted([
d for d in os.listdir(backup_root)
if os.path.isdir(os.path.join(backup_root, d)) and d.isdigit()
])
while len(dirs) > max_keep:
oldest = dirs.pop(0)
target = os.path.join(backup_root, oldest)
LOG.info("Cleaning old backup: %s", target)
shutil.rmtree(target, ignore_errors=True)
def restore(self):
    """Restore ``deploy_param.json`` and every module from the latest backup.

    The latest backup is whatever the ``files`` entry in the backup root
    points at.

    Returns:
        0 on success, 1 when no backup exists or a module restore fails.
    """
    LOG.info("Begin restore")
    latest = os.path.join(self.paths.backup_dir, "files")
    if not os.path.exists(latest):
        LOG.error("No backup found at %s", latest)
        return 1
    LOG.info("Restoring from %s", os.path.realpath(latest))

    saved_param = os.path.join(latest, "deploy_param.json")
    if os.path.isfile(saved_param):
        config_dir = self.paths.config_dir
        target = os.path.join(config_dir, "deploy_param.json")
        ensure_dir(self.paths.config_dir, mode=0o750)
        shutil.copy2(saved_param, target)
        LOG.info("Restored deploy_param.json")

    for name in RESTORE_ORDER:
        if self._call_module(name, "restore") != 0:
            LOG.error("Restore %s failed", name)
            return 1
    LOG.info("restore completed successfully")
    return 0
def init_container(self):
    """Run ``init_container`` for every module in ``INIT_CONTAINER_ORDER``.

    Returns:
        0 on success, 1 as soon as one module fails.
    """
    LOG.info("Begin init_container")
    for name in INIT_CONTAINER_ORDER:
        LOG.info("Init %s", name)
        if self._call_module(name, "init_container") != 0:
            LOG.error("Init %s failed", name)
            return 1
        LOG.info("Init %s success", name)
    LOG.info("init_container completed successfully")
    return 0
def certificate(self, *args):
    """Run ``implement/certificate_update_and_revocation.py``.

    Args:
        *args: Arguments forwarded to the script. Each one is shell-quoted
            so it reaches the script as a single word; empty values are
            dropped, as the previously unquoted command line did.

    Returns:
        The script's exit code.
    """
    LOG.info("Begin certificate operations")
    cert_script = os.path.join(
        CUR_DIR, "implement", "certificate_update_and_revocation.py")
    argv = ["python3", "-B", cert_script]
    argv.extend(text for text in (str(a) for a in args) if text)
    cmd = " ".join(shlex.quote(word) for word in argv)
    ret, _, stderr = exec_popen(cmd)
    if ret != 0:
        LOG.error("Certificate operation failed: %s", stderr)
    return ret
def config_opt(self, *args):
    """Run ``implement/config_opt.py``.

    Args:
        *args: Arguments forwarded to the script. Each one is shell-quoted
            so it reaches the script as a single word; empty values are
            dropped, as the previously unquoted command line did.

    Returns:
        The script's exit code.
    """
    LOG.info("Begin config_opt")
    script = os.path.join(CUR_DIR, "implement", "config_opt.py")
    argv = ["python3", "-B", script]
    argv.extend(text for text in (str(a) for a in args) if text)
    cmd = " ".join(shlex.quote(word) for word in argv)
    ret, _, stderr = exec_popen(cmd)
    if ret != 0:
        LOG.error("config_opt failed: %s", stderr)
    return ret
def clear_upgrade_backup(self):
    """Run ``upgrade/clear_upgrade_backup.py``.

    Returns:
        0 when the script succeeds, 1 otherwise.
    """
    LOG.info("Begin clear_upgrade_backup")
    script = os.path.join(CUR_DIR, "upgrade", "clear_upgrade_backup.py")
    ret, _, stderr = exec_popen("python3 " + script)
    if ret == 0:
        LOG.info("clear_upgrade_backup success")
        return 0
    LOG.error("clear_upgrade_backup failed: %s", stderr)
    return 1
def _reload_config(self):
    """Discard the cached configuration and re-read it into this instance."""
    from config import reset_config, get_config

    reset_config()
    fresh = get_config()
    self.cfg = fresh
    self.deploy = fresh.deploy
    self.paths = fresh.paths
    self._sync_deploy_attrs()
def _init_user_and_group(self):
    """Create the service groups and users and secure their home directories.

    Both users get ``ograc_group`` as primary group and are added to
    ``ograc_common_group``. Existing home directories are chowned to the
    user and restricted to mode 0700. Failures of ``usermod`` or ``chown``
    are logged as warnings instead of being ignored silently.

    Raises:
        RuntimeError: A group or user cannot be created.
    """
    LOG.info("Initializing users and groups")
    svc_user = self.ograc_user
    svc_group = self.ograc_group
    common_group = self.ograc_common_group
    ogmgr = self.ogmgr_user

    self._ensure_group(svc_group)
    self._ensure_group(common_group)
    self._ensure_user(svc_user, svc_group, f"/home/{svc_user}")
    self._ensure_user(ogmgr, svc_group, f"/home/{ogmgr}")

    for account in (svc_user, ogmgr):
        ret, stdout, stderr = exec_popen(f"usermod -a -G {common_group} {account}")
        if ret != 0:
            LOG.warning("Failed to add %s to group %s: %s %s",
                        account, common_group, stdout, stderr)

    for account in (svc_user, ogmgr):
        try:
            home = pwd.getpwnam(account).pw_dir
        except KeyError:
            continue
        if not os.path.isdir(home):
            continue
        ret, stdout, stderr = exec_popen(f"chown {account}:{svc_group} {home}")
        if ret != 0:
            LOG.warning("Failed to chown %s to %s:%s: %s %s",
                        home, account, svc_group, stdout, stderr)
        os.chmod(home, 0o700)
@staticmethod
def _ensure_group(name):
    """Create group ``name`` unless ``getent`` already knows it.

    Raises:
        RuntimeError: ``groupadd`` fails.
    """
    lookup_ret, _, _ = exec_popen(f"getent group {name}")
    if lookup_ret != 0:
        ret, stdout, stderr = exec_popen(f"groupadd {name}")
        if ret != 0:
            LOG.error("groupadd %s failed: %s %s", name, stdout, stderr)
            raise RuntimeError(f"groupadd {name} failed: {stderr}")
@staticmethod
def _ensure_user(name, group, home):
    """Create user ``name`` unless ``id`` already resolves it.

    The user is created with primary group ``group``, home directory
    ``home`` and ``/sbin/nologin`` as shell.

    Raises:
        RuntimeError: ``useradd`` fails.
    """
    lookup_ret, _, _ = exec_popen(f"id {name}")
    if lookup_ret != 0:
        create_cmd = f"useradd {name} -g {group} -m -d {home} -s /sbin/nologin"
        ret, stdout, stderr = exec_popen(create_cmd)
        if ret != 0:
            LOG.error("useradd %s failed: %s %s", name, stdout, stderr)
            raise RuntimeError(f"useradd {name} failed: {stderr}")
@staticmethod
def _append_user_to_existing_group(user, group):
    """Add ``user`` to ``group`` when the group exists; never raises on failure."""
    lookup_ret, _, _ = exec_popen(f"getent group {group}")
    if lookup_ret == 0:
        ret, stdout, stderr = exec_popen(f"usermod -a -G {group} {user}")
        if ret != 0:
            LOG.warning("Failed to add %s to group %s: %s %s", user, group, stdout, stderr)
    else:
        LOG.info("Skip adding %s to optional group %s: group not found", user, group)
def _correct_files_mod(self):
    """Normalise file modes inside the deployment package.

    The passes run in a fixed order because later ones override earlier
    ones: files are first made read-only (400), then scripts, JSON files
    and directories are reopened, and finally a few individual paths get
    explicit modes.
    """
    LOG.info("Correcting file permissions")
    pkg_dir = os.path.dirname(CUR_DIR)
    config_dir = os.path.join(pkg_dir, "config")
    common_dir = os.path.join(pkg_dir, "common")

    # Pass 1: read-only baseline; depth None means "recurse fully".
    read_only_targets = [(CUR_DIR, 1), (config_dir, 1), (common_dir, None)]
    read_only_targets += [
        (os.path.join(CUR_DIR, sub), None)
        for sub in ("implement", "logic", "storage_operate",
                    "inspection", "wsr", "wsr_report")
    ]
    for directory, depth in read_only_targets:
        if os.path.isdir(directory):
            depth_opt = f' -maxdepth {depth}' if depth else ''
            exec_popen(f'find "{directory}"/' + depth_opt
                       + ' -type f -print0 | xargs -0 chmod 400')

    # Pass 2: scripts directly inside the action directory.
    for name_glob, mode in (("*.py", "644"), ("*.sh", "755")):
        exec_popen(f'find "{CUR_DIR}"/ -maxdepth 1 -type f '
                   f'-name "{name_glob}" -exec chmod {mode} {{}} +')

    # Pass 3: every non-hidden sub-directory, recursively.
    visible_subdirs = [
        os.path.join(CUR_DIR, entry)
        for entry in os.listdir(CUR_DIR)
        if os.path.isdir(os.path.join(CUR_DIR, entry)) and not entry.startswith('.')
    ]
    for full in visible_subdirs:
        exec_popen(f'find "{full}" -type d -exec chmod 755 {{}} +')
        for name_glob, mode in (("*.py", "644"), ("*.sh", "755"), ("*.json", "644")):
            exec_popen(f'find "{full}" -type f -name "{name_glob}" '
                       f'-exec chmod {mode} {{}} +')

    # Pass 4: first-level directories of the action and logic directories.
    for directory in (CUR_DIR, os.path.join(CUR_DIR, "logic")):
        if os.path.isdir(directory):
            exec_popen(f'find "{directory}"/ -maxdepth 1 -type d -print0 | xargs -0 chmod 755')

    # Pass 5: explicit per-path modes; failures are ignored on purpose.
    explicit_modes = (
        (common_dir, 0o755),
        (CUR_DIR, 0o755),
        (config_dir, 0o755),
        (os.path.join(CUR_DIR, "config_params_lun.json"), 0o755),
        (os.path.join(config_dir, "deploy_param.json"), 0o644),
        (os.path.join(config_dir, "dr_deploy_param.json"), 0o644),
        (os.path.join(pkg_dir, "versions.yml"), 0o644),
        (os.path.join(CUR_DIR, "inspection"), 0o750),
    )
    for target, mode in explicit_modes:
        if not os.path.exists(target):
            continue
        try:
            os.chmod(target, mode)
        except OSError:
            pass
def _create_common_dirs(self):
    """Create the shared instance directories and hand them to the ograc user."""
    LOG.info("Creating common directories")
    owner = f"{self.ograc_user}:{self.ograc_group}"

    ensure_dir(os.path.join(self.paths.ograc_home, "image"), mode=0o755)
    for leaf, mode in (("data", 0o750), ("socket", 0o755)):
        ensure_dir(os.path.join(self.paths.common_dir, leaf), mode=mode)
    ensure_dir(self.paths.common_config_dir, mode=0o755)

    data_root = self.paths.data_root
    data_local = self.paths.data_local
    for directory in (data_root, data_local):
        ensure_dir(directory, mode=0o755)

    # <data_local>/ograc, its tmp directory and tmp/data, parents first.
    ograc_data_base = os.path.join(data_local, "ograc")
    tmp_dir = os.path.join(ograc_data_base, "tmp")
    for directory in (ograc_data_base, tmp_dir, os.path.join(tmp_dir, "data")):
        ensure_dir(directory, mode=0o750)

    exec_popen(f"chown -R {owner} {ograc_data_base}")
    for directory in (self.paths.ograc_home, data_local):
        exec_popen(f"chown {owner} {directory}")
def _install_ograc_package(self):
    """Extract the oGRAC tarball from ``repo/`` into ``<ograc_home>/image``.

    When the nested ``oGRAC-RUN-LINUX-64bit.tar.gz`` exists it is extracted
    as well, and the resulting ``oGRAC-RUN-LINUX-64bit`` tree is chowned to
    the ograc user with directories at 750 and files at 640.

    Returns:
        0 on success, 1 when no ``ograc-*.tar.gz`` is found.
    """
    import glob

    LOG.info("Installing oGRAC package")
    repo_dir = os.path.join(os.path.dirname(CUR_DIR), "repo")
    candidates = glob.glob(os.path.join(repo_dir, "ograc-*.tar.gz"))
    if not candidates:
        LOG.error("oGRAC tar.gz not found")
        return 1
    package = candidates[0]

    install_base = os.path.join(self.paths.ograc_home, "image")
    ensure_dir(install_base, mode=0o755, user=self.ograc_user, group=self.ograc_group)
    run_cmd(f"tar -zxf {package} -C {install_base}", "failed to extract ograc package")
    run_cmd(f"chmod 755 {install_base}", "failed to chmod install_base")

    inner_tar = os.path.join(
        install_base, "ograc_connector", "ogracKernel",
        "oGRAC-DATABASE-LINUX-64bit", "oGRAC-RUN-LINUX-64bit.tar.gz")
    if os.path.exists(inner_tar):
        run_cmd(f"tar -zxf {inner_tar} -C {install_base}", "failed to extract oGRAC-RUN package")

    run_dir = os.path.join(install_base, "oGRAC-RUN-LINUX-64bit")
    if os.path.isdir(run_dir):
        owner = f"{self.ograc_user}:{self.ograc_group}"
        run_cmd(f"chown {owner} -hR {run_dir}", "failed to chown oGRAC-RUN package")
        for kind, mode, what in (("d", "750", "dirs"), ("f", "640", "files")):
            run_cmd(f'find "{run_dir}" -type {kind} -exec chmod {mode} {{}} +',
                    f"failed to chmod oGRAC-RUN {what}")

    run_cmd(f"chmod 755 {install_base}", "failed to chmod image dir")
    return 0
def _copy_resources(self):
    """Copy the package contents into the install location.

    Copies every entry of this file's directory (except ``__pycache__``)
    into the action directory, plus ``config/``, ``common/``,
    ``versions.yml`` and ``repo/`` into ``ograc_home``. Nothing is done
    when the rpm flag file exists.
    """
    LOG.info("Copying resources to install path")
    if os.path.isfile(self.paths.rpm_flag):
        return

    action_dst = self.paths.action_dir
    ensure_dir(action_dst, mode=0o755)
    skipped = {"__pycache__"}
    for entry in os.listdir(CUR_DIR):
        if entry not in skipped:
            copy_tree(os.path.join(CUR_DIR, entry),
                      os.path.join(action_dst, entry),
                      skip_names=skipped)

    pkg_dir = os.path.dirname(CUR_DIR)
    for extra in ("config", "common"):
        source = os.path.join(pkg_dir, extra)
        target = os.path.join(self.paths.ograc_home, extra)
        if os.path.isdir(source):
            copy_tree(source, target)

    versions_src = os.path.join(pkg_dir, "versions.yml")
    if os.path.isfile(versions_src):
        shutil.copy2(versions_src, os.path.join(self.paths.ograc_home, "versions.yml"))

    repo_src = os.path.join(pkg_dir, "repo")
    if os.path.isdir(repo_src):
        copy_tree(repo_src, os.path.join(self.paths.ograc_home, "repo"))

    # Record where the package was unpacked next to the installed scripts.
    with open(os.path.join(action_dst, ".deploy_pkg_dir"), "w") as marker:
        marker.write(pkg_dir)
    self._fix_action_permissions(action_dst)
def _fix_action_permissions(self, action_dst):
    """Set modes and ownership of the installed action scripts.

    Everything is owned by the ograc user; each module's ``appctl.sh`` is
    handed back to root afterwards.
    """
    owner = f"{self.ograc_user}:{self.ograc_group}"
    exec_popen(f'find "{action_dst}" -type d -exec chmod 755 {{}} +')
    for name_glob, mode in (("*.py", "644"), ("*.sh", "755"), ("*.json", "644")):
        exec_popen(f'find "{action_dst}" -type f -name "{name_glob}" '
                   f'-exec chmod {mode} {{}} +')
    exec_popen(f'chown -R {owner} "{action_dst}"')

    root_owned_modules = ("cms", "dss", "ograc", "og_om", "ograc_exporter",
                          "logicrep", "docker")
    for name in root_owned_modules:
        appctl = os.path.join(action_dst, name, "appctl.sh")
        if os.path.isfile(appctl):
            exec_popen(f"chown root:root {appctl}")
def _fix_common_permissions(self):
    """Set the modes of the common script tree and its log handler.

    Raises:
        RuntimeError: ``do_compress_and_archive.py`` exists but its owner
            or mode cannot be set.
    """
    script_dir = self.paths.common_script_dir
    handler_dir = os.path.join(script_dir, "logs_handler")
    tool_dir = os.path.join(handler_dir, "logs_tool")
    compress_script = os.path.join(handler_dir, "do_compress_and_archive.py")

    for directory, mode in ((script_dir, 0o755), (handler_dir, 0o755), (tool_dir, 0o700)):
        if os.path.isdir(directory):
            os.chmod(directory, mode)

    if not os.path.isfile(compress_script):
        return
    try:
        owner_uid = pwd.getpwnam(self.ograc_user).pw_uid
        owner_gid = grp.getgrnam(self.ograc_group).gr_gid
        os.chown(compress_script, owner_uid, owner_gid)
        os.chmod(compress_script, 0o440)
    except (KeyError, OSError) as err:
        raise RuntimeError(
            f"failed to fix logs handler permissions for {compress_script}: {err}"
        ) from err
def _mount_fs(self):
    """Mount the metadata, archive and share NFS file systems.

    Nothing is mounted inside containers (``ograc_in_container`` other than
    "0") or in the ``dbstor``/``dss`` deploy modes. The share file system is
    only mounted in ``file`` mode. Each mount point is created even when no
    logic IP is configured for it. A failed mount is logged as a warning
    and does not abort the installation.
    """
    if self.ograc_in_container != "0":
        return
    if self.deploy_mode in ("dbstor", "dss"):
        return
    LOG.info("Mounting file systems")
    storage_share_fs = self.deploy.storage_share_fs
    storage_archive_fs = self.deploy.storage_archive_fs
    storage_metadata_fs = self.deploy.storage_metadata_fs

    def mount_nfs(fs_name, mount_dir, dir_mode, ip_key, options):
        """Create the mount point and mount ``<ip>:/<fs_name>`` on it."""
        ensure_dir(mount_dir, mode=dir_mode)
        logic_ip = self.deploy.get(ip_key)
        if not logic_ip:
            return
        ret, stdout, stderr = exec_popen(
            f"mount -t nfs -o {options} "
            f"{logic_ip}:/{fs_name} {mount_dir}")
        if ret != 0:
            LOG.warning("Failed to mount %s:/%s on %s: %s",
                        logic_ip, fs_name, mount_dir, stderr or stdout)

    if storage_metadata_fs:
        mount_nfs(storage_metadata_fs,
                  self.paths.metadata_path(storage_metadata_fs),
                  0o755, "metadata_logic_ip", "timeo=50,nosuid,nodev")
    if storage_archive_fs:
        mount_nfs(storage_archive_fs,
                  self.paths.archive_path(storage_archive_fs),
                  0o750, "archive_logic_ip", "timeo=50,nosuid,nodev")
    if self.deploy_mode == "file" and storage_share_fs:
        mount_nfs(storage_share_fs,
                  self.paths.share_path(storage_share_fs),
                  0o750, "share_logic_ip", "vers=4.0,timeo=50,nosuid,nodev")
def _umount_fs(self):
    """Unmount the remote file systems and remove their mount points.

    Skipped inside containers (``ograc_in_container`` other than "0"). A
    mount point that is still mounted after ``umount`` is left in place:
    removing it would act on the remote file system's contents.
    """
    if self.ograc_in_container != "0":
        return
    LOG.info("Unmounting file systems")
    for fs_type in ("share", "archive", "metadata", "storage"):
        fs_name = self.deploy.get(f"storage_{fs_type}_fs", "")
        if not fs_name:
            continue
        mount_point = os.path.join(
            self.paths.data_remote, f"{fs_type}_{fs_name}")
        exec_popen(f"umount -f -l {mount_point} > /dev/null 2>&1")
        # NOTE(review): safe_remove presumably deletes recursively - confirm.
        if os.path.ismount(mount_point):
            LOG.warning("%s is still mounted, skip removing it", mount_point)
            continue
        safe_remove(mount_point)
def _copy_certificate(self):
    """Recreate the certificate directory from the configured source files.

    Skipped inside containers (``ograc_in_container`` other than "0").
    Sources that are not configured or do not exist are left out.
    """
    if self.ograc_in_container != "0":
        return
    LOG.info("Copying certificates")
    cert_dir = self.paths.certificates_dir
    safe_remove(cert_dir)
    ensure_dir(cert_dir, mode=0o700)

    # deploy key -> file name inside the certificate directory
    wanted = (("ca_path", "ca.crt"), ("crt_path", "mes.crt"), ("key_path", "mes.key"))
    for deploy_key, file_name in wanted:
        source = self.deploy.get(deploy_key)
        if not source or not os.path.exists(source):
            continue
        copy_tree(source, os.path.join(cert_dir, file_name))
    chown_recursive(cert_dir, self.ograc_user, self.ograc_group)
def _config_security_limits(self):
    """Add the nice and memlock entries to /etc/security/limits.conf.

    An entry counts as present only when a whole line (ignoring
    surrounding whitespace) equals it; a substring test would be fooled by
    another user's entry that merely ends with the same text. Missing
    entries are appended in a single write, each on its own
    newline-terminated line. I/O errors are logged as warnings.
    """
    LOG.info("Configuring security limits")
    limits_file = "/etc/security/limits.conf"
    if not os.path.exists(limits_file):
        return
    entries = [
        f"{self.ograc_user} hard nice -20",
        f"{self.ograc_user} soft nice -20",
        "* soft memlock unlimited",
        "* hard memlock unlimited",
    ]
    try:
        with open(limits_file, "r") as f:
            content = f.read()
        existing_lines = {line.strip() for line in content.splitlines()}
        missing = [entry for entry in entries if entry not in existing_lines]
        if not missing:
            return
        # Start on a fresh line when the file lacks a final newline.
        lead = "" if not content or content.endswith("\n") else "\n"
        with open(limits_file, "a") as f:
            f.write(lead + "\n".join(missing) + "\n")
    except OSError as e:
        LOG.warning("Failed to configure limits: %s", e)
def _clear_security_limits(self):
    """Delete this instance's nice and nofile lines from limits.conf."""
    LOG.info("Clearing security limits")
    limits_file = "/etc/security/limits.conf"
    if not os.path.exists(limits_file):
        return
    user = self.ograc_user
    # One sed delete expression per line kind, applied in this order.
    sed_scripts = (
        [f"/^{user} {kind} nice -20$/d" for kind in ("hard", "soft")]
        + [f"/^{user} {kind} nofile /d" for kind in ("hard", "soft")]
    )
    for script in sed_scripts:
        exec_popen(f"sed -i '{script}' {limits_file}")
def _show_version(self):
    """Write version info to /usr/local/bin/show.

    The generated bash script prints the system UUID, the host name and
    the product version read from ``versions.yml``. Writing is best
    effort: a failure is logged as a warning and does not abort the
    caller.
    """
    LOG.info("Writing version info")
    versions_file = self.paths.versions_yml
    version = read_version(versions_file)
    show_script = "/usr/local/bin/show"
    try:
        with open(show_script, "w") as f:
            f.write(f"""#!/bin/bash
sn=$(dmidecode -s system-uuid)
name=$(cat /etc/hostname)
echo "SN : ${{sn}}"
echo "System Name : ${{name}}"
echo "Product Model : ograc"
echo "Product Version : {version}"
""")
        os.chmod(show_script, 0o550)
    except OSError as err:
        LOG.warning("Failed to write %s: %s", show_script, err)
def _start_daemon(self):
    """Run ``cms_reg.sh enable`` as the ograc user, then start the service script."""
    LOG.info("Starting daemon")
    register_script = os.path.join(self.paths.action_dir, "cms", "cms_reg.sh")
    run_as_user(f"sh {register_script} enable", self.ograc_user)
    service_script = self.paths.ograc_service_script
    if not os.path.exists(service_script):
        return
    exec_popen(f"sh {service_script} start")
def _stop_daemon(self):
    """Kill running ``<service script> start`` processes, then call its ``stop``."""
    LOG.info("Stopping daemon")
    service_script = self.paths.ograc_service_script
    self._kill_user_processes(f"sh {service_script} start")
    if not os.path.exists(service_script):
        return
    exec_popen(f"sh {service_script} stop")
def _kill_user_processes(self, pattern, user=None):
    """Send SIGKILL to a user's processes whose command line matches ``pattern``.

    Args:
        pattern: Text handed to ``grep`` to select processes.
        user: Owner of the processes; defaults to the ograc user.
    """
    owner = user or self.ograc_user
    listing = (
        f"ps -u {owner} -o pid=,args= | grep '{pattern}' | "
        "grep -v grep | awk '{print $1}'"
    )
    ret, stdout, _ = exec_popen(listing)
    pids = stdout.strip().splitlines() if ret == 0 and stdout.strip() else []
    for pid in pids:
        exec_popen(f"kill -9 {pid}")
def _is_daemon_running(self):
    """Return True when the ograc user runs a process matching the daemon script."""
    daemon_script = self.paths.ograc_daemon_script
    probe = (
        f"ps -u {self.ograc_user} -o pid=,args= | "
        f"grep '{daemon_script}' | grep -v grep"
    )
    ret, stdout, _ = exec_popen(probe)
    if ret != 0:
        return False
    return bool(stdout.strip())
def _install_systemd_units(self):
    """Write the daemon and log-handler service/timer units to /etc/systemd/system.

    Unit names come from ``self.paths``. A unit that cannot be written is
    logged as a warning and the remaining units are still attempted.
    """
    paths = self.paths
    service_script = paths.ograc_service_script
    logs_script = os.path.join(paths.common_script_dir, "logs_handler", "execute.py")
    env_lines = self._build_systemd_env_lines()

    def service_unit(description, exec_start):
        # One-shot service sharing the instance environment.
        return "".join((
            "[Unit]\n",
            f"Description={description} ({paths.instance_tag})\n\n",
            "[Service]\n",
            "Type=oneshot\n",
            "KillMode=process\n",
            f"WorkingDirectory={paths.ograc_home}\n",
            f"{env_lines}",
            f"ExecStart={exec_start}\n",
        ))

    def timer_unit(description, service_name, interval):
        # Timer that re-triggers ``service_name`` every ``interval``.
        return "".join((
            "[Unit]\n",
            f"Description={description} ({paths.instance_tag})\n\n",
            "[Timer]\n",
            f"Unit={service_name}\n",
            "OnBootSec=2min\n",
            f"OnUnitActiveSec={interval}\n\n",
            "[Install]\n",
            "WantedBy=multi-user.target\n",
        ))

    units = {
        paths.daemon_service_unit: service_unit(
            "ograc daemon service", f"/bin/bash {service_script} start"),
        paths.daemon_timer_unit: timer_unit(
            "Run daemon guard every 60s", paths.daemon_service_unit, "60s"),
        paths.logs_service_unit: service_unit(
            "regularly clean up ograc logs", f"/usr/bin/python3 {logs_script}"),
        paths.logs_timer_unit: timer_unit(
            "Run logs handler every 60min", paths.logs_service_unit, "60min"),
    }
    for unit_name, unit_text in units.items():
        unit_path = os.path.join("/etc/systemd/system", unit_name)
        try:
            with open(unit_path, "w") as unit_file:
                unit_file.write(unit_text)
            os.chmod(unit_path, 0o644)
        except OSError as e:
            LOG.warning("Failed to write %s: %s", unit_path, e)
def _build_systemd_env_lines(self):
env_map = {
"OGRAC_HOME": self.paths.ograc_home,
"OGRAC_DATA_ROOT": self.paths.data_root,
"OGRAC_ACTION_DIR": self.paths.action_dir,
"COMMON_SCRIPT_DIR": self.paths.common_script_dir,
"OGRAC_USER": self.ograc_user,
"OGRAC_GROUP": self.ograc_group,
"OGRAC_INSTANCE_TAG": self.paths.instance_tag,
"OGRAC_DAEMON_SERVICE": self.paths.daemon_service_unit,
"OGRAC_DAEMON_TIMER": self.paths.daemon_timer_unit,
"OGRAC_LOGS_SERVICE": self.paths.logs_service_unit,
"OGRAC_LOGS_TIMER": self.paths.logs_timer_unit,
"DEPLOY_DAEMON_LOG": self.paths.deploy_daemon_log,
"NFS_PORT": str(self.deploy.nfs_port),
}
lines = []
for key, value in env_map.items():
escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
lines.append(f'Environment="{key}={escaped}"\n')
return "".join(lines)
def _cleanup_legacy_systemd_units(self):
    """Remove fixed-name legacy units that refer to this instance.

    A legacy service/timer pair is stopped, disabled and deleted only when
    the service file is readable and mentions this instance's
    ``ograc_home``.
    """
    unit_dir = "/etc/systemd/system"
    for stem in ("ograc", "ograc_logs_handler"):
        service_name, timer_name = f"{stem}.service", f"{stem}.timer"
        service_path = os.path.join(unit_dir, service_name)
        if not os.path.isfile(service_path):
            continue
        try:
            with open(service_path, encoding="utf-8") as unit_file:
                unit_text = unit_file.read()
        except OSError:
            continue
        if self.paths.ograc_home in unit_text:
            for verb in ("stop", "disable"):
                exec_popen(f"systemctl {verb} {timer_name}")
            safe_remove(os.path.join(unit_dir, timer_name))
            safe_remove(service_path)
@staticmethod
def _run_systemctl(command, unit_name, allow_fail=False):
    """Run ``systemctl <command> <unit_name>``.

    Returns:
        True on success; False on failure when ``allow_fail`` is set.

    Raises:
        RuntimeError: The command fails and ``allow_fail`` is not set.
    """
    ret, stdout, stderr = exec_popen(f"systemctl {command} {unit_name}")
    if ret == 0:
        return True
    message = stderr or stdout or "unknown error"
    if not allow_fail:
        raise RuntimeError(f"systemctl {command} {unit_name} failed: {message}")
    LOG.warning("systemctl %s %s failed: %s", command, unit_name, message)
    return False
def _check_systemd_timers_ready(self):
    """Return True when both timers are active and enabled.

    Both timers are always probed, so every problem is logged.
    """
    all_ready = True
    for timer in (self.paths.daemon_timer_unit, self.paths.logs_timer_unit):
        is_active = self._run_systemctl("is-active", timer, allow_fail=True)
        is_enabled = self._run_systemctl("is-enabled", timer, allow_fail=True)
        if not is_active:
            LOG.error("%s is not active", timer)
        if not is_enabled:
            LOG.error("%s is not enabled", timer)
        all_ready = all_ready and is_active and is_enabled
    return all_ready
def _start_systemd_timers(self):
    """Install the unit files, reload systemd, then start and enable both timers.

    Skipped inside containers (``ograc_in_container`` other than "0").

    Raises:
        RuntimeError: ``daemon-reload`` or any start/enable call fails.
    """
    if self.ograc_in_container != "0":
        return
    LOG.info("Installing and starting systemd timers")
    self._cleanup_legacy_systemd_units()
    self._install_systemd_units()
    ret, stdout, stderr = exec_popen("systemctl daemon-reload")
    if ret != 0:
        raise RuntimeError(f"systemctl daemon-reload failed: {stderr or stdout}")
    for timer in (self.paths.daemon_timer_unit, self.paths.logs_timer_unit):
        for verb in ("start", "enable"):
            self._run_systemctl(verb, timer)
def _stop_systemd_timers(self):
    """Stop and disable both timers; failures are only logged."""
    LOG.info("Stopping systemd timers")
    ret, stdout, stderr = exec_popen("systemctl daemon-reload")
    if ret != 0:
        LOG.warning("systemctl daemon-reload failed while stopping timers: %s", stderr or stdout)
    for timer in (self.paths.daemon_timer_unit, self.paths.logs_timer_unit):
        for verb in ("stop", "disable"):
            self._run_systemctl(verb, timer, allow_fail=True)
def _init_limits_config(self):
    """Set the ograc user's nofile limit to 102400 in limits.conf."""
    update_limits_conf_user_nofile(
        "/etc/security/limits.conf", self.ograc_user, 102400
    )
def _clear_residual_files(self):
    """Remove leftovers under the metadata directory and ``/opt/backup_note``.

    The upgrade markers are always removed when a metadata file system is
    configured; the parameter and version files are removed only when
    ``node_id`` is "0".
    """
    LOG.info("Clearing residual files")
    metadata_fs = self.deploy.storage_metadata_fs
    if metadata_fs:
        metadata_dir = self.paths.metadata_path(metadata_fs)
        leftovers = ["upgrade", "upgrade.lock"]
        if str(self.node_id) == "0":
            leftovers += ["deploy_param.json", "dr_deploy_param.json", "versions.yml"]
        for leaf in leftovers:
            safe_remove(os.path.join(metadata_dir, leaf))
    safe_remove("/opt/backup_note")
@staticmethod
def _cleanup_sysv_ipc_for_user(user, force=False):
    """Remove System V shared memory, semaphores and queues owned by ``user``.

    Shared-memory segments with a non-zero attach count are skipped unless
    ``force`` is set. Listing or removal failures are logged as warnings.
    """
    # (ipcs flag, removal command, label, attach-count column or -1 for none)
    resource_kinds = (
        ("-m", "ipcrm -m", "shm", 5),
        ("-s", "ipcrm -s", "sem", -1),
        ("-q", "ipcrm -q", "msg", -1),
    )
    for list_flag, rm_prefix, label, nattch_col in resource_kinds:
        list_ret, stdout, stderr = exec_popen(f"ipcs {list_flag}")
        if list_ret != 0:
            LOG.warning("Failed to list %s: %s%s", label, stdout, stderr)
            continue
        for row in stdout.splitlines():
            fields = row.split()
            # Skip blank lines, the column header and separator rows.
            if len(fields) < 3 or fields[0] == "key" or fields[0].startswith("------"):
                continue
            ipc_id, owner = fields[1], fields[2]
            if owner != user:
                continue
            if 0 <= nattch_col < len(fields):
                nattch = fields[nattch_col]
                if nattch != "0":
                    if not force:
                        LOG.info("Skip attached %s %s owned by %s", label, ipc_id, user)
                        continue
                    LOG.info("Force removing attached %s %s owned by %s (nattch=%s)",
                             label, ipc_id, user, nattch)
            rm_ret, _, err = exec_popen(f"{rm_prefix} {ipc_id}")
            if rm_ret == 0:
                LOG.info("Removed %s %s for %s", label, ipc_id, user)
            else:
                LOG.warning("Failed to remove %s %s for %s: %s", label, ipc_id, user, err)
@staticmethod
def _kill_all_user_processes(user):
ret, _, _ = exec_popen(f"id -u {user}")
if ret != 0:
return
exec_popen(f"pkill -9 -u {user}")
for _ in range(10):
ret, stdout, _ = exec_popen(f"ps -u {user} -o pid= 2>/dev/null")
if ret != 0 or not stdout.strip():
return
time.sleep(0.5)
LOG.warning("Some processes of user %s are still alive after kill", user)
def _cleanup_instance_ipc(self, force=False):
if force:
for user in (self.ograc_user, self.ogmgr_user):
self._kill_all_user_processes(user)
for user in (self.ograc_user, self.ogmgr_user):
self._cleanup_sysv_ipc_for_user(user, force=force)
safe_remove(os.path.join("/dev/shm", user))
def _cleanup_override(self):
    """Cleanup for override mode.

    In order: stops the systemd timers (only when ``ograc_in_container`` is
    "0"), removes the common data/socket/config directories, force-cleans
    the instance IPC resources, deletes the oGRAC and ogmgr users and the
    common group, removes the systemd unit files and ``/usr/local/bin/show``,
    reloads systemd, and finally schedules removal of the instance home.
    """
    LOG.info("Cleaning up override resources")
    if self.ograc_in_container == "0":
        self._stop_systemd_timers()

    common_dir = self.paths.common_dir
    safe_remove(os.path.join(common_dir, "data"))
    safe_remove(os.path.join(common_dir, "socket"))
    safe_remove(self.paths.common_config_dir)

    # Processes are killed here, before the accounts themselves are deleted.
    self._cleanup_instance_ipc(force=True)

    for user in (self.ograc_user, self.ogmgr_user):
        # Quote so the configured name is always a single shell word.
        quoted_user = shlex.quote(str(user))
        exec_popen(f"id -u {quoted_user} > /dev/null 2>&1 && userdel -rf {quoted_user}")
    quoted_group = shlex.quote(str(self.ograc_common_group))
    exec_popen(f"groupdel -f {quoted_group} > /dev/null 2>&1")

    systemd_dir = "/etc/systemd/system"
    safe_remove(os.path.join(systemd_dir, self.paths.daemon_timer_unit))
    safe_remove(os.path.join(systemd_dir, self.paths.daemon_service_unit))
    safe_remove(os.path.join(systemd_dir, self.paths.logs_timer_unit))
    safe_remove(os.path.join(systemd_dir, self.paths.logs_service_unit))
    safe_remove("/usr/local/bin/show")
    exec_popen("systemctl daemon-reload")

    self._remove_instance_home()
def _remove_instance_home(self):
real_home = os.path.realpath(self.paths.ograc_home)
if not os.path.isdir(real_home):
return
protected_paths = {
"/", "/opt", "/home", "/usr", "/var", "/tmp", "/mnt", "/mnt/dbdata",
}
if real_home in protected_paths:
LOG.warning("Skip removing protected ograc_home: %s", real_home)
return
if len([part for part in real_home.split(os.sep) if part]) < 2:
LOG.warning("Skip removing shallow ograc_home: %s", real_home)
return
action_dir = os.path.realpath(self.paths.action_dir)
if not action_dir.startswith(real_home + os.sep):
LOG.warning("Skip removing unexpected ograc_home: %s", real_home)
return
quoted_home = shlex.quote(real_home)
remover_cmd = f"sleep 1; rm -rf -- {quoted_home}"
try:
with open(os.devnull, "wb") as devnull:
subprocess.Popen(
["/bin/bash", "-c", remover_cmd],
cwd="/",
stdin=devnull,
stdout=devnull,
stderr=devnull,
start_new_session=True,
)
except OSError as err:
LOG.warning("Failed to schedule ograc_home removal %s: %s", real_home, err)
return
LOG.info("Scheduled removal of current instance ograc_home: %s", real_home)
_KNOWN_INSTALL_TYPES = {"override", "reserve"}
def _parse_install_args(args):
"""Parse (install_type, config_file) from various appctl.sh call conventions."""
if len(args) >= 2:
return args[0], args[1]
if len(args) == 1:
if args[0] in _KNOWN_INSTALL_TYPES:
return args[0], ""
return "override", args[0]
return "override", ""
def _parse_uninstall_args(args):
"""Parse (uninstall_type, force_type) from various appctl.sh call conventions."""
if len(args) >= 2:
return args[0], args[1]
if len(args) == 1:
if args[0] in _KNOWN_INSTALL_TYPES:
return args[0], ""
return "override", args[0]
return "override", ""
def main():
    """Command-line entry point: dispatch ``sys.argv[1]`` to a deploy action.

    Exits with status 1 on missing or unknown action, or when the action
    raises; otherwise exits with the action's return value (0 if falsy).
    """
    if len(sys.argv) < 2:
        print("Usage: ograc_deploy.py <action> [args...]")
        sys.exit(1)

    action, args = sys.argv[1], sys.argv[2:]
    deployer = OgracDeploy()
    first_arg = args[0] if args else ""

    # Actions that need their arguments adapted before the call.
    dispatch = {
        "pre_install": lambda: deployer.pre_install(*_parse_install_args(args)),
        "install": lambda: deployer.install(*_parse_install_args(args)),
        "start": lambda: deployer.start(first_arg),
        "uninstall": lambda: deployer.uninstall(*_parse_uninstall_args(args)),
        "certificate": lambda: deployer.certificate(*args),
        "config_opt": lambda: deployer.config_opt(*args),
    }
    # Argument-less deployer methods; the method is looked up at call time.
    for method_name in (
        "stop", "check_status", "backup", "restore",
        "init_container", "clear_upgrade_backup",
    ):
        dispatch[method_name] = (
            lambda method_name=method_name: getattr(deployer, method_name)()
        )
    # Upgrade-family actions are handed to the separate upgrade script.
    for upgrade_name in (
        "pre_upgrade", "upgrade", "upgrade_commit", "rollback", "check_point",
    ):
        dispatch[upgrade_name] = (
            lambda upgrade_name=upgrade_name: _run_upgrade_action(upgrade_name, args)
        )

    if action not in dispatch:
        print(f"Unknown action: {action}")
        sys.exit(1)

    try:
        exit_code = dispatch[action]()
    except Exception as exc:
        LOG.error("Action %s failed: %s", action, str(exc))
        sys.exit(1)
    else:
        sys.exit(exit_code or 0)
def _run_upgrade_action(action, args):
    """Delegate upgrade actions to ograc_upgrade.py.

    Args:
        action: Upgrade action name passed as the script's first argument.
        args: Extra command-line arguments forwarded after ``action``.

    Returns:
        The exit status reported by ``exec_popen`` for the upgrade script.
    """
    upgrade_script = os.path.join(CUR_DIR, "upgrade", "ograc_upgrade.py")
    # Quote each token so paths or arguments containing spaces or shell
    # metacharacters reach the script as single, literal arguments. Empty
    # arguments are dropped, as the previous unquoted join never forwarded
    # them either.
    forwarded = [str(arg) for arg in args if str(arg)]
    tokens = ["python3", upgrade_script, str(action), *forwarded]
    cmd = " ".join(shlex.quote(token) for token in tokens)
    ret, _, stderr = exec_popen(cmd, timeout=7200)
    if ret != 0:
        LOG.error("%s failed: %s", action, stderr)
    return ret
# Run the command-line dispatcher only when executed as a script, not on import.
if __name__ == "__main__":
    main()