✏️ 正在编辑: storage.py
路径:
/opt/cloudlinux/venv/lib/python3.11/site-packages/ssa/modules/storage.py
提示:
您可以编辑任何文件(包括二进制文件),但请注意不当修改可能导致文件损坏。
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
Represents storage where ssa data is collected, stored and extracted
"""
import itertools
from dataclasses import dataclass
from typing import List, Iterator, Tuple, Dict

import sqlalchemy
from sqlalchemy import func, cast, distinct

from ssa.db import session_scope, RequestResult

# Every per-hour series produced by this module has one slot per hour
# of the day, indexed by the integer value of strftime('%H').
_HOURS_IN_DAY = 24


@dataclass
class DomainData:
    """
    Aggregated request statistics of a single domain.
    """
    # value of RequestResult.domain the statistics belong to
    domain_name: str
    # number of requests per hour of day (24 items, index == hour)
    domain_total_reqs: List[int]
    # aggregated RequestResult.wordpress flag of the domain
    is_a_wordpress_domain: bool
    # largest per-hour count of distinct paths seen for the domain
    urls_number: int


def _empty_day() -> List[int]:
    """
    Returns a fresh zero-filled per-hour series.
    """
    return [0] * _HOURS_IN_DAY


def iter_domains_data(engine) -> Iterator[DomainData]:
    """
    Iterates data from database domain-by-domain.

    Rows are grouped by (domain, hour) in the database and then folded
    into one DomainData per domain.

    :param engine: object passed to session_scope() to open a session
    :return: iterator over DomainData, ordered by domain name
    """
    with session_scope(engine) as db:
        hour_of_day = func.strftime('%H', RequestResult.created_at)
        results_by_hour = db.query(
            RequestResult.domain,
            hour_of_day,
            func.Count(RequestResult.id),
            func.max(RequestResult.wordpress),
            func.count(distinct(RequestResult.path))
        ).group_by(
            RequestResult.domain,
            hour_of_day
        ).order_by(
            RequestResult.domain,
            hour_of_day
        )

        # groupby() relies on the ORDER BY above: rows of one domain
        # are guaranteed to be adjacent
        results_by_hour_grouped = itertools.groupby(
            results_by_hour, key=lambda item: item[0])

        for domain_name, group in results_by_hour_grouped:
            # at some hours there may be no requests
            # so we must normalize data to match 24h data format
            requests_number_by_hour = _empty_day()
            urls_number = 0
            is_wordpress = False

            for _, hour, requests_num, hour_is_wordpress, urls in group:
                requests_number_by_hour[int(hour)] = requests_num
                # NOTE: this keeps the largest *per-hour* distinct path
                # count, not the distinct count over the whole day
                urls_number = max(urls_number, urls)
                # the domain is a wordpress one if it was detected as
                # such during ANY hour, not only during the last hour
                # that happens to have requests
                is_wordpress = is_wordpress or hour_is_wordpress

            yield DomainData(
                domain_name=domain_name,
                domain_total_reqs=requests_number_by_hour,
                is_a_wordpress_domain=is_wordpress,
                urls_number=urls_number
            )


def iter_urls_data(engine, domain_name, all_paths) -> Iterator[Tuple[str, dict]]:
    """
    Iterates urls data from database url-by-url.

    Yields nothing when there are no matching rows.

    :param engine: object passed to session_scope() to open a session
    :param domain_name: only rows of this domain are considered
    :param all_paths: only rows whose path is in this collection
                      are considered
    :return: iterator over (path, data) pairs ordered by path, where
             data is a dict with keys 'path', 'url_throttled_reqs',
             'url_total_reqs' and 'url_slow_reqs'; the last three are
             24-item per-hour lists
    """
    with session_scope(engine) as db:
        hour_of_day = func.strftime('%H', RequestResult.created_at)
        urls_data = db.query(
            RequestResult.path,
            hour_of_day,
            func.Sum(cast(
                RequestResult.hitting_limits, sqlalchemy.Integer
            )).label('url_throttled_reqs'),
            func.Count(
                RequestResult.id
            ).label('url_total_reqs'),
            func.Sum(cast(
                RequestResult.is_slow_request, sqlalchemy.Integer)
            ).label('url_slow_reqs')
        ).filter(
            RequestResult.domain == domain_name
        ).filter(
            RequestResult.path.in_(all_paths)
        ).group_by(
            RequestResult.path,
            hour_of_day
        ).order_by(
            RequestResult.path,
            hour_of_day
        )

        # Rows are ordered by path, so groupby() yields each path once.
        # Unlike manual "previous path" tracking, this neither fails on
        # an empty result set nor merges a falsy (empty) path into the
        # next one.
        rows_by_path = itertools.groupby(
            urls_data, key=lambda item: item[0])

        for path, rows in rows_by_path:
            url_throttled_reqs = _empty_day()
            url_total_reqs = _empty_day()
            url_slow_reqs = _empty_day()

            for _, hour, url_throttled_req, url_total_req, url_slow_req in rows:
                hour_index = int(hour)
                url_throttled_reqs[hour_index] = url_throttled_req
                url_total_reqs[hour_index] = url_total_req
                url_slow_reqs[hour_index] = url_slow_req

            yield path, dict(
                path=path,
                url_throttled_reqs=url_throttled_reqs,
                url_total_reqs=url_total_reqs,
                url_slow_reqs=url_slow_reqs
            )


def get_url_durations(engine, domain_name) -> Iterator[Tuple[str, List[int]]]:
    """
    Get information about durations of requests url-by-url.

    This is a generator: nothing is queried until it is iterated.

    :param engine: object passed to session_scope() to open a session
    :param domain_name: only rows of this domain are considered
    :return: iterator over (path, durations) pairs ordered by path,
             where durations is the list of RequestResult.duration
             values recorded for that path
             (presumably integers - confirm against the model)
    """
    with session_scope(engine) as db:
        urls_data = db.query(
            RequestResult.path,
            RequestResult.duration
        ).filter(
            RequestResult.domain == domain_name
        ).order_by(
            RequestResult.path
        )

        # Use iterator directly to avoid loading all data into RAM
        durations_by_path = itertools.groupby(
            urls_data, lambda item: item[0])
        for key, group in durations_by_path:
            yield key, [duration for _, duration in group]
💾 保存文件
← 返回文件管理器