✏️ 正在编辑: wmt-api.py
路径:
/opt/cloudlinux/venv/lib/python3.11/site-packages/wmt/wmt-api.py
提示:
您可以编辑任何文件(包括二进制文件),但请注意不当修改可能导致文件损坏。
#!/opt/cloudlinux/venv/bin/python3
"""Command-line API for the WMT scanner: config, reports, status, start/stop.

Every action prints a JSON document to stdout (see print_result_and_exit)
and terminates the process with the matching exit code.
"""
import json
import fcntl
import os
import time
import sys
import syslog
import psutil
import signal
from argparse import ArgumentParser
from datetime import datetime, timedelta

from wmt.common.const import (
    WMT_SCANNER_SERVICE,
    WMT_LOCK_FILE,
    CONFIG_PATH,
    WMT_DB
)
from wmt.common.report import generate_report, report_dict
from wmt.db import setup_database
from wmt.common.service import set_service_state
from wmt.common.notification import Notifier, SupportedNotificationTypes
from wmt.common import cfg
from cllicense import CloudlinuxLicenseLib
from wmt.common.utils import send_report_to_clickhouse, manage_crons


def print_result_and_exit(result='success', exit_code=0, **extra):
    """Print a JSON result document to stdout and exit the process.

    The document always carries ``result`` and ``timestamp`` (seconds since
    the epoch); any ``extra`` keyword arguments are merged in on top.

    Raises:
        SystemExit: always, with ``exit_code``.
    """
    message = {
        'result': result,
        'timestamp': time.time()
    }
    message.update(extra)
    print(json.dumps(message, indent=2, sort_keys=True))
    sys.exit(exit_code)


def _audit_log(action, details=''):
    """Write one audit record to syslog (LOG_AUTH facility, LOG_INFO level).

    The record names the action, the real uid and the login name; ``details``
    is appended verbatim.
    """
    uid = os.getuid()
    try:
        user = os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal; fall back to the uid.
        user = str(uid)
    syslog.openlog('wmt-api', syslog.LOG_PID, syslog.LOG_AUTH)
    syslog.syslog(syslog.LOG_INFO, f'action={action} uid={uid} user={user} {details}')
    syslog.closelog()


def get_status():
    """Return 'started' if another process holds the lock file, else 'stopped'.

    A missing lock file also means 'stopped'.
    """
    if not os.path.exists(WMT_LOCK_FILE):
        return 'stopped'
    with open(WMT_LOCK_FILE) as f:
        try:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # locked by someone else
            return 'started'
    # We obtained the lock ourselves; closing the file released it.
    return 'stopped'


def get_scanner_pid():
    """Return the PID stored in the lock file, or None.

    None is returned when the lock file is missing, when nobody holds the
    lock, when the file content is empty or not an integer, or when no
    process with that PID exists.
    """
    if not os.path.exists(WMT_LOCK_FILE):
        return None
    with open(WMT_LOCK_FILE) as f:
        # Probe the lock without blocking, like get_status does (here with a
        # shared lock instead of an exclusive one). The running daemon
        # holds LOCK_EX for its entire lifetime, so:
        #   - LOCK_SH succeeds (no contention) -> daemon is NOT running.
        #     Any PID still in the file is stale (crash / SIGKILL) and
        #     may have been reused by an unrelated process; do not
        #     return it (avoids os.kill(stale_pid, SIGUSR1)).
        #   - LOCK_SH fails (EWOULDBLOCK) -> daemon is alive and the PID
        #     in the file is current. Read it without holding the lock;
        #     the truncate-after-lock in save_pid_and_lock guarantees the
        #     file is either empty or a complete PID.
        try:
            fcntl.flock(f.fileno(), fcntl.LOCK_SH | fcntl.LOCK_NB)
        except OSError:
            data = f.read().strip()
        else:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)
            return None
    if not data:
        return None
    try:
        pid = int(data)
    except ValueError:
        return None
    return pid if psutil.pid_exists(pid) else None


def get_config():
    """Return the current configuration as a dict (``cfg.to_dict()``)."""
    return cfg.to_dict()


def change_config(new_json_config):
    """Apply ``new_json_config`` via ``cfg.modify`` and return the result.

    The changed keys (old and new values) are written to the audit log, and
    a running scanner, if any, is sent SIGUSR1.
    """
    old_config = cfg.to_dict()
    config = cfg.modify(new_json_config)
    changed = {
        key: {'old': old_config.get(key), 'new': config.get(key)}
        for key in config
        if config.get(key) != old_config.get(key)
    }
    _audit_log('config-change', f'changed={json.dumps(changed)}')
    scanner_pid = get_scanner_pid()
    if scanner_pid is not None:
        try:
            os.kill(scanner_pid, signal.SIGUSR1)
        except ProcessLookupError:
            # The scanner exited between the liveness check and the signal.
            # The config is already modified, so there is nothing left to
            # notify and the change must not be reported as a failure.
            pass
    return config


def run():
    """Parse the command line and execute the single requested action.

    Every action finishes through print_result_and_exit, which raises
    SystemExit; with no recognised option the help text is printed.
    """
    args = ArgumentParser()
    args.add_argument('--config-get', action='store_true')
    args.add_argument('--config-change', type=str)
    args.add_argument('--report-get', action='store_true')
    args.add_argument('--send-clickhouse', action='store_true')
    # just for 24h report by mail executed by cron
    args.add_argument('--send-email', action='store_true')
    args.add_argument('--status', action='store_true')
    args.add_argument('--start', action='store_true')
    args.add_argument('--stop', action='store_true')
    opts = args.parse_args()

    if opts.config_get:
        config = get_config()
        config['default_report_email'] = cfg.default_report_email
        if config.get("ignore_list"):
            # A non-empty ignore list is emitted as one comma-separated string.
            tmp_val = ",".join(config.get("ignore_list"))
            config['ignore_list'] = tmp_val
        print_result_and_exit(config=config)
    elif opts.config_change:
        config = change_config(opts.config_change)
        config['default_report_email'] = cfg.default_report_email
        print_result_and_exit(config=config)
    elif opts.report_get:
        engine = setup_database(readonly=os.path.exists(WMT_DB))
        # 24h back from NOW; the clock is read once so the window is exactly
        # 24 hours and the printed date matches the window end.
        now = datetime.now()
        start, end = now - timedelta(days=1), now
        report = report_dict(generate_report(engine, start, end))
        print_result_and_exit(report=report,
                              date=now.strftime('%Y-%m-%d %H:%M'))
    elif opts.send_clickhouse:
        engine = setup_database(readonly=True)
        # for the day before; the date is read once so a run that straddles
        # midnight cannot produce a two-day window.
        today = datetime.now().date()
        start, end = today - timedelta(days=1), today
        send_report_to_clickhouse(report_dict(generate_report(engine, start, end)))
        _audit_log('send-clickhouse')
        print_result_and_exit()
    elif opts.send_email:
        is_scanner_running = get_status() == 'started'
        if is_scanner_running and cfg.cfg.summary_notification_enabled:
            if not CloudlinuxLicenseLib().get_license_status():
                # Exits here; nothing below runs without a valid license.
                print_result_and_exit('CloudLinux license is expired. '
                                      'You may buy new license here: https://lp.cloudlinux.com/cloudlinux-os-solo',
                                      exit_code=1)
            engine = setup_database()
            # Same single-read rule as --send-clickhouse.
            today = datetime.now().date()
            start, end = today - timedelta(days=1), today
            report = generate_report(engine, start, end)
            Notifier(
                target_email=cfg.target_email,
                from_email=cfg.from_email,
                report=report,
                notification_type=SupportedNotificationTypes.REPORT,
            ).notify()
            _audit_log('send-email', f'target={cfg.target_email}')
            print_result_and_exit()
        else:
            print_result_and_exit('Summary report email will not be sent! '
                                  f'Please, ensure "{WMT_SCANNER_SERVICE}" service is running and '
                                  f'alert_notifications is enabled in "{CONFIG_PATH}"',
                                  exit_code=1)
    elif opts.status:
        status = get_status()
        print_result_and_exit(status=status)
    elif opts.start:
        # Bugbot 0698f84b: audit AFTER set_service_state succeeds, so a
        # failure path doesn't record a phantom service-start event.
        set_service_state(WMT_SCANNER_SERVICE, 'start')
        manage_crons(status=True)
        _audit_log('service-start', f'service={WMT_SCANNER_SERVICE}')
        print_result_and_exit()
    elif opts.stop:
        set_service_state(WMT_SCANNER_SERVICE, 'stop')
        manage_crons(status=False)
        _audit_log('service-stop', f'service={WMT_SCANNER_SERVICE}')
        print_result_and_exit()
    else:
        args.print_help()


def main():
    """Run the CLI and report any unexpected exception as a JSON error.

    SystemExit raised by print_result_and_exit is not an Exception subclass,
    so normal exits pass through this handler untouched.
    """
    try:
        run()
    except Exception as e:
        print_result_and_exit(result='error', exit_code=1, context=str(e))


if __name__ == '__main__':
    main()
💾 保存文件
← 返回文件管理器