From f3a0738061e852c8125513f6222b4a5d6ea73270 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B4=BA=E6=9C=89=E5=BF=97?= <1037617413@qq.com>
Date: Fri, 25 Oct 2024 15:34:25 +0800
Subject: [PATCH] ai_block_io: fix config parameter parsing bugs

Fix several configuration parsing issues in the ai_block_io plugin:
- validate each configured disk and cap the disk list at 10 entries
- fall back to defaults with a warning when optional keys are missing
- require avg_lim < tot_lim in every latency section
- prefix config keys with their section name in log messages

---
 .../sentryPlugins/ai_block_io/ai_block_io.py  |  70 +++++----
 .../ai_block_io/config_parser.py              | 135 ++++++++++++++----
 .../sentryPlugins/ai_block_io/data_access.py  |  14 ++
 .../sentryPlugins/ai_block_io/detector.py     |  16 ++-
 .../ai_block_io/sliding_window.py             |   2 +-
 .../sentryPlugins/ai_block_io/threshold.py    |  14 +-
 src/python/sentryPlugins/ai_block_io/utils.py |   2 -
 7 files changed, 180 insertions(+), 73 deletions(-)

diff --git a/src/python/sentryPlugins/ai_block_io/ai_block_io.py b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
index 74f246a..14f740d 100644
--- a/src/python/sentryPlugins/ai_block_io/ai_block_io.py
+++ b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
@@ -23,6 +23,7 @@ from .data_access import (
     get_io_data_from_collect_plug,
     check_collect_valid,
     get_disk_type,
+    check_disk_is_available
 )
 from .io_data import MetricName
 from .alarm_report import Xalarm, Report
@@ -31,14 +32,14 @@ CONFIG_FILE = "/etc/sysSentry/plugins/ai_block_io.ini"
 
 
 def sig_handler(signum, frame):
-    logging.info("receive signal: %d", signum)
     Report.report_pass(f"receive signal: {signum}, exiting...")
+    logging.info("ai_block_io plugin finished running.")
     exit(signum)
 
 
 class SlowIODetection:
     _config_parser = None
-    _disk_list = None
+    _disk_list = []
     _detector_name_list = defaultdict(list)
     _disk_detectors = {}
 
@@ -48,32 +49,30 @@
         self.__init_detector()
 
     def __init_detector_name_list(self):
-        self._disk_list = check_collect_valid(
-            self._config_parser.period_time
-        )
-        if self._disk_list is None:
-            Report.report_pass(
-                "get available disk error, please check if the collector plug is enable. exiting..."
-            )
-            logging.critical("get available disk error, please check if the collector plug is enable. exiting...")
-            exit(1)
-
-        logging.info(f"ai_block_io plug has found disks: {self._disk_list}")
         disks: list = self._config_parser.disks_to_detection
         stages: list = self._config_parser.stage
         iotypes: list = self._config_parser.iotype
-        # Case 1: None, enable detection on all available disks
-        # Case 2: not None and len == 0, do not detect any disk
-        # Case 3: len != 0, take the intersection with available disks
+
         if disks is None:
-            logging.warning(
-                "you not specify any disk or use default, so ai_block_io will enable all available disk."
-            )
-        for disk in self._disk_list:
-            if disks is not None:
-                if disk not in disks:
-                    continue
-                disks.remove(disk)
+            logging.warning("no disk is specified or 'default' is used, so ai_block_io will enable all available disks.")
+            all_available_disk_list = check_collect_valid(self._config_parser.period_time)
+            if all_available_disk_list is None:
+                Report.report_pass("get available disk error, please check if the collector plugin is enabled. exiting...")
+                logging.critical("get available disk error, please check if the collector plugin is enabled. exiting...")
+                exit(1)
+            if len(all_available_disk_list) == 0:
+                Report.report_pass("no available disk found. exiting...")
+                logging.critical("no available disk found. exiting...")
exiting...") + exit(1) + disks = all_available_disk_list + logging.info(f"available disk list is follow: {disks}.") + + for disk in disks: + tmp_disk = [disk] + ret = check_disk_is_available(self._config_parser.period_time, tmp_disk) + if not ret: + logging.warning(f"disk: {disk} is not available, it will be ignored.") + continue disk_type_result = get_disk_type(disk) if disk_type_result["ret"] == 0 and disk_type_result["message"] in ( @@ -89,20 +88,15 @@ class SlowIODetection: disk_type_result, ) continue + self._disk_list.append(disk) for stage in stages: for iotype in iotypes: self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "latency")) self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "io_dump")) - if disks: - logging.warning( - "disks: %s not in available disk list, so they will be ignored.", - disks, - ) + if not self._detector_name_list: + Report.report_pass("the disks to detection is empty, ai_block_io will exit.") logging.critical("the disks to detection is empty, ai_block_io will exit.") - Report.report_pass( - "the disks to detection is empty, ai_block_io will exit." - ) exit(1) def __init_detector(self): @@ -202,16 +196,20 @@ class SlowIODetection: logging.debug("step3. Report slow io event to sysSentry.") for slow_io_event in slow_io_event_list: alarm_content = { + "alarm_source": "ai_block_io", "driver_name": slow_io_event[1], + "io_type": slow_io_event[4], "reason": slow_io_event[2], "block_stack": slow_io_event[3], - "io_type": slow_io_event[4], - "alarm_source": "ai_block_io", "alarm_type": slow_io_event[5], - "details": slow_io_event[6], + "details": slow_io_event[6] } Xalarm.major(alarm_content) - logging.warning("[SLOW IO] " + str(alarm_content)) + tmp_alarm_content = alarm_content.copy() + del tmp_alarm_content["details"] + logging.warning("[SLOW IO] " + str(tmp_alarm_content)) + logging.warning(f"latency: " + str(alarm_content.get("details").get("latency"))) + logging.warning(f"iodump: " + str(alarm_content.get("details").get("iodump"))) # Step4:等待检测时间 logging.debug("step4. Wait to start next slow io event detection loop.") diff --git a/src/python/sentryPlugins/ai_block_io/config_parser.py b/src/python/sentryPlugins/ai_block_io/config_parser.py index 91ec5c6..3049db2 100644 --- a/src/python/sentryPlugins/ai_block_io/config_parser.py +++ b/src/python/sentryPlugins/ai_block_io/config_parser.py @@ -105,21 +105,26 @@ class ConfigParser: ge=None, lt=None, le=None, + section=None ): + if section is not None: + print_key = section + "." + key + else: + print_key = key value = config_items.get(key) if value is None: logging.warning( "config of %s not found, the default value %s will be used.", - key, + print_key, default_value, ) value = default_value if not value: logging.critical( - "the value of %s is empty, ai_block_io plug will exit.", key + "the value of %s is empty, ai_block_io plug will exit.", print_key ) Report.report_pass( - f"the value of {key} is empty, ai_block_io plug will exit." + f"the value of {print_key} is empty, ai_block_io plug will exit." ) exit(1) try: @@ -127,51 +132,51 @@ class ConfigParser: except ValueError: logging.critical( "the value of %s is not a valid %s, ai_block_io plug will exit.", - key, + print_key, value_type, ) Report.report_pass( - f"the value of {key} is not a valid {value_type}, ai_block_io plug will exit." + f"the value of {print_key} is not a valid {value_type}, ai_block_io plug will exit." 
             )
             exit(1)
         if gt is not None and value <= gt:
             logging.critical(
                 "the value of %s is not greater than %s, ai_block_io plug will exit.",
-                key,
+                print_key,
                 gt,
             )
             Report.report_pass(
-                f"the value of {key} is not greater than {gt}, ai_block_io plug will exit."
+                f"the value of {print_key} is not greater than {gt}, ai_block_io plug will exit."
             )
             exit(1)
         if ge is not None and value < ge:
             logging.critical(
                 "the value of %s is not greater than or equal to %s, ai_block_io plug will exit.",
-                key,
+                print_key,
                 ge,
             )
             Report.report_pass(
-                f"the value of {key} is not greater than or equal to {ge}, ai_block_io plug will exit."
+                f"the value of {print_key} is not greater than or equal to {ge}, ai_block_io plug will exit."
             )
             exit(1)
         if lt is not None and value >= lt:
             logging.critical(
                 "the value of %s is not less than %s, ai_block_io plug will exit.",
-                key,
+                print_key,
                 lt,
             )
             Report.report_pass(
-                f"the value of {key} is not less than {lt}, ai_block_io plug will exit."
+                f"the value of {print_key} is not less than {lt}, ai_block_io plug will exit."
             )
             exit(1)
         if le is not None and value > le:
             logging.critical(
                 "the value of %s is not less than or equal to %s, ai_block_io plug will exit.",
-                key,
+                print_key,
                 le,
             )
             Report.report_pass(
-                f"the value of {key} is not less than or equal to {le}, ai_block_io plug will exit."
+                f"the value of {print_key} is not less than or equal to {le}, ai_block_io plug will exit."
             )
             exit(1)
 
@@ -188,7 +193,7 @@
         frequency = self._conf["common"]["period_time"]
         ret = check_detect_frequency_is_valid(frequency)
         if ret is None:
-            log = f"period_time: {frequency} is valid, "\
+            log = f"period_time: {frequency} is invalid, "\
                 f"Check whether the value range is too large or is not an "\
                 f"integer multiple of period_time.. exiting..."
             Report.report_pass(log)
@@ -202,6 +207,7 @@
             self._conf["common"]["disk"] = None
             return
         disks_to_detection = disks_to_detection.strip()
+        disks_to_detection = disks_to_detection.lower()
         if not disks_to_detection:
             logging.critical("the value of disk is empty, ai_block_io plug will exit.")
             Report.report_pass(
@@ -213,7 +219,18 @@
         if len(disk_list) == 1 and disk_list[0] == "default":
             self._conf["common"]["disk"] = None
             return
-        self._conf["common"]["disk"] = disk_list
+        if len(disk_list) > 10:
+            ten_disk_list = disk_list[0:10]
+            other_disk_list = disk_list[10:]
+            logging.warning(f"at most 10 disks are supported, disks: {ten_disk_list} will be retained, others: {other_disk_list} will be ignored.")
+        else:
+            ten_disk_list = disk_list
+        set_ten_disk_list = set(ten_disk_list)
+        if len(ten_disk_list) > len(set_ten_disk_list):
+            tmp = ten_disk_list
+            ten_disk_list = list(set_ten_disk_list)
+            logging.warning(f"the disk list contains duplicates, it will be deduplicated, before: {tmp}, after: {ten_disk_list}")
+        self._conf["common"]["disk"] = ten_disk_list
 
     def _read_train_data_duration(self, items_algorithm: dict):
         self._conf["algorithm"]["train_data_duration"] = self._get_config_value(
@@ -244,10 +261,12 @@
 
     def _read_algorithm_type_and_parameter(self, items_algorithm: dict):
         algorithm_type = items_algorithm.get("algorithm_type")
-        if algorithm_type is not None:
-            self._conf["algorithm"]["algorithm_type"] = get_threshold_type_enum(
-                algorithm_type
-            )
+        if algorithm_type is None:
+            default_algorithm_type = self._conf["algorithm"]["algorithm_type"]
+            logging.warning(f"algorithm_type not found, it will be set to default: {default_algorithm_type}")
+        else:
+            self._conf["algorithm"]["algorithm_type"] = get_threshold_type_enum(algorithm_type)
+
         if self._conf["algorithm"]["algorithm_type"] is None:
             logging.critical(
                 "the algorithm_type: %s you set is invalid. ai_block_io plug will exit.",
@@ -257,6 +276,7 @@
                 f"the algorithm_type: {algorithm_type} you set is invalid. ai_block_io plug will exit."
             )
             exit(1)
+
         elif self._conf["algorithm"]["algorithm_type"] == ThresholdType.NSigmaThreshold:
             self._conf["algorithm"]["n_sigma_parameter"] = self._get_config_value(
                 items_algorithm,
@@ -279,9 +299,14 @@
         )
 
     def _read_stage(self, items_algorithm: dict):
-        stage_str = items_algorithm.get(
-            "stage", self.DEFAULT_CONF["common"]["stage"]
-        ).strip()
+        stage_str = items_algorithm.get("stage")
+        if stage_str is None:
+            stage_str = self.DEFAULT_CONF["common"]["stage"]
+            logging.warning(f"stage not found, it will be set to default: {stage_str}")
+        else:
+            stage_str = stage_str.strip()
+
+        stage_str = stage_str.lower()
         stage_list = stage_str.split(",")
         stage_list = [stage.strip() for stage in stage_list]
         if len(stage_list) == 1 and stage_list[0] == "":
@@ -307,9 +332,14 @@
         self._conf["common"]["stage"] = dup_stage_list
 
     def _read_iotype(self, items_algorithm: dict):
-        iotype_str = items_algorithm.get(
-            "iotype", self.DEFAULT_CONF["common"]["iotype"]
-        ).strip()
+        iotype_str = items_algorithm.get("iotype")
+        if iotype_str is None:
+            iotype_str = self.DEFAULT_CONF["common"]["iotype"]
+            logging.warning(f"iotype not found, it will be set to default: {iotype_str}")
+        else:
+            iotype_str = iotype_str.strip()
+
+        iotype_str = iotype_str.lower()
         iotype_list = iotype_str.split(",")
         iotype_list = [iotype.strip() for iotype in iotype_list]
         if len(iotype_list) == 1 and iotype_list[0] == "":
@@ -333,6 +363,13 @@
 
     def _read_sliding_window_type(self, items_sliding_window: dict):
         sliding_window_type = items_sliding_window.get("win_type")
+
+        if sliding_window_type is None:
+            default_sliding_window_type = self._conf["algorithm"]["win_type"]
+            logging.warning(f"win_type not found, it will be set to default: {default_sliding_window_type}")
+            return
+
+        sliding_window_type = sliding_window_type.strip()
         if sliding_window_type is not None:
             self._conf["algorithm"]["win_type"] = (
                 get_sliding_window_type_enum(sliding_window_type)
@@ -439,6 +476,7 @@
             int,
             self.DEFAULT_CONF["latency_sata_ssd"]["read_tot_lim"],
             gt=0,
+            section="latency_sata_ssd"
         )
         self._conf["latency_sata_ssd"]["write_tot_lim"] = self._get_config_value(
             items_latency_sata_ssd,
@@ -446,21 +484,32 @@
             int,
             self.DEFAULT_CONF["latency_sata_ssd"]["write_tot_lim"],
             gt=0,
+            section="latency_sata_ssd"
         )
         self._conf["latency_sata_ssd"]["read_avg_lim"] = self._get_config_value(
             items_latency_sata_ssd,
             "read_avg_lim",
             int,
             self.DEFAULT_CONF["latency_sata_ssd"]["read_avg_lim"],
-            gt=0
+            gt=0,
+            section="latency_sata_ssd"
         )
         self._conf["latency_sata_ssd"]["write_avg_lim"] = self._get_config_value(
             items_latency_sata_ssd,
             "write_avg_lim",
             int,
             self.DEFAULT_CONF["latency_sata_ssd"]["write_avg_lim"],
-            gt=0
+            gt=0,
+            section="latency_sata_ssd"
         )
+        if self._conf["latency_sata_ssd"]["read_avg_lim"] >= self._conf["latency_sata_ssd"]["read_tot_lim"]:
+            Report.report_pass("latency_sata_ssd.read_avg_lim must be less than latency_sata_ssd.read_tot_lim. exiting...")
+            logging.critical("latency_sata_ssd.read_avg_lim must be less than latency_sata_ssd.read_tot_lim. exiting...")
+            exit(1)
+        if self._conf["latency_sata_ssd"]["write_avg_lim"] >= self._conf["latency_sata_ssd"]["write_tot_lim"]:
+            Report.report_pass("latency_sata_ssd.write_avg_lim must be less than latency_sata_ssd.write_tot_lim. exiting...")
+            logging.critical("latency_sata_ssd.write_avg_lim must be less than latency_sata_ssd.write_tot_lim. exiting...")
+            exit(1)
         else:
             Report.report_pass("not found latency_sata_ssd section. exiting...")
exiting...") logging.critical("not found latency_sata_ssd section. exiting...") @@ -474,6 +523,7 @@ class ConfigParser: int, self.DEFAULT_CONF["latency_nvme_ssd"]["read_tot_lim"], gt=0, + section="latency_nvme_ssd" ) self._conf["latency_nvme_ssd"]["write_tot_lim"] = self._get_config_value( items_latency_nvme_ssd, @@ -481,21 +531,32 @@ class ConfigParser: int, self.DEFAULT_CONF["latency_nvme_ssd"]["write_tot_lim"], gt=0, + section="latency_nvme_ssd" ) self._conf["latency_nvme_ssd"]["read_avg_lim"] = self._get_config_value( items_latency_nvme_ssd, "read_avg_lim", int, self.DEFAULT_CONF["latency_nvme_ssd"]["read_avg_lim"], - gt=0 + gt=0, + section="latency_nvme_ssd" ) self._conf["latency_nvme_ssd"]["write_avg_lim"] = self._get_config_value( items_latency_nvme_ssd, "write_avg_lim", int, self.DEFAULT_CONF["latency_nvme_ssd"]["write_avg_lim"], - gt=0 + gt=0, + section="latency_nvme_ssd" ) + if self._conf["latency_nvme_ssd"]["read_avg_lim"] >= self._conf["latency_nvme_ssd"]["read_tot_lim"]: + Report.report_pass("latency_nvme_ssd.read_avg_lim must < latency_nvme_ssd.read_tot_lim . exiting...") + logging.critical("latency_nvme_ssd.read_avg_lim must < latency_nvme_ssd.read_tot_lim . exiting...") + exit(1) + if self._conf["latency_nvme_ssd"]["write_avg_lim"] >= self._conf["latency_nvme_ssd"]["write_tot_lim"]: + Report.report_pass("latency_nvme_ssd.write_avg_lim must < latency_nvme_ssd.write_tot_lim . exiting...") + logging.critical("latency_nvme_ssd.write_avg_lim must < latency_nvme_ssd.write_tot_lim . exiting...") + exit(1) else: Report.report_pass("not found latency_nvme_ssd section. exiting...") logging.critical("not found latency_nvme_ssd section. exiting...") @@ -509,6 +570,7 @@ class ConfigParser: int, self.DEFAULT_CONF["latency_sata_hdd"]["read_tot_lim"], gt=0, + section="latency_sata_hdd" ) self._conf["latency_sata_hdd"]["write_tot_lim"] = self._get_config_value( items_latency_sata_hdd, @@ -516,21 +578,32 @@ class ConfigParser: int, self.DEFAULT_CONF["latency_sata_hdd"]["write_tot_lim"], gt=0, + section="latency_sata_hdd" ) self._conf["latency_sata_hdd"]["read_avg_lim"] = self._get_config_value( items_latency_sata_hdd, "read_avg_lim", int, self.DEFAULT_CONF["latency_sata_hdd"]["read_avg_lim"], - gt=0 + gt=0, + section="latency_sata_hdd" ) self._conf["latency_sata_hdd"]["write_avg_lim"] = self._get_config_value( items_latency_sata_hdd, "write_avg_lim", int, self.DEFAULT_CONF["latency_sata_hdd"]["write_avg_lim"], - gt=0 + gt=0, + section="latency_sata_hdd" ) + if self._conf["latency_sata_hdd"]["read_avg_lim"] >= self._conf["latency_sata_hdd"]["read_tot_lim"]: + Report.report_pass("latency_sata_hdd.read_avg_lim must < latency_sata_hdd.read_tot_lim . exiting...") + logging.critical("latency_sata_hdd.read_avg_lim must < latency_sata_hdd.read_tot_lim . exiting...") + exit(1) + if self._conf["latency_sata_hdd"]["write_avg_lim"] >= self._conf["latency_sata_hdd"]["write_tot_lim"]: + Report.report_pass("latency_sata_hdd.write_avg_lim must < latency_sata_hdd.write_tot_lim . exiting...") + logging.critical("latency_sata_hdd.write_avg_lim must < latency_sata_hdd.write_tot_lim . exiting...") + exit(1) else: Report.report_pass("not found latency_sata_hdd section. exiting...") logging.critical("not found latency_sata_hdd section. 
exiting...") diff --git a/src/python/sentryPlugins/ai_block_io/data_access.py b/src/python/sentryPlugins/ai_block_io/data_access.py index e4869d5..2f2d607 100644 --- a/src/python/sentryPlugins/ai_block_io/data_access.py +++ b/src/python/sentryPlugins/ai_block_io/data_access.py @@ -67,6 +67,20 @@ def check_detect_frequency_is_valid(period): return None +def check_disk_is_available(period_time, disk): + data_raw = is_iocollect_valid(period_time, disk) + if data_raw["ret"] == 0: + try: + data = json.loads(data_raw["message"]) + except Exception as e: + return False + if not data: + return False + return True + else: + return False + + def _get_raw_data(period, disk_list): return get_io_data( period, diff --git a/src/python/sentryPlugins/ai_block_io/detector.py b/src/python/sentryPlugins/ai_block_io/detector.py index e3a0952..496e032 100644 --- a/src/python/sentryPlugins/ai_block_io/detector.py +++ b/src/python/sentryPlugins/ai_block_io/detector.py @@ -75,6 +75,18 @@ class Detector: f' sliding_window_type: {self._slidingWindow}') +def set_to_str(parameter: set): + ret = "" + parameter = list(parameter) + length = len(parameter) + for i in range(length): + if i == 0: + ret += parameter[i] + else: + ret += "," + parameter[i] + return ret + + class DiskDetector: def __init__(self, disk_name: str): @@ -124,7 +136,7 @@ class DiskDetector: alarm_type.add(metric_name.metric_name) latency_wins, iodump_wins = self.get_detector_list_window() - details = f"latency: {latency_wins}, iodump: {iodump_wins}" + details = {"latency": latency_wins, "iodump": iodump_wins} io_press = {"throtl", "wbt", "iocost", "bfq"} driver_slow = {"rq_driver"} @@ -137,7 +149,7 @@ class DiskDetector: elif not kernel_slow.isdisjoint(block_stack): reason = "kernel_slow" - return True, driver_name, reason, str(block_stack), str(io_type), str(alarm_type), details + return True, driver_name, reason, set_to_str(block_stack), set_to_str(io_type), set_to_str(alarm_type), details def __repr__(self): msg = f'disk: {self._disk_name}, ' diff --git a/src/python/sentryPlugins/ai_block_io/sliding_window.py b/src/python/sentryPlugins/ai_block_io/sliding_window.py index 4083c43..ff3fa3b 100644 --- a/src/python/sentryPlugins/ai_block_io/sliding_window.py +++ b/src/python/sentryPlugins/ai_block_io/sliding_window.py @@ -107,7 +107,7 @@ class MedianSlidingWindow(SlidingWindow): if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None): is_slow_io_event = False median = np.median(self._io_data_queue) - if median >= self._ai_threshold: + if (self._ai_threshold is not None and median > self._ai_threshold) or (self._abs_threshold is not None and median > self._abs_threshold): is_slow_io_event = True return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold, self._avg_lim diff --git a/src/python/sentryPlugins/ai_block_io/threshold.py b/src/python/sentryPlugins/ai_block_io/threshold.py index 600d041..e202bb8 100644 --- a/src/python/sentryPlugins/ai_block_io/threshold.py +++ b/src/python/sentryPlugins/ai_block_io/threshold.py @@ -65,9 +65,12 @@ class Threshold: def __repr__(self): return "Threshold" + def __str__(self): + return "Threshold" + class AbsoluteThreshold(Threshold): - def __init__(self, data_queue_size: int = 10000, data_queue_update_size: int = 1000): + def __init__(self, data_queue_size: int = 10000, data_queue_update_size: int = 1000, **kwargs): super().__init__(data_queue_size, data_queue_update_size) def push_latest_data_to_queue(self, 
@@ -76,6 +79,9 @@ class AbsoluteThreshold(Threshold):
     def __repr__(self):
         return "[AbsoluteThreshold]"
 
+    def __str__(self):
+        return "absolute"
+
 
 class BoxplotThreshold(Threshold):
     def __init__(self, boxplot_parameter: float = 1.5, data_queue_size: int = 10000, data_queue_update_size: int = 1000, **kwargs):
@@ -112,6 +118,9 @@ class BoxplotThreshold(Threshold):
     def __repr__(self):
         return f"[BoxplotThreshold, param is: {self.parameter}, train_size: {self.data_queue.maxsize}, update_size: {self.data_queue_update_size}]"
 
+    def __str__(self):
+        return "boxplot"
+
 
 class NSigmaThreshold(Threshold):
     def __init__(self, n_sigma_parameter: float = 3.0, data_queue_size: int = 10000, data_queue_update_size: int = 1000, **kwargs):
@@ -147,6 +156,9 @@ class NSigmaThreshold(Threshold):
     def __repr__(self):
         return f"[NSigmaThreshold, param is: {self.parameter}, train_size: {self.data_queue.maxsize}, update_size: {self.data_queue_update_size}]"
 
+    def __str__(self):
+        return "n_sigma"
+
 
 class ThresholdType(Enum):
     AbsoluteThreshold = 0
diff --git a/src/python/sentryPlugins/ai_block_io/utils.py b/src/python/sentryPlugins/ai_block_io/utils.py
index d6f4067..7d2390b 100644
--- a/src/python/sentryPlugins/ai_block_io/utils.py
+++ b/src/python/sentryPlugins/ai_block_io/utils.py
@@ -19,8 +19,6 @@ from .io_data import MetricName, IOData
 
 
 def get_threshold_type_enum(algorithm_type: str):
-    if algorithm_type.lower() == "absolute":
-        return ThresholdType.AbsoluteThreshold
     if algorithm_type.lower() == "boxplot":
         return ThresholdType.BoxplotThreshold
     if algorithm_type.lower() == "n_sigma":
-- 
2.23.0