1254 lines
55 KiB
Diff
1254 lines
55 KiB
Diff
From 24f8eddad364e83cfc5b6b1607462ffe524b59f1 Mon Sep 17 00:00:00 2001
|
||
From: =?UTF-8?q?=E8=B4=BA=E6=9C=89=E5=BF=97?= <1037617413@qq.com>
|
||
Date: Sat, 12 Oct 2024 21:59:18 +0800
|
||
Subject: [PATCH] add root cause analysis
|
||
|
||
---
|
||
config/plugins/ai_block_io.ini | 15 +-
|
||
.../sentryPlugins/ai_block_io/ai_block_io.py | 133 +++--
|
||
.../ai_block_io/config_parser.py | 465 +++++++++++-------
|
||
.../sentryPlugins/ai_block_io/data_access.py | 1 +
|
||
.../sentryPlugins/ai_block_io/detector.py | 54 +-
|
||
.../sentryPlugins/ai_block_io/io_data.py | 32 +-
|
||
.../ai_block_io/sliding_window.py | 57 ++-
|
||
src/python/sentryPlugins/ai_block_io/utils.py | 44 +-
|
||
8 files changed, 491 insertions(+), 310 deletions(-)
|
||
|
||
diff --git a/config/plugins/ai_block_io.ini b/config/plugins/ai_block_io.ini
|
||
index a814d52..422cfa3 100644
|
||
--- a/config/plugins/ai_block_io.ini
|
||
+++ b/config/plugins/ai_block_io.ini
|
||
@@ -2,7 +2,6 @@
|
||
level=info
|
||
|
||
[common]
|
||
-absolute_threshold=40
|
||
slow_io_detect_frequency=1
|
||
disk=default
|
||
stage=bio
|
||
@@ -18,4 +17,16 @@ n_sigma_parameter=3
|
||
[sliding_window]
|
||
sliding_window_type=not_continuous
|
||
window_size=30
|
||
-window_minimum_threshold=6
|
||
\ No newline at end of file
|
||
+window_minimum_threshold=6
|
||
+
|
||
+[latency_sata_ssd]
|
||
+read_tot_lim=50000
|
||
+write_tot_lim=50000
|
||
+
|
||
+[latency_nvme_ssd]
|
||
+read_tot_lim=500
|
||
+write_tot_lim=500
|
||
+
|
||
+[latency_sata_hdd]
|
||
+read_tot_lim=50000
|
||
+write_tot_lim=50000
|
||
\ No newline at end of file
|
||
diff --git a/src/python/sentryPlugins/ai_block_io/ai_block_io.py b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
|
||
index e1052ec..dd661a1 100644
|
||
--- a/src/python/sentryPlugins/ai_block_io/ai_block_io.py
|
||
+++ b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
|
||
@@ -12,13 +12,18 @@
|
||
import time
|
||
import signal
|
||
import logging
|
||
+from collections import defaultdict
|
||
|
||
from .detector import Detector, DiskDetector
|
||
-from .threshold import ThresholdFactory, AbsoluteThreshold
|
||
+from .threshold import ThresholdFactory
|
||
from .sliding_window import SlidingWindowFactory
|
||
from .utils import get_data_queue_size_and_update_size
|
||
from .config_parser import ConfigParser
|
||
-from .data_access import get_io_data_from_collect_plug, check_collect_valid
|
||
+from .data_access import (
|
||
+ get_io_data_from_collect_plug,
|
||
+ check_collect_valid,
|
||
+ get_disk_type,
|
||
+)
|
||
from .io_data import MetricName
|
||
from .alarm_report import Xalarm, Report
|
||
|
||
@@ -34,7 +39,7 @@ def sig_handler(signum, frame):
|
||
class SlowIODetection:
|
||
_config_parser = None
|
||
_disk_list = None
|
||
- _detector_name_list = {}
|
||
+ _detector_name_list = defaultdict(list)
|
||
_disk_detectors = {}
|
||
|
||
def __init__(self, config_parser: ConfigParser):
|
||
@@ -43,9 +48,13 @@ class SlowIODetection:
|
||
self.__init_detector()
|
||
|
||
def __init_detector_name_list(self):
|
||
- self._disk_list = check_collect_valid(self._config_parser.slow_io_detect_frequency)
|
||
+ self._disk_list = check_collect_valid(
|
||
+ self._config_parser.slow_io_detect_frequency
|
||
+ )
|
||
if self._disk_list is None:
|
||
- Report.report_pass("get available disk error, please check if the collector plug is enable. exiting...")
|
||
+ Report.report_pass(
|
||
+ "get available disk error, please check if the collector plug is enable. exiting..."
|
||
+ )
|
||
exit(1)
|
||
|
||
logging.info(f"ai_block_io plug has found disks: {self._disk_list}")
|
||
@@ -56,27 +65,45 @@ class SlowIODetection:
|
||
# 情况2:is not None and len = 0,则不启动任何磁盘检测
|
||
# 情况3:len != 0,则取交集
|
||
if disks is None:
|
||
- logging.warning("you not specify any disk or use default, so ai_block_io will enable all available disk.")
|
||
- for disk in self._disk_list:
|
||
- for stage in stages:
|
||
- for iotype in iotypes:
|
||
- if disk not in self._detector_name_list:
|
||
- self._detector_name_list[disk] = []
|
||
- self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency"))
|
||
- else:
|
||
- for disk in disks:
|
||
- if disk in self._disk_list:
|
||
- for stage in stages:
|
||
- for iotype in iotypes:
|
||
- if disk not in self._detector_name_list:
|
||
- self._detector_name_list[disk] = []
|
||
- self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency"))
|
||
- else:
|
||
- logging.warning("disk: [%s] not in available disk list, so it will be ignored.", disk)
|
||
- if len(self._detector_name_list) == 0:
|
||
- logging.critical("the disks to detection is empty, ai_block_io will exit.")
|
||
- Report.report_pass("the disks to detection is empty, ai_block_io will exit.")
|
||
- exit(1)
|
||
+ logging.warning(
|
||
+ "you not specify any disk or use default, so ai_block_io will enable all available disk."
|
||
+ )
|
||
+ for disk in self._disk_list:
|
||
+ if disks is not None:
|
||
+ if disk not in disks:
|
||
+ continue
|
||
+ disks.remove(disk)
|
||
+
|
||
+ disk_type_result = get_disk_type(disk)
|
||
+ if disk_type_result["ret"] == 0 and disk_type_result["message"] in (
|
||
+ '0',
|
||
+ '1',
|
||
+ '2',
|
||
+ ):
|
||
+ disk_type = int(disk_type_result["message"])
|
||
+ else:
|
||
+ logging.warning(
|
||
+ "%s get disk type error, return %s, so it will be ignored.",
|
||
+ disk,
|
||
+ disk_type_result,
|
||
+ )
|
||
+ continue
|
||
+ for stage in stages:
|
||
+ for iotype in iotypes:
|
||
+ self._detector_name_list[disk].append(
|
||
+ MetricName(disk, disk_type, stage, iotype, "latency")
|
||
+ )
|
||
+ if disks:
|
||
+ logging.warning(
|
||
+ "disks: %s not in available disk list, so they will be ignored.",
|
||
+ disks,
|
||
+ )
|
||
+ if not self._detector_name_list:
|
||
+ logging.critical("the disks to detection is empty, ai_block_io will exit.")
|
||
+ Report.report_pass(
|
||
+ "the disks to detection is empty, ai_block_io will exit."
|
||
+ )
|
||
+ exit(1)
|
||
|
||
def __init_detector(self):
|
||
train_data_duration, train_update_duration = (
|
||
@@ -88,26 +115,39 @@ class SlowIODetection:
|
||
train_data_duration, train_update_duration, slow_io_detection_frequency
|
||
)
|
||
sliding_window_type = self._config_parser.sliding_window_type
|
||
- window_size, window_threshold = (self._config_parser.get_window_size_and_window_minimum_threshold())
|
||
+ window_size, window_threshold = (
|
||
+ self._config_parser.get_window_size_and_window_minimum_threshold()
|
||
+ )
|
||
|
||
for disk, metric_name_list in self._detector_name_list.items():
|
||
- threshold = ThresholdFactory().get_threshold(
|
||
- threshold_type,
|
||
- boxplot_parameter=self._config_parser.boxplot_parameter,
|
||
- n_sigma_paramter=self._config_parser.n_sigma_parameter,
|
||
- data_queue_size=data_queue_size,
|
||
- data_queue_update_size=update_size,
|
||
- )
|
||
- sliding_window = SlidingWindowFactory().get_sliding_window(
|
||
- sliding_window_type,
|
||
- queue_length=window_size,
|
||
- threshold=window_threshold,
|
||
- )
|
||
disk_detector = DiskDetector(disk)
|
||
for metric_name in metric_name_list:
|
||
+ threshold = ThresholdFactory().get_threshold(
|
||
+ threshold_type,
|
||
+ boxplot_parameter=self._config_parser.boxplot_parameter,
|
||
+ n_sigma_paramter=self._config_parser.n_sigma_parameter,
|
||
+ data_queue_size=data_queue_size,
|
||
+ data_queue_update_size=update_size,
|
||
+ )
|
||
+ abs_threshold = self._config_parser.get_tot_lim(
|
||
+ metric_name.disk_type, metric_name.io_access_type_name
|
||
+ )
|
||
+ if abs_threshold is None:
|
||
+ logging.warning(
|
||
+ "disk %s, disk type %s, io type %s, get tot lim error, so it will be ignored.",
|
||
+ disk,
|
||
+ metric_name.disk_type,
|
||
+ metric_name.io_access_type_name,
|
||
+ )
|
||
+ sliding_window = SlidingWindowFactory().get_sliding_window(
|
||
+ sliding_window_type,
|
||
+ queue_length=window_size,
|
||
+ threshold=window_threshold,
|
||
+ abs_threshold=abs_threshold,
|
||
+ )
|
||
detector = Detector(metric_name, threshold, sliding_window)
|
||
disk_detector.add_detector(detector)
|
||
- logging.info(f'disk: [{disk}] add detector:\n [{disk_detector}]')
|
||
+ logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]")
|
||
self._disk_detectors[disk] = disk_detector
|
||
|
||
def launch(self):
|
||
@@ -138,14 +178,17 @@ class SlowIODetection:
|
||
logging.debug("step3. Report slow io event to sysSentry.")
|
||
for slow_io_event in slow_io_event_list:
|
||
metric_name: MetricName = slow_io_event[1]
|
||
+ window_info = slow_io_event[2]
|
||
+ root_cause = slow_io_event[3]
|
||
alarm_content = {
|
||
- "driver_name": f"{metric_name.get_disk_name()}",
|
||
- "reason": "disk_slow",
|
||
- "block_stack": f"{metric_name.get_stage_name()}",
|
||
- "io_type": f"{metric_name.get_io_access_type_name()}",
|
||
+ "driver_name": f"{metric_name.disk_name}",
|
||
+ "reason": root_cause,
|
||
+ "block_stack": f"{metric_name.stage_name}",
|
||
+ "io_type": f"{metric_name.io_access_type_name}",
|
||
"alarm_source": "ai_block_io",
|
||
"alarm_type": "latency",
|
||
- "details": f"current window is: {slow_io_event[2]}, threshold is: {slow_io_event[3]}.",
|
||
+ "details": f"disk type: {metric_name.disk_type}, current window: {window_info[1]}, "
|
||
+ f"ai threshold: {window_info[2]}, abs threshold: {window_info[3]}.",
|
||
}
|
||
Xalarm.major(alarm_content)
|
||
logging.warning(alarm_content)
|
||
diff --git a/src/python/sentryPlugins/ai_block_io/config_parser.py b/src/python/sentryPlugins/ai_block_io/config_parser.py
|
||
index a357766..3388cd4 100644
|
||
--- a/src/python/sentryPlugins/ai_block_io/config_parser.py
|
||
+++ b/src/python/sentryPlugins/ai_block_io/config_parser.py
|
||
@@ -20,59 +20,62 @@ from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_lo
|
||
|
||
LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
|
||
|
||
-ALL_STAGE_LIST = ['throtl', 'wbt', 'gettag', 'plug', 'deadline', 'hctx', 'requeue', 'rq_driver', 'bio']
|
||
-ALL_IOTPYE_LIST = ['read', 'write']
|
||
+ALL_STAGE_LIST = [
|
||
+ "throtl",
|
||
+ "wbt",
|
||
+ "gettag",
|
||
+ "plug",
|
||
+ "deadline",
|
||
+ "hctx",
|
||
+ "requeue",
|
||
+ "rq_driver",
|
||
+ "bio",
|
||
+]
|
||
+ALL_IOTPYE_LIST = ["read", "write"]
|
||
+DISK_TYPE_MAP = {
|
||
+ 0: "nvme_ssd",
|
||
+ 1: "sata_ssd",
|
||
+ 2: "sata_hdd",
|
||
+}
|
||
|
||
|
||
def init_log_format(log_level: str):
|
||
logging.basicConfig(level=get_log_level(log_level.lower()), format=LOG_FORMAT)
|
||
if log_level.lower() not in ("info", "warning", "error", "debug"):
|
||
logging.warning(
|
||
- f"the log_level: {log_level} you set is invalid, use default value: info."
|
||
+ "the log_level: %s you set is invalid, use default value: info.", log_level
|
||
)
|
||
|
||
|
||
class ConfigParser:
|
||
- DEFAULT_ABSOLUTE_THRESHOLD = 40
|
||
- DEFAULT_SLOW_IO_DETECTION_FREQUENCY = 1
|
||
- DEFAULT_LOG_LEVEL = "info"
|
||
-
|
||
- DEFAULT_STAGE = 'throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio'
|
||
- DEFAULT_IOTYPE = 'read,write'
|
||
-
|
||
- DEFAULT_ALGORITHM_TYPE = "boxplot"
|
||
- DEFAULT_TRAIN_DATA_DURATION = 24
|
||
- DEFAULT_TRAIN_UPDATE_DURATION = 2
|
||
- DEFAULT_BOXPLOT_PARAMETER = 1.5
|
||
- DEFAULT_N_SIGMA_PARAMETER = 3
|
||
-
|
||
- DEFAULT_SLIDING_WINDOW_TYPE = "not_continuous"
|
||
- DEFAULT_WINDOW_SIZE = 30
|
||
- DEFAULT_WINDOW_MINIMUM_THRESHOLD = 6
|
||
+ DEFAULT_CONF = {
|
||
+ "log": {"level": "info"},
|
||
+ "common": {
|
||
+ "slow_io_detect_frequency": 1,
|
||
+ "disk": None,
|
||
+ "stage": "throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio",
|
||
+ "iotype": "read,write",
|
||
+ },
|
||
+ "algorithm": {
|
||
+ "train_data_duration": 24.0,
|
||
+ "train_update_duration": 2.0,
|
||
+ "algorithm_type": get_threshold_type_enum("boxplot"),
|
||
+ "boxplot_parameter": 1.5,
|
||
+ "n_sigma_parameter": 3.0,
|
||
+ },
|
||
+ "sliding_window": {
|
||
+ "sliding_window_type": get_sliding_window_type_enum("not_continuous"),
|
||
+ "window_size": 30,
|
||
+ "window_minimum_threshold": 6,
|
||
+ },
|
||
+ "latency_sata_ssd": {"read_tot_lim": 50000, "write_tot_lim": 50000},
|
||
+ "latency_nvme_ssd": {"read_tot_lim": 500, "write_tot_lim": 500},
|
||
+ "latency_sata_hdd": {"read_tot_lim": 50000, "write_tot_lim": 50000},
|
||
+ }
|
||
|
||
def __init__(self, config_file_name):
|
||
- self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD
|
||
- self.__slow_io_detect_frequency = (
|
||
- ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY
|
||
- )
|
||
- self.__log_level = ConfigParser.DEFAULT_LOG_LEVEL
|
||
- self.__disks_to_detection = None
|
||
- self.__stage = ConfigParser.DEFAULT_STAGE
|
||
- self.__iotype = ConfigParser.DEFAULT_IOTYPE
|
||
-
|
||
- self.__algorithm_type = get_threshold_type_enum(
|
||
- ConfigParser.DEFAULT_ALGORITHM_TYPE
|
||
- )
|
||
- self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
|
||
- self.__train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
|
||
- self.__boxplot_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER
|
||
- self.__n_sigma_parameter = ConfigParser.DEFAULT_N_SIGMA_PARAMETER
|
||
-
|
||
- self.__sliding_window_type = ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE
|
||
- self.__window_size = ConfigParser.DEFAULT_WINDOW_SIZE
|
||
- self.__window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD
|
||
-
|
||
- self.__config_file_name = config_file_name
|
||
+ self._conf = ConfigParser.DEFAULT_CONF
|
||
+ self._config_file_name = config_file_name
|
||
|
||
def _get_config_value(
|
||
self,
|
||
@@ -156,30 +159,21 @@ class ConfigParser:
|
||
|
||
return value
|
||
|
||
- def __read_absolute_threshold(self, items_common: dict):
|
||
- self.__absolute_threshold = self._get_config_value(
|
||
- items_common,
|
||
- "absolute_threshold",
|
||
- float,
|
||
- ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD,
|
||
- gt=0,
|
||
- )
|
||
-
|
||
- def __read__slow_io_detect_frequency(self, items_common: dict):
|
||
- self.__slow_io_detect_frequency = self._get_config_value(
|
||
+ def _read_slow_io_detect_frequency(self, items_common: dict):
|
||
+ self._conf["common"]["slow_io_detect_frequency"] = self._get_config_value(
|
||
items_common,
|
||
"slow_io_detect_frequency",
|
||
int,
|
||
- ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY,
|
||
+ self.DEFAULT_CONF["common"]["slow_io_detect_frequency"],
|
||
gt=0,
|
||
le=300,
|
||
)
|
||
|
||
- def __read__disks_to_detect(self, items_common: dict):
|
||
+ def _read_disks_to_detect(self, items_common: dict):
|
||
disks_to_detection = items_common.get("disk")
|
||
if disks_to_detection is None:
|
||
logging.warning("config of disk not found, the default value will be used.")
|
||
- self.__disks_to_detection = None
|
||
+ self._conf["common"]["disk"] = None
|
||
return
|
||
disks_to_detection = disks_to_detection.strip()
|
||
if not disks_to_detection:
|
||
@@ -189,40 +183,46 @@ class ConfigParser:
|
||
)
|
||
exit(1)
|
||
disk_list = disks_to_detection.split(",")
|
||
+ disk_list = [disk.strip() for disk in disk_list]
|
||
if len(disk_list) == 1 and disk_list[0] == "default":
|
||
- self.__disks_to_detection = None
|
||
+ self._conf["common"]["disk"] = None
|
||
return
|
||
- self.__disks_to_detection = disk_list
|
||
+ self._conf["common"]["disk"] = disk_list
|
||
|
||
- def __read__train_data_duration(self, items_algorithm: dict):
|
||
- self.__train_data_duration = self._get_config_value(
|
||
+ def _read_train_data_duration(self, items_algorithm: dict):
|
||
+ self._conf["common"]["train_data_duration"] = self._get_config_value(
|
||
items_algorithm,
|
||
"train_data_duration",
|
||
float,
|
||
- ConfigParser.DEFAULT_TRAIN_DATA_DURATION,
|
||
+ self.DEFAULT_CONF["algorithm"]["train_data_duration"],
|
||
gt=0,
|
||
le=720,
|
||
)
|
||
|
||
- def __read__train_update_duration(self, items_algorithm: dict):
|
||
- default_train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
|
||
- if default_train_update_duration > self.__train_data_duration:
|
||
- default_train_update_duration = self.__train_data_duration / 2
|
||
- self.__train_update_duration = self._get_config_value(
|
||
+ def _read_train_update_duration(self, items_algorithm: dict):
|
||
+ default_train_update_duration = self.DEFAULT_CONF["algorithm"][
|
||
+ "train_update_duration"
|
||
+ ]
|
||
+ if default_train_update_duration > self._conf["common"]["train_data_duration"]:
|
||
+ default_train_update_duration = (
|
||
+ self._conf["common"]["train_data_duration"] / 2
|
||
+ )
|
||
+ self._conf["common"]["train_update_duration"] = self._get_config_value(
|
||
items_algorithm,
|
||
"train_update_duration",
|
||
float,
|
||
default_train_update_duration,
|
||
gt=0,
|
||
- le=self.__train_data_duration,
|
||
+ le=self._conf["common"]["train_data_duration"],
|
||
)
|
||
|
||
- def __read__algorithm_type_and_parameter(self, items_algorithm: dict):
|
||
- algorithm_type = items_algorithm.get(
|
||
- "algorithm_type", ConfigParser.DEFAULT_ALGORITHM_TYPE
|
||
- )
|
||
- self.__algorithm_type = get_threshold_type_enum(algorithm_type)
|
||
- if self.__algorithm_type is None:
|
||
+ def _read_algorithm_type_and_parameter(self, items_algorithm: dict):
|
||
+ algorithm_type = items_algorithm.get("algorithm_type")
|
||
+ if algorithm_type is not None:
|
||
+ self._conf["algorithm"]["algorithm_type"] = get_threshold_type_enum(
|
||
+ algorithm_type
|
||
+ )
|
||
+ if self._conf["algorithm"]["algorithm_type"] is None:
|
||
logging.critical(
|
||
"the algorithm_type: %s you set is invalid. ai_block_io plug will exit.",
|
||
algorithm_type,
|
||
@@ -231,129 +231,175 @@ class ConfigParser:
|
||
f"the algorithm_type: {algorithm_type} you set is invalid. ai_block_io plug will exit."
|
||
)
|
||
exit(1)
|
||
-
|
||
- if self.__algorithm_type == ThresholdType.NSigmaThreshold:
|
||
- self.__n_sigma_parameter = self._get_config_value(
|
||
+ elif self._conf["algorithm"]["algorithm_type"] == ThresholdType.NSigmaThreshold:
|
||
+ self._conf["algorithm"]["n_sigma_parameter"] = self._get_config_value(
|
||
items_algorithm,
|
||
"n_sigma_parameter",
|
||
float,
|
||
- ConfigParser.DEFAULT_N_SIGMA_PARAMETER,
|
||
+ self.DEFAULT_CONF["algorithm"]["n_sigma_parameter"],
|
||
gt=0,
|
||
le=10,
|
||
)
|
||
- elif self.__algorithm_type == ThresholdType.BoxplotThreshold:
|
||
- self.__boxplot_parameter = self._get_config_value(
|
||
+ elif (
|
||
+ self._conf["algorithm"]["algorithm_type"] == ThresholdType.BoxplotThreshold
|
||
+ ):
|
||
+ self._conf["algorithm"]["boxplot_parameter"] = self._get_config_value(
|
||
items_algorithm,
|
||
"boxplot_parameter",
|
||
float,
|
||
- ConfigParser.DEFAULT_BOXPLOT_PARAMETER,
|
||
+ self.DEFAULT_CONF["algorithm"]["boxplot_parameter"],
|
||
gt=0,
|
||
le=10,
|
||
)
|
||
|
||
- def __read__stage(self, items_algorithm: dict):
|
||
- stage_str = items_algorithm.get('stage', ConfigParser.DEFAULT_STAGE)
|
||
- stage_list = stage_str.split(',')
|
||
- if len(stage_list) == 1 and stage_list[0] == '':
|
||
- logging.critical('stage value not allow is empty, exiting...')
|
||
+ def _read_stage(self, items_algorithm: dict):
|
||
+ stage_str = items_algorithm.get(
|
||
+ "stage", self.DEFAULT_CONF["common"]["stage"]
|
||
+ ).strip()
|
||
+ stage_list = stage_str.split(",")
|
||
+ stage_list = [stage.strip() for stage in stage_list]
|
||
+ if len(stage_list) == 1 and stage_list[0] == "":
|
||
+ logging.critical("stage value not allow is empty, exiting...")
|
||
exit(1)
|
||
- if len(stage_list) == 1 and stage_list[0] == 'default':
|
||
- logging.warning(f'stage will enable default value: {ConfigParser.DEFAULT_STAGE}')
|
||
- self.__stage = ALL_STAGE_LIST
|
||
+ if len(stage_list) == 1 and stage_list[0] == "default":
|
||
+ logging.warning(
|
||
+ "stage will enable default value: %s",
|
||
+ self.DEFAULT_CONF["common"]["stage"],
|
||
+ )
|
||
+ self._conf["common"]["stage"] = ALL_STAGE_LIST
|
||
return
|
||
for stage in stage_list:
|
||
if stage not in ALL_STAGE_LIST:
|
||
- logging.critical(f'stage: {stage} is not valid stage, ai_block_io will exit...')
|
||
+ logging.critical(
|
||
+ "stage: %s is not valid stage, ai_block_io will exit...", stage
|
||
+ )
|
||
exit(1)
|
||
dup_stage_list = set(stage_list)
|
||
- if 'bio' not in dup_stage_list:
|
||
- logging.critical('stage must contains bio stage, exiting...')
|
||
+ if "bio" not in dup_stage_list:
|
||
+ logging.critical("stage must contains bio stage, exiting...")
|
||
exit(1)
|
||
- self.__stage = dup_stage_list
|
||
-
|
||
- def __read__iotype(self, items_algorithm: dict):
|
||
- iotype_str = items_algorithm.get('iotype', ConfigParser.DEFAULT_IOTYPE)
|
||
- iotype_list = iotype_str.split(',')
|
||
- if len(iotype_list) == 1 and iotype_list[0] == '':
|
||
- logging.critical('iotype value not allow is empty, exiting...')
|
||
+ self._conf["common"]["stage"] = dup_stage_list
|
||
+
|
||
+ def _read_iotype(self, items_algorithm: dict):
|
||
+ iotype_str = items_algorithm.get(
|
||
+ "iotype", self.DEFAULT_CONF["common"]["iotype"]
|
||
+ ).strip()
|
||
+ iotype_list = iotype_str.split(",")
|
||
+ iotype_list = [iotype.strip() for iotype in iotype_list]
|
||
+ if len(iotype_list) == 1 and iotype_list[0] == "":
|
||
+ logging.critical("iotype value not allow is empty, exiting...")
|
||
exit(1)
|
||
- if len(iotype_list) == 1 and iotype_list[0] == 'default':
|
||
- logging.warning(f'iotype will enable default value: {ConfigParser.DEFAULT_IOTYPE}')
|
||
- self.__iotype = ALL_IOTPYE_LIST
|
||
+ if len(iotype_list) == 1 and iotype_list[0] == "default":
|
||
+ logging.warning(
|
||
+ "iotype will enable default value: %s",
|
||
+ self.DEFAULT_CONF["common"]["iotype"],
|
||
+ )
|
||
+ self._conf["common"]["iotype"] = ALL_IOTPYE_LIST
|
||
return
|
||
for iotype in iotype_list:
|
||
if iotype not in ALL_IOTPYE_LIST:
|
||
- logging.critical(f'iotype: {iotype} is not valid iotype, ai_block_io will exit...')
|
||
+ logging.critical(
|
||
+ "iotype: %s is not valid iotype, ai_block_io will exit...", iotype
|
||
+ )
|
||
exit(1)
|
||
dup_iotype_list = set(iotype_list)
|
||
- self.__iotype = dup_iotype_list
|
||
+ self._conf["common"]["iotype"] = dup_iotype_list
|
||
+
|
||
+ def _read_sliding_window_type(self, items_sliding_window: dict):
|
||
+ sliding_window_type = items_sliding_window.get("sliding_window_type")
|
||
+ if sliding_window_type is not None:
|
||
+ self._conf["sliding_window"]["sliding_window_type"] = (
|
||
+ get_sliding_window_type_enum(sliding_window_type)
|
||
+ )
|
||
+ if self._conf["sliding_window"]["sliding_window_type"] is None:
|
||
+ logging.critical(
|
||
+ "the sliding_window_type: %s you set is invalid. ai_block_io plug will exit.",
|
||
+ sliding_window_type,
|
||
+ )
|
||
+ Report.report_pass(
|
||
+ f"the sliding_window_type: {sliding_window_type} you set is invalid. ai_block_io plug will exit."
|
||
+ )
|
||
+ exit(1)
|
||
|
||
- def __read__window_size(self, items_sliding_window: dict):
|
||
- self.__window_size = self._get_config_value(
|
||
+ def _read_window_size(self, items_sliding_window: dict):
|
||
+ self._conf["sliding_window"]["window_size"] = self._get_config_value(
|
||
items_sliding_window,
|
||
"window_size",
|
||
int,
|
||
- ConfigParser.DEFAULT_WINDOW_SIZE,
|
||
+ self.DEFAULT_CONF["sliding_window"]["window_size"],
|
||
gt=0,
|
||
le=3600,
|
||
)
|
||
|
||
- def __read__window_minimum_threshold(self, items_sliding_window: dict):
|
||
- default_window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD
|
||
- if default_window_minimum_threshold > self.__window_size:
|
||
- default_window_minimum_threshold = self.__window_size / 2
|
||
- self.__window_minimum_threshold = self._get_config_value(
|
||
- items_sliding_window,
|
||
- "window_minimum_threshold",
|
||
- int,
|
||
- default_window_minimum_threshold,
|
||
- gt=0,
|
||
- le=self.__window_size,
|
||
+ def _read_window_minimum_threshold(self, items_sliding_window: dict):
|
||
+ default_window_minimum_threshold = self.DEFAULT_CONF["sliding_window"][
|
||
+ "window_minimum_threshold"
|
||
+ ]
|
||
+ if (
|
||
+ default_window_minimum_threshold
|
||
+ > self._conf["sliding_window"]["window_size"]
|
||
+ ):
|
||
+ default_window_minimum_threshold = (
|
||
+ self._conf["sliding_window"]["window_size"] / 2
|
||
+ )
|
||
+ self._conf["sliding_window"]["window_minimum_threshold"] = (
|
||
+ self._get_config_value(
|
||
+ items_sliding_window,
|
||
+ "window_minimum_threshold",
|
||
+ int,
|
||
+ default_window_minimum_threshold,
|
||
+ gt=0,
|
||
+ le=self._conf["sliding_window"]["window_size"],
|
||
+ )
|
||
)
|
||
|
||
def read_config_from_file(self):
|
||
- if not os.path.exists(self.__config_file_name):
|
||
- init_log_format(self.__log_level)
|
||
+ if not os.path.exists(self._config_file_name):
|
||
+ init_log_format(self._conf["log"]["level"])
|
||
logging.critical(
|
||
"config file %s not found, ai_block_io plug will exit.",
|
||
- self.__config_file_name,
|
||
+ self._config_file_name,
|
||
)
|
||
Report.report_pass(
|
||
- f"config file {self.__config_file_name} not found, ai_block_io plug will exit."
|
||
+ f"config file {self._config_file_name} not found, ai_block_io plug will exit."
|
||
)
|
||
exit(1)
|
||
|
||
con = configparser.ConfigParser()
|
||
try:
|
||
- con.read(self.__config_file_name, encoding="utf-8")
|
||
+ con.read(self._config_file_name, encoding="utf-8")
|
||
except configparser.Error as e:
|
||
- init_log_format(self.__log_level)
|
||
+ init_log_format(self._conf["log"]["level"])
|
||
logging.critical(
|
||
- f"config file read error: %s, ai_block_io plug will exit.", e
|
||
+ "config file read error: %s, ai_block_io plug will exit.", e
|
||
)
|
||
Report.report_pass(
|
||
f"config file read error: {e}, ai_block_io plug will exit."
|
||
)
|
||
exit(1)
|
||
|
||
- if con.has_section('log'):
|
||
- items_log = dict(con.items('log'))
|
||
+ if con.has_section("log"):
|
||
+ items_log = dict(con.items("log"))
|
||
# 情况一:没有log,则使用默认值
|
||
# 情况二:有log,值为空或异常,使用默认值
|
||
# 情况三:有log,值正常,则使用该值
|
||
- self.__log_level = items_log.get('level', ConfigParser.DEFAULT_LOG_LEVEL)
|
||
- init_log_format(self.__log_level)
|
||
+ self._conf["log"]["level"] = items_log.get(
|
||
+ "level", self.DEFAULT_CONF["log"]["level"]
|
||
+ )
|
||
+ init_log_format(self._conf["log"]["level"])
|
||
else:
|
||
- init_log_format(self.__log_level)
|
||
- logging.warning(f"log section parameter not found, it will be set to default value.")
|
||
+ init_log_format(self._conf["log"]["level"])
|
||
+ logging.warning(
|
||
+ "log section parameter not found, it will be set to default value."
|
||
+ )
|
||
|
||
if con.has_section("common"):
|
||
items_common = dict(con.items("common"))
|
||
- self.__read_absolute_threshold(items_common)
|
||
- self.__read__slow_io_detect_frequency(items_common)
|
||
- self.__read__disks_to_detect(items_common)
|
||
- self.__read__stage(items_common)
|
||
- self.__read__iotype(items_common)
|
||
+
|
||
+ self._read_slow_io_detect_frequency(items_common)
|
||
+ self._read_disks_to_detect(items_common)
|
||
+ self._read_stage(items_common)
|
||
+ self._read_iotype(items_common)
|
||
else:
|
||
logging.warning(
|
||
"common section parameter not found, it will be set to default value."
|
||
@@ -361,9 +407,9 @@ class ConfigParser:
|
||
|
||
if con.has_section("algorithm"):
|
||
items_algorithm = dict(con.items("algorithm"))
|
||
- self.__read__train_data_duration(items_algorithm)
|
||
- self.__read__train_update_duration(items_algorithm)
|
||
- self.__read__algorithm_type_and_parameter(items_algorithm)
|
||
+ self._read_train_data_duration(items_algorithm)
|
||
+ self._read_train_update_duration(items_algorithm)
|
||
+ self._read_algorithm_type_and_parameter(items_algorithm)
|
||
else:
|
||
logging.warning(
|
||
"algorithm section parameter not found, it will be set to default value."
|
||
@@ -371,101 +417,162 @@ class ConfigParser:
|
||
|
||
if con.has_section("sliding_window"):
|
||
items_sliding_window = dict(con.items("sliding_window"))
|
||
- sliding_window_type = items_sliding_window.get(
|
||
- "sliding_window_type", ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE
|
||
+
|
||
+ self._read_window_size(items_sliding_window)
|
||
+ self._read_window_minimum_threshold(items_sliding_window)
|
||
+ else:
|
||
+ logging.warning(
|
||
+ "sliding_window section parameter not found, it will be set to default value."
|
||
+ )
|
||
+
|
||
+ if con.has_section("latency_sata_ssd"):
|
||
+ items_latency_sata_ssd = dict(con.items("latency_sata_ssd"))
|
||
+ self._conf["latency_sata_ssd"]["read_tot_lim"] = self._get_config_value(
|
||
+ items_latency_sata_ssd,
|
||
+ "read_tot_lim",
|
||
+ int,
|
||
+ self.DEFAULT_CONF["latency_sata_ssd"]["read_tot_lim"],
|
||
+ gt=0,
|
||
)
|
||
- self.__sliding_window_type = get_sliding_window_type_enum(
|
||
- sliding_window_type
|
||
+ self._conf["latency_sata_ssd"]["write_tot_lim"] = self._get_config_value(
|
||
+ items_latency_sata_ssd,
|
||
+ "write_tot_lim",
|
||
+ int,
|
||
+ self.DEFAULT_CONF["latency_sata_ssd"]["write_tot_lim"],
|
||
+ gt=0,
|
||
)
|
||
- self.__read__window_size(items_sliding_window)
|
||
- self.__read__window_minimum_threshold(items_sliding_window)
|
||
else:
|
||
logging.warning(
|
||
- "sliding_window section parameter not found, it will be set to default value."
|
||
+ "latency_sata_ssd section parameter not found, it will be set to default value."
|
||
+ )
|
||
+ if con.has_section("latency_nvme_ssd"):
|
||
+ items_latency_nvme_ssd = dict(con.items("latency_nvme_ssd"))
|
||
+ self._conf["latency_nvme_ssd"]["read_tot_lim"] = self._get_config_value(
|
||
+ items_latency_nvme_ssd,
|
||
+ "read_tot_lim",
|
||
+ int,
|
||
+ self.DEFAULT_CONF["latency_nvme_ssd"]["read_tot_lim"],
|
||
+ gt=0,
|
||
+ )
|
||
+ self._conf["latency_nvme_ssd"]["write_tot_lim"] = self._get_config_value(
|
||
+ items_latency_nvme_ssd,
|
||
+ "write_tot_lim",
|
||
+ int,
|
||
+ self.DEFAULT_CONF["latency_nvme_ssd"]["write_tot_lim"],
|
||
+ gt=0,
|
||
+ )
|
||
+ else:
|
||
+ logging.warning(
|
||
+ "latency_nvme_ssd section parameter not found, it will be set to default value."
|
||
+ )
|
||
+ if con.has_section("latency_sata_hdd"):
|
||
+ items_latency_sata_hdd = dict(con.items("latency_sata_hdd"))
|
||
+ self._conf["latency_sata_hdd"]["read_tot_lim"] = self._get_config_value(
|
||
+ items_latency_sata_hdd,
|
||
+ "read_tot_lim",
|
||
+ int,
|
||
+ self.DEFAULT_CONF["latency_sata_hdd"]["read_tot_lim"],
|
||
+ gt=0,
|
||
+ )
|
||
+ self._conf["latency_sata_hdd"]["write_tot_lim"] = self._get_config_value(
|
||
+ items_latency_sata_hdd,
|
||
+ "write_tot_lim",
|
||
+ int,
|
||
+ self.DEFAULT_CONF["latency_sata_hdd"]["write_tot_lim"],
|
||
+ gt=0,
|
||
+ )
|
||
+ else:
|
||
+ logging.warning(
|
||
+ "latency_sata_hdd section parameter not found, it will be set to default value."
|
||
)
|
||
|
||
self.__print_all_config_value()
|
||
|
||
- def __repr__(self):
|
||
- config_str = {
|
||
- 'log.level': self.__log_level,
|
||
- 'common.absolute_threshold': self.__absolute_threshold,
|
||
- 'common.slow_io_detect_frequency': self.__slow_io_detect_frequency,
|
||
- 'common.disk': self.__disks_to_detection,
|
||
- 'common.stage': self.__stage,
|
||
- 'common.iotype': self.__iotype,
|
||
- 'algorithm.train_data_duration': self.__train_data_duration,
|
||
- 'algorithm.train_update_duration': self.__train_update_duration,
|
||
- 'algorithm.algorithm_type': self.__algorithm_type,
|
||
- 'algorithm.boxplot_parameter': self.__boxplot_parameter,
|
||
- 'algorithm.n_sigma_parameter': self.__n_sigma_parameter,
|
||
- 'sliding_window.sliding_window_type': self.__sliding_window_type,
|
||
- 'sliding_window.window_size': self.__window_size,
|
||
- 'sliding_window.window_minimum_threshold': self.__window_minimum_threshold
|
||
- }
|
||
- return str(config_str)
|
||
+ def __repr__(self) -> str:
|
||
+ return str(self._conf)
|
||
+
|
||
+ def __str__(self) -> str:
|
||
+ return str(self._conf)
|
||
|
||
def __print_all_config_value(self):
|
||
- logging.info(f"all config is follow:\n {self}")
|
||
+ logging.info("all config is follow:\n %s", self)
|
||
+
|
||
+ def get_tot_lim(self, disk_type, io_type):
|
||
+ if io_type == "read":
|
||
+ return self._conf.get(
|
||
+ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {}
|
||
+ ).get("read_tot_lim", None)
|
||
+ elif io_type == "write":
|
||
+ return self._conf.get(
|
||
+ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {}
|
||
+ ).get("write_tot_lim", None)
|
||
+ else:
|
||
+ return None
|
||
|
||
def get_train_data_duration_and_train_update_duration(self):
|
||
- return self.__train_data_duration, self.__train_update_duration
|
||
+ return (
|
||
+ self._conf["common"]["train_data_duration"],
|
||
+ self._conf["common"]["train_update_duration"],
|
||
+ )
|
||
|
||
def get_window_size_and_window_minimum_threshold(self):
|
||
- return self.__window_size, self.__window_minimum_threshold
|
||
+ return (
|
||
+ self._conf["sliding_window"]["window_size"],
|
||
+ self._conf["sliding_window"]["window_minimum_threshold"],
|
||
+ )
|
||
|
||
@property
|
||
def slow_io_detect_frequency(self):
|
||
- return self.__slow_io_detect_frequency
|
||
+ return self._conf["common"]["slow_io_detect_frequency"]
|
||
|
||
@property
|
||
def algorithm_type(self):
|
||
- return self.__algorithm_type
|
||
+ return self._conf["algorithm"]["algorithm_type"]
|
||
|
||
@property
|
||
def sliding_window_type(self):
|
||
- return self.__sliding_window_type
|
||
+ return self._conf["sliding_window"]["sliding_window_type"]
|
||
|
||
@property
|
||
def train_data_duration(self):
|
||
- return self.__train_data_duration
|
||
+ return self._conf["common"]["train_data_duration"]
|
||
|
||
@property
|
||
def train_update_duration(self):
|
||
- return self.__train_update_duration
|
||
+ return self._conf["common"]["train_update_duration"]
|
||
|
||
@property
|
||
def window_size(self):
|
||
- return self.__window_size
|
||
+ return self._conf["sliding_window"]["window_size"]
|
||
|
||
@property
|
||
def window_minimum_threshold(self):
|
||
- return self.__window_minimum_threshold
|
||
+ return self._conf["sliding_window"]["window_minimum_threshold"]
|
||
|
||
@property
|
||
def absolute_threshold(self):
|
||
- return self.__absolute_threshold
|
||
+ return self._conf["common"]["absolute_threshold"]
|
||
|
||
@property
|
||
def log_level(self):
|
||
- return self.__log_level
|
||
+ return self._conf["log"]["level"]
|
||
|
||
@property
|
||
def disks_to_detection(self):
|
||
- return self.__disks_to_detection
|
||
+ return self._conf["common"]["disk"]
|
||
|
||
@property
|
||
def stage(self):
|
||
- return self.__stage
|
||
+ return self._conf["common"]["stage"]
|
||
|
||
@property
|
||
def iotype(self):
|
||
- return self.__iotype
|
||
+ return self._conf["common"]["iotype"]
|
||
|
||
@property
|
||
def boxplot_parameter(self):
|
||
- return self.__boxplot_parameter
|
||
+ return self._conf["algorithm"]["boxplot_parameter"]
|
||
|
||
@property
|
||
def n_sigma_parameter(self):
|
||
- return self.__n_sigma_parameter
|
||
+ return self._conf["algorithm"]["n_sigma_parameter"]
|
||
diff --git a/src/python/sentryPlugins/ai_block_io/data_access.py b/src/python/sentryPlugins/ai_block_io/data_access.py
|
||
index ed997e6..1bc5ed8 100644
|
||
--- a/src/python/sentryPlugins/ai_block_io/data_access.py
|
||
+++ b/src/python/sentryPlugins/ai_block_io/data_access.py
|
||
@@ -16,6 +16,7 @@ from sentryCollector.collect_plugin import (
|
||
Result_Messages,
|
||
get_io_data,
|
||
is_iocollect_valid,
|
||
+ get_disk_type
|
||
)
|
||
|
||
|
||
diff --git a/src/python/sentryPlugins/ai_block_io/detector.py b/src/python/sentryPlugins/ai_block_io/detector.py
|
||
index e710ddd..87bd1dd 100644
|
||
--- a/src/python/sentryPlugins/ai_block_io/detector.py
|
||
+++ b/src/python/sentryPlugins/ai_block_io/detector.py
|
||
@@ -17,9 +17,6 @@ from .utils import get_metric_value_from_io_data_dict_by_metric_name
|
||
|
||
|
||
class Detector:
|
||
- _metric_name: MetricName = None
|
||
- _threshold: Threshold = None
|
||
- _slidingWindow: SlidingWindow = None
|
||
|
||
def __init__(self, metric_name: MetricName, threshold: Threshold, sliding_window: SlidingWindow):
|
||
self._metric_name = metric_name
|
||
@@ -40,18 +37,24 @@ class Detector:
|
||
metric_value = get_metric_value_from_io_data_dict_by_metric_name(io_data_dict_with_disk_name, self._metric_name)
|
||
if metric_value is None:
|
||
logging.debug('not found metric value, so return None.')
|
||
- return False, None, None
|
||
+ return (False, False), None, None, None
|
||
logging.debug(f'input metric value: {str(metric_value)}')
|
||
self._threshold.push_latest_data_to_queue(metric_value)
|
||
detection_result = self._slidingWindow.is_slow_io_event(metric_value)
|
||
- logging.debug(f'Detection result: {str(detection_result)}')
|
||
+ # 检测到慢周期,由Detector负责打印info级别日志
|
||
+ if detection_result[0][1]:
|
||
+ logging.info(f'[abnormal period happen]: disk info: {self._metric_name}, window: {detection_result[1]}, '
|
||
+ f'current value: {metric_value}, ai threshold: {detection_result[2]}, '
|
||
+ f'absolute threshold: {detection_result[3]}')
|
||
+ else:
|
||
+ logging.debug(f'Detection result: {str(detection_result)}')
|
||
logging.debug(f'exit Detector: {self}')
|
||
return detection_result
|
||
|
||
def __repr__(self):
|
||
- return (f'disk_name: {self._metric_name.get_disk_name()}, stage_name: {self._metric_name.get_stage_name()},'
|
||
- f' io_type_name: {self._metric_name.get_io_access_type_name()},'
|
||
- f' metric_name: {self._metric_name.get_metric_name()}, threshold_type: {self._threshold},'
|
||
+ return (f'disk_name: {self._metric_name.disk_name}, stage_name: {self._metric_name.stage_name},'
|
||
+ f' io_type_name: {self._metric_name.io_access_type_name},'
|
||
+ f' metric_name: {self._metric_name.metric_name}, threshold_type: {self._threshold},'
|
||
f' sliding_window_type: {self._slidingWindow}')
|
||
|
||
|
||
@@ -65,13 +68,38 @@ class DiskDetector:
|
||
self._detector_list.append(detector)
|
||
|
||
def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
|
||
- # 只有bio阶段发生异常,就认为发生了慢IO事件
|
||
- # todo:根因诊断
|
||
+ """
|
||
+ 根因诊断逻辑:只有bio阶段发生异常,才认为发生了慢IO事件,即bio阶段异常是慢IO事件的必要条件
|
||
+ 情况一:bio异常,rq_driver也异常,则慢盘
|
||
+ 情况二:bio异常,rq_driver无异常,且有内核IO栈任意阶段异常,则IO栈异常
|
||
+ 情况三:bio异常,rq_driver无异常,且无内核IO栈任意阶段异常,则IO压力大
|
||
+ 情况四:bio异常,则UNKNOWN
|
||
+ """
|
||
+ diagnosis_info = {"bio": [], "rq_driver": [], "io_stage": []}
|
||
for detector in self._detector_list:
|
||
+ # result返回内容:(是否检测到慢IO,是否检测到慢周期)、窗口、ai阈值、绝对阈值
|
||
+ # 示例: (False, False), self._io_data_queue, self._ai_threshold, self._abs_threshold
|
||
result = detector.is_slow_io_event(io_data_dict_with_disk_name)
|
||
- if result[0] and detector.get_metric_name().get_stage_name() == 'bio':
|
||
- return result[0], detector.get_metric_name(), result[1], result[2]
|
||
- return False, None, None, None
|
||
+ if result[0][0]:
|
||
+ if detector.get_metric_name().stage_name == "bio":
|
||
+ diagnosis_info["bio"].append((detector.get_metric_name(), result))
|
||
+ elif detector.get_metric_name().stage_name == "rq_driver":
|
||
+ diagnosis_info["rq_driver"].append((detector.get_metric_name(), result))
|
||
+ else:
|
||
+ diagnosis_info["io_stage"].append((detector.get_metric_name(), result))
|
||
+
|
||
+ # 返回内容:(1)是否检测到慢IO事件、(2)MetricName、(3)滑动窗口及阈值、(4)慢IO事件根因
|
||
+ root_cause = None
|
||
+ if len(diagnosis_info["bio"]) == 0:
|
||
+ return False, None, None, None
|
||
+ elif len(diagnosis_info["rq_driver"]) != 0:
|
||
+ root_cause = "[Root Cause:disk slow]"
|
||
+ elif len(diagnosis_info["io_stage"]) != 0:
|
||
+ stage = diagnosis_info["io_stage"][0][1].get_stage_name()
|
||
+ root_cause = f"[Root Cause:io stage slow, stage: {stage}]"
|
||
+ if root_cause is None:
|
||
+ root_cause = "[Root Cause:high io pressure]"
|
||
+ return True, diagnosis_info["bio"][0][0], diagnosis_info["bio"][0][1], root_cause
|
||
|
||
def __repr__(self):
|
||
msg = f'disk: {self._disk_name}, '
|
||
diff --git a/src/python/sentryPlugins/ai_block_io/io_data.py b/src/python/sentryPlugins/ai_block_io/io_data.py
|
||
index 0e17051..d341b55 100644
|
||
--- a/src/python/sentryPlugins/ai_block_io/io_data.py
|
||
+++ b/src/python/sentryPlugins/ai_block_io/io_data.py
|
||
@@ -45,30 +45,10 @@ class IOData:
|
||
time_stamp: float = field(default_factory=lambda: datetime.now().timestamp())
|
||
|
||
|
||
+@dataclass(frozen=True)
|
||
class MetricName:
|
||
- _disk_name: str = None
|
||
- _stage_name: str = None
|
||
- _io_access_type_name: str = None
|
||
- _metric_name: str = None
|
||
-
|
||
- def __init__(self, disk_name: str, stage_name: str, io_access_type_name: str, metric_name: str):
|
||
- self._disk_name = disk_name
|
||
- self._stage_name = stage_name
|
||
- self._io_access_type_name = io_access_type_name
|
||
- self._metric_name = metric_name
|
||
-
|
||
- def get_disk_name(self):
|
||
- return self._disk_name
|
||
-
|
||
- def get_stage_name(self):
|
||
- return self._stage_name
|
||
-
|
||
- def get_io_access_type_name(self):
|
||
- return self._io_access_type_name
|
||
-
|
||
- def get_metric_name(self):
|
||
- return self._metric_name
|
||
-
|
||
- def __repr__(self):
|
||
- return (f'disk: {self._disk_name}, stage: {self._stage_name}, io_access_type: {self._io_access_type_name},'
|
||
- f'metric: {self._metric_name}')
|
||
+ disk_name: str
|
||
+ disk_type: str
|
||
+ stage_name: str
|
||
+ io_access_type_name: str
|
||
+ metric_name: str
|
||
diff --git a/src/python/sentryPlugins/ai_block_io/sliding_window.py b/src/python/sentryPlugins/ai_block_io/sliding_window.py
|
||
index 89191e5..d7c402a 100644
|
||
--- a/src/python/sentryPlugins/ai_block_io/sliding_window.py
|
||
+++ b/src/python/sentryPlugins/ai_block_io/sliding_window.py
|
||
@@ -21,15 +21,11 @@ class SlidingWindowType(Enum):
|
||
|
||
|
||
class SlidingWindow:
|
||
- _ai_threshold = None
|
||
- _queue_length = None
|
||
- _queue_threshold = None
|
||
- _io_data_queue: list = None
|
||
- _io_data_queue_abnormal_tag: list = None
|
||
-
|
||
- def __init__(self, queue_length: int, threshold: int):
|
||
+ def __init__(self, queue_length: int, threshold: int, abs_threshold: int = None):
|
||
self._queue_length = queue_length
|
||
self._queue_threshold = threshold
|
||
+ self._ai_threshold = None
|
||
+ self._abs_threshold = abs_threshold
|
||
self._io_data_queue = []
|
||
self._io_data_queue_abnormal_tag = []
|
||
|
||
@@ -38,7 +34,12 @@ class SlidingWindow:
|
||
self._io_data_queue.pop(0)
|
||
self._io_data_queue_abnormal_tag.pop(0)
|
||
self._io_data_queue.append(data)
|
||
- self._io_data_queue_abnormal_tag.append(data >= self._ai_threshold if self._ai_threshold is not None else False)
|
||
+ tag = False
|
||
+ if ((self._ai_threshold is not None and data >= self._ai_threshold) or
|
||
+ (self._abs_threshold is not None and data >= self._abs_threshold)):
|
||
+ tag = True
|
||
+ self._io_data_queue_abnormal_tag.append(tag)
|
||
+ return tag
|
||
|
||
def update(self, threshold):
|
||
if self._ai_threshold == threshold:
|
||
@@ -49,7 +50,7 @@ class SlidingWindow:
|
||
self._io_data_queue_abnormal_tag.append(data >= self._ai_threshold)
|
||
|
||
def is_slow_io_event(self, data):
|
||
- return False, None, None
|
||
+ return False, None, None, None
|
||
|
||
def __repr__(self):
|
||
return "[SlidingWindow]"
|
||
@@ -57,12 +58,13 @@ class SlidingWindow:
|
||
|
||
class NotContinuousSlidingWindow(SlidingWindow):
|
||
def is_slow_io_event(self, data):
|
||
- super().push(data)
|
||
- if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
|
||
- return False, self._io_data_queue, self._ai_threshold
|
||
+ is_abnormal_period = super().push(data)
|
||
+ is_slow_io_event = False
|
||
+ if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
|
||
+ is_slow_io_event = False
|
||
if self._io_data_queue_abnormal_tag.count(True) >= self._queue_threshold:
|
||
- return True, self._io_data_queue, self._ai_threshold
|
||
- return False, self._io_data_queue, self._ai_threshold
|
||
+ is_slow_io_event = True
|
||
+ return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
|
||
|
||
def __repr__(self):
|
||
return f"[NotContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"
|
||
@@ -70,18 +72,20 @@ class NotContinuousSlidingWindow(SlidingWindow):
|
||
|
||
class ContinuousSlidingWindow(SlidingWindow):
|
||
def is_slow_io_event(self, data):
|
||
- super().push(data)
|
||
- if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
|
||
- return False, self._io_data_queue, self._ai_threshold
|
||
+ is_abnormal_period = super().push(data)
|
||
+ is_slow_io_event = False
|
||
+ if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
|
||
+ is_slow_io_event = False
|
||
consecutive_count = 0
|
||
for tag in self._io_data_queue_abnormal_tag:
|
||
if tag:
|
||
consecutive_count += 1
|
||
if consecutive_count >= self._queue_threshold:
|
||
- return True, self._io_data_queue, self._ai_threshold
|
||
+ is_slow_io_event = True
|
||
+ break
|
||
else:
|
||
consecutive_count = 0
|
||
- return False, self._io_data_queue, self._ai_threshold
|
||
+ return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
|
||
|
||
def __repr__(self):
|
||
return f"[ContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"
|
||
@@ -89,20 +93,23 @@ class ContinuousSlidingWindow(SlidingWindow):
|
||
|
||
class MedianSlidingWindow(SlidingWindow):
|
||
def is_slow_io_event(self, data):
|
||
- super().push(data)
|
||
- if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
|
||
- return False, self._io_data_queue, self._ai_threshold
|
||
+ is_abnormal_period = super().push(data)
|
||
+ is_slow_io_event = False
|
||
+ if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
|
||
+ is_slow_io_event = False
|
||
median = np.median(self._io_data_queue)
|
||
if median >= self._ai_threshold:
|
||
- return True, self._io_data_queue, self._ai_threshold
|
||
- return False, self._io_data_queue, self._ai_threshold
|
||
+ is_slow_io_event = True
|
||
+ return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
|
||
|
||
def __repr__(self):
|
||
return f"[MedianSlidingWindow, window size: {self._queue_length}]"
|
||
|
||
|
||
class SlidingWindowFactory:
|
||
- def get_sliding_window(self, sliding_window_type: SlidingWindowType, *args, **kwargs):
|
||
+ def get_sliding_window(
|
||
+ self, sliding_window_type: SlidingWindowType, *args, **kwargs
|
||
+ ):
|
||
if sliding_window_type == SlidingWindowType.NotContinuousSlidingWindow:
|
||
return NotContinuousSlidingWindow(*args, **kwargs)
|
||
elif sliding_window_type == SlidingWindowType.ContinuousSlidingWindow:
|
||
diff --git a/src/python/sentryPlugins/ai_block_io/utils.py b/src/python/sentryPlugins/ai_block_io/utils.py
|
||
index 0ed37b9..d6f4067 100644
|
||
--- a/src/python/sentryPlugins/ai_block_io/utils.py
|
||
+++ b/src/python/sentryPlugins/ai_block_io/utils.py
|
||
@@ -19,53 +19,57 @@ from .io_data import MetricName, IOData
|
||
|
||
|
||
def get_threshold_type_enum(algorithm_type: str):
|
||
- if algorithm_type.lower() == 'absolute':
|
||
+ if algorithm_type.lower() == "absolute":
|
||
return ThresholdType.AbsoluteThreshold
|
||
- if algorithm_type.lower() == 'boxplot':
|
||
+ if algorithm_type.lower() == "boxplot":
|
||
return ThresholdType.BoxplotThreshold
|
||
- if algorithm_type.lower() == 'n_sigma':
|
||
+ if algorithm_type.lower() == "n_sigma":
|
||
return ThresholdType.NSigmaThreshold
|
||
return None
|
||
|
||
|
||
def get_sliding_window_type_enum(sliding_window_type: str):
|
||
- if sliding_window_type.lower() == 'not_continuous':
|
||
+ if sliding_window_type.lower() == "not_continuous":
|
||
return SlidingWindowType.NotContinuousSlidingWindow
|
||
- if sliding_window_type.lower() == 'continuous':
|
||
+ if sliding_window_type.lower() == "continuous":
|
||
return SlidingWindowType.ContinuousSlidingWindow
|
||
- if sliding_window_type.lower() == 'median':
|
||
+ if sliding_window_type.lower() == "median":
|
||
return SlidingWindowType.MedianSlidingWindow
|
||
- logging.warning(f"the sliding window type: {sliding_window_type} you set is invalid, use default value: not_continuous")
|
||
- return SlidingWindowType.NotContinuousSlidingWindow
|
||
+ return None
|
||
|
||
|
||
-def get_metric_value_from_io_data_dict_by_metric_name(io_data_dict: dict, metric_name: MetricName):
|
||
+def get_metric_value_from_io_data_dict_by_metric_name(
|
||
+ io_data_dict: dict, metric_name: MetricName
|
||
+):
|
||
try:
|
||
- io_data: IOData = io_data_dict[metric_name.get_disk_name()]
|
||
- io_stage_data = asdict(io_data)[metric_name.get_stage_name()]
|
||
- base_data = io_stage_data[metric_name.get_io_access_type_name()]
|
||
- metric_value = base_data[metric_name.get_metric_name()]
|
||
+ io_data: IOData = io_data_dict[metric_name.disk_name]
|
||
+ io_stage_data = asdict(io_data)[metric_name.stage_name]
|
||
+ base_data = io_stage_data[metric_name.io_access_type_name]
|
||
+ metric_value = base_data[metric_name.metric_name]
|
||
return metric_value
|
||
except KeyError:
|
||
return None
|
||
|
||
|
||
-def get_data_queue_size_and_update_size(training_data_duration: float, train_update_duration: float,
|
||
- slow_io_detect_frequency: int):
|
||
+def get_data_queue_size_and_update_size(
|
||
+ training_data_duration: float,
|
||
+ train_update_duration: float,
|
||
+ slow_io_detect_frequency: int,
|
||
+):
|
||
data_queue_size = int(training_data_duration * 60 * 60 / slow_io_detect_frequency)
|
||
update_size = int(train_update_duration * 60 * 60 / slow_io_detect_frequency)
|
||
return data_queue_size, update_size
|
||
|
||
|
||
def get_log_level(log_level: str):
|
||
- if log_level.lower() == 'debug':
|
||
+ if log_level.lower() == "debug":
|
||
return logging.DEBUG
|
||
- elif log_level.lower() == 'info':
|
||
+ elif log_level.lower() == "info":
|
||
return logging.INFO
|
||
- elif log_level.lower() == 'warning':
|
||
+ elif log_level.lower() == "warning":
|
||
return logging.WARNING
|
||
- elif log_level.lower() == 'error':
|
||
+ elif log_level.lower() == "error":
|
||
return logging.ERROR
|
||
- elif log_level.lower() == 'critical':
|
||
+ elif log_level.lower() == "critical":
|
||
return logging.CRITICAL
|
||
return logging.INFO
|
||
--
|
||
2.23.0
|
||
|