ai_block_io support stage and iotype

Signed-off-by: 贺有志 <1037617413@qq.com>
贺有志 2024-10-11 13:54:50 +00:00 committed by Gitee
parent 26b0c34e50
commit 43f4e9ae10
2 changed files with 914 additions and 1 deletions


@@ -0,0 +1,906 @@
From 13dc3712b4530a312aa43610f7696a4a62f30e96 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B4=BA=E6=9C=89=E5=BF=97?= <1037617413@qq.com>
Date: Fri, 11 Oct 2024 21:50:32 +0800
Subject: [PATCH] ai_block_io support stage and iotype
---
config/plugins/ai_block_io.ini | 7 +-
.../sentryPlugins/ai_block_io/ai_block_io.py | 126 +++--
.../ai_block_io/config_parser.py | 471 +++++++++++++-----
.../sentryPlugins/ai_block_io/data_access.py | 11 +-
.../sentryPlugins/ai_block_io/detector.py | 25 +
src/python/sentryPlugins/ai_block_io/utils.py | 3 +-
6 files changed, 453 insertions(+), 190 deletions(-)
diff --git a/config/plugins/ai_block_io.ini b/config/plugins/ai_block_io.ini
index 01ce266..a814d52 100644
--- a/config/plugins/ai_block_io.ini
+++ b/config/plugins/ai_block_io.ini
@@ -1,7 +1,12 @@
+[log]
+level=info
+
[common]
absolute_threshold=40
slow_io_detect_frequency=1
-log_level=info
+disk=default
+stage=bio
+iotype=read,write
[algorithm]
train_data_duration=24
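
For reference, after this patch a complete ai_block_io.ini would look roughly like the sketch below; the [log] and [common] keys are taken from the hunk above, while the [algorithm] and [sliding_window] entries are reconstructed from the ConfigParser defaults and may differ from the shipped file:

    [log]
    level=info

    [common]
    absolute_threshold=40
    slow_io_detect_frequency=1
    disk=default
    stage=bio
    iotype=read,write

    [algorithm]
    train_data_duration=24
    train_update_duration=2
    algorithm_type=boxplot
    boxplot_parameter=1.5

    [sliding_window]
    sliding_window_type=not_continuous
    window_size=30
    window_minimum_threshold=6
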
diff --git a/src/python/sentryPlugins/ai_block_io/ai_block_io.py b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
index 77104a9..e1052ec 100644
--- a/src/python/sentryPlugins/ai_block_io/ai_block_io.py
+++ b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
@@ -13,7 +13,7 @@ import time
import signal
import logging
-from .detector import Detector
+from .detector import Detector, DiskDetector
from .threshold import ThresholdFactory, AbsoluteThreshold
from .sliding_window import SlidingWindowFactory
from .utils import get_data_queue_size_and_update_size
@@ -34,8 +34,8 @@ def sig_handler(signum, frame):
class SlowIODetection:
_config_parser = None
_disk_list = None
- _detector_name_list = []
- _detectors = {}
+ _detector_name_list = {}
+ _disk_detectors = {}
def __init__(self, config_parser: ConfigParser):
self._config_parser = config_parser
@@ -43,85 +43,101 @@ class SlowIODetection:
self.__init_detector()
def __init_detector_name_list(self):
- self._disk_list = check_collect_valid(self._config_parser.get_slow_io_detect_frequency())
+ self._disk_list = check_collect_valid(self._config_parser.slow_io_detect_frequency)
if self._disk_list is None:
Report.report_pass("get available disk error, please check if the collector plug is enable. exiting...")
exit(1)
logging.info(f"ai_block_io plug has found disks: {self._disk_list}")
- disks_to_detection: list = self._config_parser.get_disks_to_detection()
+ disks: list = self._config_parser.disks_to_detection
+ stages: list = self._config_parser.stage
+ iotypes: list = self._config_parser.iotype
# Case 1: None -> enable detection on every available disk
# Case 2: not None and len == 0 -> do not start detection on any disk
# Case 3: len > 0 -> detect only the disks that are also in the available list
- if disks_to_detection is None:
+ if disks is None:
logging.warning("you not specify any disk or use default, so ai_block_io will enable all available disk.")
for disk in self._disk_list:
- self._detector_name_list.append(MetricName(disk, "bio", "read", "latency"))
- self._detector_name_list.append(MetricName(disk, "bio", "write", "latency"))
- elif len(disks_to_detection) == 0:
- logging.warning('please attention: conf file not specify any disk to detection, so it will not start ai block io.')
+ for stage in stages:
+ for iotype in iotypes:
+ if disk not in self._detector_name_list:
+ self._detector_name_list[disk] = []
+ self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency"))
else:
- for disk_to_detection in disks_to_detection:
- if disk_to_detection in self._disk_list:
- self._detector_name_list.append(MetricName(disk_to_detection, "bio", "read", "latency"))
- self._detector_name_list.append(MetricName(disk_to_detection, "bio", "write", "latency"))
+ for disk in disks:
+ if disk in self._disk_list:
+ for stage in stages:
+ for iotype in iotypes:
+ if disk not in self._detector_name_list:
+ self._detector_name_list[disk] = []
+ self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency"))
else:
- logging.warning(f"disk[{disk_to_detection}] not in available disk list, so it will be ignored.")
- logging.info(f'start to detection follow disk and it\'s metric: {self._detector_name_list}')
+ logging.warning("disk: [%s] not in available disk list, so it will be ignored.", disk)
+ if len(self._detector_name_list) == 0:
+ logging.critical("the disks to detection is empty, ai_block_io will exit.")
+ Report.report_pass("the disks to detection is empty, ai_block_io will exit.")
+ exit(1)
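
A minimal sketch of the per-disk structure this method now builds (disk name and stage list are hypothetical; MetricName is the class already imported by ai_block_io.py):

    detector_name_list = {}
    disks = ["sda"]                   # hypothetical available disk
    stages = ["bio", "rq_driver"]     # from common.stage
    iotypes = ["read", "write"]       # from common.iotype
    for disk in disks:
        for stage in stages:
            for iotype in iotypes:
                detector_name_list.setdefault(disk, []).append(
                    MetricName(disk, stage, iotype, "latency"))
    # detector_name_list == {"sda": [MetricName("sda", "bio", "read", "latency"), ... 4 entries]}
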
def __init_detector(self):
- train_data_duration, train_update_duration = (self._config_parser.
- get_train_data_duration_and_train_update_duration())
- slow_io_detection_frequency = self._config_parser.get_slow_io_detect_frequency()
- threshold_type = self._config_parser.get_algorithm_type()
- data_queue_size, update_size = get_data_queue_size_and_update_size(train_data_duration,
- train_update_duration,
- slow_io_detection_frequency)
- sliding_window_type = self._config_parser.get_sliding_window_type()
- window_size, window_threshold = self._config_parser.get_window_size_and_window_minimum_threshold()
-
- for detector_name in self._detector_name_list:
- threshold = ThresholdFactory().get_threshold(threshold_type,
- boxplot_parameter=self._config_parser.get_boxplot_parameter(),
- n_sigma_paramter=self._config_parser.get_n_sigma_parameter(),
- data_queue_size=data_queue_size,
- data_queue_update_size=update_size)
- sliding_window = SlidingWindowFactory().get_sliding_window(sliding_window_type, queue_length=window_size,
- threshold=window_threshold)
- detector = Detector(detector_name, threshold, sliding_window)
- # initialize the threshold value for the absolute-threshold algorithm
- if isinstance(threshold, AbsoluteThreshold):
- threshold.set_threshold(self._config_parser.get_absolute_threshold())
- self._detectors[detector_name] = detector
- logging.info(f"add detector: {detector}")
+ train_data_duration, train_update_duration = (
+ self._config_parser.get_train_data_duration_and_train_update_duration()
+ )
+ slow_io_detection_frequency = self._config_parser.slow_io_detect_frequency
+ threshold_type = self._config_parser.algorithm_type
+ data_queue_size, update_size = get_data_queue_size_and_update_size(
+ train_data_duration, train_update_duration, slow_io_detection_frequency
+ )
+ sliding_window_type = self._config_parser.sliding_window_type
+ window_size, window_threshold = (self._config_parser.get_window_size_and_window_minimum_threshold())
+
+ for disk, metric_name_list in self._detector_name_list.items():
+ threshold = ThresholdFactory().get_threshold(
+ threshold_type,
+ boxplot_parameter=self._config_parser.boxplot_parameter,
+ n_sigma_paramter=self._config_parser.n_sigma_parameter,
+ data_queue_size=data_queue_size,
+ data_queue_update_size=update_size,
+ )
+ sliding_window = SlidingWindowFactory().get_sliding_window(
+ sliding_window_type,
+ queue_length=window_size,
+ threshold=window_threshold,
+ )
+ disk_detector = DiskDetector(disk)
+ for metric_name in metric_name_list:
+ detector = Detector(metric_name, threshold, sliding_window)
+ disk_detector.add_detector(detector)
+ logging.info(f'disk: [{disk}] add detector:\n [{disk_detector}]')
+ self._disk_detectors[disk] = disk_detector
def launch(self):
while True:
- logging.debug('step0. AI threshold slow io event detection is looping.')
+ logging.debug("step0. AI threshold slow io event detection is looping.")
# Step 1: fetch IO data
io_data_dict_with_disk_name = get_io_data_from_collect_plug(
- self._config_parser.get_slow_io_detect_frequency(), self._disk_list
+ self._config_parser.slow_io_detect_frequency, self._disk_list
)
- logging.debug(f'step1. Get io data: {str(io_data_dict_with_disk_name)}')
+ logging.debug(f"step1. Get io data: {str(io_data_dict_with_disk_name)}")
if io_data_dict_with_disk_name is None:
- Report.report_pass("get io data error, please check if the collector plug is enable. exitting...")
+ Report.report_pass(
+ "get io data error, please check if the collector plug is enable. exitting..."
+ )
exit(1)
# Step 2: slow IO detection
- logging.debug('step2. Start to detection slow io event.')
+ logging.debug("step2. Start to detection slow io event.")
slow_io_event_list = []
- for metric_name, detector in self._detectors.items():
- result = detector.is_slow_io_event(io_data_dict_with_disk_name)
+ for disk, disk_detector in self._disk_detectors.items():
+ result = disk_detector.is_slow_io_event(io_data_dict_with_disk_name)
if result[0]:
- slow_io_event_list.append((detector.get_metric_name(), result))
- logging.debug('step2. End to detection slow io event.')
+ slow_io_event_list.append(result)
+ logging.debug("step2. End to detection slow io event.")
# Step 3: report slow IO events to sysSentry
- logging.debug('step3. Report slow io event to sysSentry.')
+ logging.debug("step3. Report slow io event to sysSentry.")
for slow_io_event in slow_io_event_list:
- metric_name: MetricName = slow_io_event[0]
- result = slow_io_event[1]
+ metric_name: MetricName = slow_io_event[1]
alarm_content = {
"driver_name": f"{metric_name.get_disk_name()}",
"reason": "disk_slow",
@@ -129,14 +145,14 @@ class SlowIODetection:
"io_type": f"{metric_name.get_io_access_type_name()}",
"alarm_source": "ai_block_io",
"alarm_type": "latency",
- "details": f"current window is: {result[1]}, threshold is: {result[2]}.",
+ "details": f"current window is: {slow_io_event[2]}, threshold is: {slow_io_event[3]}.",
}
Xalarm.major(alarm_content)
logging.warning(alarm_content)
# Step 4: wait before the next detection loop
- logging.debug('step4. Wait to start next slow io event detection loop.')
- time.sleep(self._config_parser.get_slow_io_detect_frequency())
+ logging.debug("step4. Wait to start next slow io event detection loop.")
+ time.sleep(self._config_parser.slow_io_detect_frequency)
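
For illustration, the alarm payload produced in Step 3 has the following shape (values are made up; any fields outside the visible hunk are omitted):

    alarm_content = {
        "driver_name": "sda",          # disk name of the triggering metric
        "reason": "disk_slow",
        "io_type": "read",
        "alarm_source": "ai_block_io",
        "alarm_type": "latency",
        "details": "current window is: [...], threshold is: 12.5.",
    }
    # Xalarm.major(alarm_content) reports it to sysSentry, and the same dict is logged as a warning.
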
def main():
diff --git a/src/python/sentryPlugins/ai_block_io/config_parser.py b/src/python/sentryPlugins/ai_block_io/config_parser.py
index 354c122..a357766 100644
--- a/src/python/sentryPlugins/ai_block_io/config_parser.py
+++ b/src/python/sentryPlugins/ai_block_io/config_parser.py
@@ -9,44 +9,60 @@
# PURPOSE.
# See the Mulan PSL v2 for more details.
+import os
import configparser
import logging
+from .alarm_report import Report
from .threshold import ThresholdType
from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_log_level
LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
+ALL_STAGE_LIST = ['throtl', 'wbt', 'gettag', 'plug', 'deadline', 'hctx', 'requeue', 'rq_driver', 'bio']
+ALL_IOTPYE_LIST = ['read', 'write']
+
def init_log_format(log_level: str):
logging.basicConfig(level=get_log_level(log_level.lower()), format=LOG_FORMAT)
- if log_level.lower() not in ('info', 'warning', 'error', 'debug'):
- logging.warning(f'the log_level: {log_level} you set is invalid, use default value: info.')
+ if log_level.lower() not in ("info", "warning", "error", "debug"):
+ logging.warning(
+ f"the log_level: {log_level} you set is invalid, use default value: info."
+ )
class ConfigParser:
DEFAULT_ABSOLUTE_THRESHOLD = 40
DEFAULT_SLOW_IO_DETECTION_FREQUENCY = 1
- DEFAULT_LOG_LEVEL = 'info'
+ DEFAULT_LOG_LEVEL = "info"
+
+ DEFAULT_STAGE = 'throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio'
+ DEFAULT_IOTYPE = 'read,write'
- DEFAULT_ALGORITHM_TYPE = 'boxplot'
+ DEFAULT_ALGORITHM_TYPE = "boxplot"
DEFAULT_TRAIN_DATA_DURATION = 24
DEFAULT_TRAIN_UPDATE_DURATION = 2
DEFAULT_BOXPLOT_PARAMETER = 1.5
DEFAULT_N_SIGMA_PARAMETER = 3
- DEFAULT_SLIDING_WINDOW_TYPE = 'not_continuous'
+ DEFAULT_SLIDING_WINDOW_TYPE = "not_continuous"
DEFAULT_WINDOW_SIZE = 30
DEFAULT_WINDOW_MINIMUM_THRESHOLD = 6
def __init__(self, config_file_name):
self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD
- self.__slow_io_detect_frequency = ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY
+ self.__slow_io_detect_frequency = (
+ ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY
+ )
self.__log_level = ConfigParser.DEFAULT_LOG_LEVEL
self.__disks_to_detection = None
+ self.__stage = ConfigParser.DEFAULT_STAGE
+ self.__iotype = ConfigParser.DEFAULT_IOTYPE
- self.__algorithm_type = ConfigParser.DEFAULT_ALGORITHM_TYPE
+ self.__algorithm_type = get_threshold_type_enum(
+ ConfigParser.DEFAULT_ALGORITHM_TYPE
+ )
self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
self.__train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
self.__boxplot_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER
@@ -58,199 +74,398 @@ class ConfigParser:
self.__config_file_name = config_file_name
- def __read_absolute_threshold(self, items_common: dict):
+ def _get_config_value(
+ self,
+ config_items: dict,
+ key: str,
+ value_type,
+ default_value=None,
+ gt=None,
+ ge=None,
+ lt=None,
+ le=None,
+ ):
+ value = config_items.get(key)
+ if value is None:
+ logging.warning(
+ "config of %s not found, the default value %s will be used.",
+ key,
+ default_value,
+ )
+ value = default_value
+ if not value:
+ logging.critical(
+ "the value of %s is empty, ai_block_io plug will exit.", key
+ )
+ Report.report_pass(
+ f"the value of {key} is empty, ai_block_io plug will exit."
+ )
+ exit(1)
try:
- self.__absolute_threshold = float(items_common.get('absolute_threshold',
- ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD))
- if self.__absolute_threshold <= 0:
- logging.warning(
- f'the_absolute_threshold: {self.__absolute_threshold} you set is invalid, use default value: {ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD}.')
- self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD
+ value = value_type(value)
except ValueError:
- self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD
- logging.warning(
- f'the_absolute_threshold type conversion has error, use default value: {self.__absolute_threshold}.')
+ logging.critical(
+ "the value of %s is not a valid %s, ai_block_io plug will exit.",
+ key,
+ value_type,
+ )
+ Report.report_pass(
+ f"the value of {key} is not a valid {value_type}, ai_block_io plug will exit."
+ )
+ exit(1)
+ if gt is not None and value <= gt:
+ logging.critical(
+ "the value of %s is not greater than %s, ai_block_io plug will exit.",
+ key,
+ gt,
+ )
+ Report.report_pass(
+ f"the value of {key} is not greater than {gt}, ai_block_io plug will exit."
+ )
+ exit(1)
+ if ge is not None and value < ge:
+ logging.critical(
+ "the value of %s is not greater than or equal to %s, ai_block_io plug will exit.",
+ key,
+ ge,
+ )
+ Report.report_pass(
+ f"the value of {key} is not greater than or equal to {ge}, ai_block_io plug will exit."
+ )
+ exit(1)
+ if lt is not None and value >= lt:
+ logging.critical(
+ "the value of %s is not less than %s, ai_block_io plug will exit.",
+ key,
+ lt,
+ )
+ Report.report_pass(
+ f"the value of {key} is not less than {lt}, ai_block_io plug will exit."
+ )
+ exit(1)
+ if le is not None and value > le:
+ logging.critical(
+ "the value of %s is not less than or equal to %s, ai_block_io plug will exit.",
+ key,
+ le,
+ )
+ Report.report_pass(
+ f"the value of {key} is not less than or equal to {le}, ai_block_io plug will exit."
+ )
+ exit(1)
+
+ return value
+
+ def __read_absolute_threshold(self, items_common: dict):
+ self.__absolute_threshold = self._get_config_value(
+ items_common,
+ "absolute_threshold",
+ float,
+ ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD,
+ gt=0,
+ )
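
A short usage sketch for the new _get_config_value helper, assuming items_common holds raw strings straight from configparser (the config file path is hypothetical):

    parser = ConfigParser("/etc/sysSentry/plugins/ai_block_io.ini")  # hypothetical path
    value = parser._get_config_value(
        {"absolute_threshold": "55.5"},   # raw string values from configparser
        "absolute_threshold",
        float,
        ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD,
        gt=0,
    )
    # value == 55.5; a missing key logs a warning and falls back to the default (40),
    # while a non-numeric or non-positive value logs a critical, calls Report.report_pass and exits.
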
def __read__slow_io_detect_frequency(self, items_common: dict):
- try:
- self.__slow_io_detect_frequency = int(items_common.get('slow_io_detect_frequency',
- ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY))
- if self.__slow_io_detect_frequency < 1 or self.__slow_io_detect_frequency > 10:
- logging.warning(
- f'the slow_io_detect_frequency: {self.__slow_io_detect_frequency} you set is invalid, use default value: {ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY}.')
- self.__slow_io_detect_frequency = ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY
- except ValueError:
- self.__slow_io_detect_frequency = ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY
- logging.warning(f'slow_io_detect_frequency type conversion has error, use default value: {self.__slow_io_detect_frequency}.')
+ self.__slow_io_detect_frequency = self._get_config_value(
+ items_common,
+ "slow_io_detect_frequency",
+ int,
+ ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY,
+ gt=0,
+ le=300,
+ )
def __read__disks_to_detect(self, items_common: dict):
- disks_to_detection = items_common.get('disk')
+ disks_to_detection = items_common.get("disk")
if disks_to_detection is None:
- logging.warning(f'config of disk not found, the default value will be used.')
+ logging.warning("config of disk not found, the default value will be used.")
self.__disks_to_detection = None
return
- disk_list = disks_to_detection.split(',')
- if len(disk_list) == 0 or (len(disk_list) == 1 and disk_list[0] == ''):
- logging.warning("you don't specify any disk.")
- self.__disks_to_detection = []
- return
- if len(disk_list) == 1 and disk_list[0] == 'default':
+ disks_to_detection = disks_to_detection.strip()
+ if not disks_to_detection:
+ logging.critical("the value of disk is empty, ai_block_io plug will exit.")
+ Report.report_pass(
+ "the value of disk is empty, ai_block_io plug will exit."
+ )
+ exit(1)
+ disk_list = disks_to_detection.split(",")
+ if len(disk_list) == 1 and disk_list[0] == "default":
self.__disks_to_detection = None
return
self.__disks_to_detection = disk_list
def __read__train_data_duration(self, items_algorithm: dict):
- try:
- self.__train_data_duration = float(items_algorithm.get('train_data_duration',
- ConfigParser.DEFAULT_TRAIN_DATA_DURATION))
- if self.__train_data_duration <= 0 or self.__train_data_duration > 720:
- logging.warning(
- f'the train_data_duration: {self.__train_data_duration} you set is invalid, use default value: {ConfigParser.DEFAULT_TRAIN_DATA_DURATION}.')
- self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_DATA_DURATION
- except ValueError:
- self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_DATA_DURATION
- logging.warning(f'the train_data_duration type conversion has error, use default value: {self.__train_data_duration}.')
+ self.__train_data_duration = self._get_config_value(
+ items_algorithm,
+ "train_data_duration",
+ float,
+ ConfigParser.DEFAULT_TRAIN_DATA_DURATION,
+ gt=0,
+ le=720,
+ )
def __read__train_update_duration(self, items_algorithm: dict):
default_train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
if default_train_update_duration > self.__train_data_duration:
default_train_update_duration = self.__train_data_duration / 2
-
- try:
- self.__train_update_duration = float(items_algorithm.get('train_update_duration',
- ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION))
- if self.__train_update_duration <= 0 or self.__train_update_duration > self.__train_data_duration:
- logging.warning(
- f'the train_update_duration: {self.__train_update_duration} you set is invalid, use default value: {default_train_update_duration}.')
- self.__train_update_duration = default_train_update_duration
- except ValueError:
- self.__train_update_duration = default_train_update_duration
- logging.warning(f'the train_update_duration type conversion has error, use default value: {self.__train_update_duration}.')
+ self.__train_update_duration = self._get_config_value(
+ items_algorithm,
+ "train_update_duration",
+ float,
+ default_train_update_duration,
+ gt=0,
+ le=self.__train_data_duration,
+ )
def __read__algorithm_type_and_parameter(self, items_algorithm: dict):
- algorithm_type = items_algorithm.get('algorithm_type', ConfigParser.DEFAULT_ALGORITHM_TYPE)
+ algorithm_type = items_algorithm.get(
+ "algorithm_type", ConfigParser.DEFAULT_ALGORITHM_TYPE
+ )
self.__algorithm_type = get_threshold_type_enum(algorithm_type)
+ if self.__algorithm_type is None:
+ logging.critical(
+ "the algorithm_type: %s you set is invalid. ai_block_io plug will exit.",
+ algorithm_type,
+ )
+ Report.report_pass(
+ f"the algorithm_type: {algorithm_type} you set is invalid. ai_block_io plug will exit."
+ )
+ exit(1)
if self.__algorithm_type == ThresholdType.NSigmaThreshold:
- try:
- self.__n_sigma_parameter = float(items_algorithm.get('n_sigma_parameter',
- ConfigParser.DEFAULT_N_SIGMA_PARAMETER))
- if self.__n_sigma_parameter <= 0 or self.__n_sigma_parameter > 10:
- logging.warning(
- f'the n_sigma_parameter: {self.__n_sigma_parameter} you set is invalid, use default value: {ConfigParser.DEFAULT_N_SIGMA_PARAMETER}.')
- self.__n_sigma_parameter = ConfigParser.DEFAULT_N_SIGMA_PARAMETER
- except ValueError:
- self.__n_sigma_parameter = ConfigParser.DEFAULT_N_SIGMA_PARAMETER
- logging.warning(f'the n_sigma_parameter type conversion has error, use default value: {self.__n_sigma_parameter}.')
+ self.__n_sigma_parameter = self._get_config_value(
+ items_algorithm,
+ "n_sigma_parameter",
+ float,
+ ConfigParser.DEFAULT_N_SIGMA_PARAMETER,
+ gt=0,
+ le=10,
+ )
elif self.__algorithm_type == ThresholdType.BoxplotThreshold:
- try:
- self.__boxplot_parameter = float(items_algorithm.get('boxplot_parameter',
- ConfigParser.DEFAULT_BOXPLOT_PARAMETER))
- if self.__boxplot_parameter <= 0 or self.__boxplot_parameter > 10:
- logging.warning(
- f'the boxplot_parameter: {self.__boxplot_parameter} you set is invalid, use default value: {ConfigParser.DEFAULT_BOXPLOT_PARAMETER}.')
- self.__n_sigma_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER
- except ValueError:
- self.__boxplot_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER
- logging.warning(f'the boxplot_parameter type conversion has error, use default value: {self.__boxplot_parameter}.')
+ self.__boxplot_parameter = self._get_config_value(
+ items_algorithm,
+ "boxplot_parameter",
+ float,
+ ConfigParser.DEFAULT_BOXPLOT_PARAMETER,
+ gt=0,
+ le=10,
+ )
+
+ def __read__stage(self, items_algorithm: dict):
+ stage_str = items_algorithm.get('stage', ConfigParser.DEFAULT_STAGE)
+ stage_list = stage_str.split(',')
+ if len(stage_list) == 1 and stage_list[0] == '':
+ logging.critical('stage value not allow is empty, exiting...')
+ exit(1)
+ if len(stage_list) == 1 and stage_list[0] == 'default':
+ logging.warning(f'stage will enable default value: {ConfigParser.DEFAULT_STAGE}')
+ self.__stage = ALL_STAGE_LIST
+ return
+ for stage in stage_list:
+ if stage not in ALL_STAGE_LIST:
+ logging.critical(f'stage: {stage} is not valid stage, ai_block_io will exit...')
+ exit(1)
+ dup_stage_list = set(stage_list)
+ if 'bio' not in dup_stage_list:
+ logging.critical('stage must contains bio stage, exiting...')
+ exit(1)
+ self.__stage = dup_stage_list
+
+ def __read__iotype(self, items_algorithm: dict):
+ iotype_str = items_algorithm.get('iotype', ConfigParser.DEFAULT_IOTYPE)
+ iotype_list = iotype_str.split(',')
+ if len(iotype_list) == 1 and iotype_list[0] == '':
+ logging.critical('iotype value not allow is empty, exiting...')
+ exit(1)
+ if len(iotype_list) == 1 and iotype_list[0] == 'default':
+ logging.warning(f'iotype will enable default value: {ConfigParser.DEFAULT_IOTYPE}')
+ self.__iotype = ALL_IOTPYE_LIST
+ return
+ for iotype in iotype_list:
+ if iotype not in ALL_IOTPYE_LIST:
+ logging.critical(f'iotype: {iotype} is not valid iotype, ai_block_io will exit...')
+ exit(1)
+ dup_iotype_list = set(iotype_list)
+ self.__iotype = dup_iotype_list
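
The accepted and rejected forms of the two new keys, as implemented above (a summary sketch; the values are illustrative):

    # stage=default               -> all stages in ALL_STAGE_LIST are monitored
    # stage=bio,rq_driver,bio     -> {'bio', 'rq_driver'} (duplicates collapsed via set)
    # stage=rq_driver             -> rejected: the list must contain 'bio', plugin exits
    # stage=foo                   -> rejected: not in ALL_STAGE_LIST, plugin exits
    # iotype=default              -> ['read', 'write']
    # iotype=read,read            -> {'read'}
    # iotype=flush                -> rejected: only read/write are accepted, plugin exits
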
def __read__window_size(self, items_sliding_window: dict):
- try:
- self.__window_size = int(items_sliding_window.get('window_size',
- ConfigParser.DEFAULT_WINDOW_SIZE))
- if self.__window_size < 1 or self.__window_size > 3600:
- logging.warning(
- f'the window_size: {self.__window_size} you set is invalid, use default value: {ConfigParser.DEFAULT_WINDOW_SIZE}.')
- self.__window_size = ConfigParser.DEFAULT_WINDOW_SIZE
- except ValueError:
- self.__window_size = ConfigParser.DEFAULT_WINDOW_SIZE
- logging.warning(f'window_size type conversion has error, use default value: {self.__window_size}.')
+ self.__window_size = self._get_config_value(
+ items_sliding_window,
+ "window_size",
+ int,
+ ConfigParser.DEFAULT_WINDOW_SIZE,
+ gt=0,
+ le=3600,
+ )
def __read__window_minimum_threshold(self, items_sliding_window: dict):
default_window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD
if default_window_minimum_threshold > self.__window_size:
default_window_minimum_threshold = self.__window_size / 2
- try:
- self.__window_minimum_threshold = (
- int(items_sliding_window.get('window_minimum_threshold',
- ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD)))
- if self.__window_minimum_threshold < 1 or self.__window_minimum_threshold > self.__window_size:
- logging.warning(
- f'the window_minimum_threshold: {self.__window_minimum_threshold} you set is invalid, use default value: {default_window_minimum_threshold}.')
- self.__window_minimum_threshold = default_window_minimum_threshold
- except ValueError:
- self.__window_minimum_threshold = default_window_minimum_threshold
- logging.warning(f'window_minimum_threshold type conversion has error, use default value: {self.__window_minimum_threshold}.')
+ self.__window_minimum_threshold = self._get_config_value(
+ items_sliding_window,
+ "window_minimum_threshold",
+ int,
+ default_window_minimum_threshold,
+ gt=0,
+ le=self.__window_size,
+ )
def read_config_from_file(self):
+ if not os.path.exists(self.__config_file_name):
+ init_log_format(self.__log_level)
+ logging.critical(
+ "config file %s not found, ai_block_io plug will exit.",
+ self.__config_file_name,
+ )
+ Report.report_pass(
+ f"config file {self.__config_file_name} not found, ai_block_io plug will exit."
+ )
+ exit(1)
+
con = configparser.ConfigParser()
try:
- con.read(self.__config_file_name, encoding='utf-8')
+ con.read(self.__config_file_name, encoding="utf-8")
except configparser.Error as e:
init_log_format(self.__log_level)
- logging.critical(f'config file read error: {e}, ai_block_io plug will exit.')
+ logging.critical(
+ f"config file read error: %s, ai_block_io plug will exit.", e
+ )
+ Report.report_pass(
+ f"config file read error: {e}, ai_block_io plug will exit."
+ )
exit(1)
- if con.has_section('common'):
- items_common = dict(con.items('common'))
- self.__log_level = items_common.get('log_level', ConfigParser.DEFAULT_LOG_LEVEL)
+ if con.has_section('log'):
+ items_log = dict(con.items('log'))
+ # Case 1: no [log] section -> use the default log level
+ # Case 2: [log] present but the value is empty or invalid -> use the default
+ # Case 3: [log] present and the value is valid -> use that value
+ self.__log_level = items_log.get('level', ConfigParser.DEFAULT_LOG_LEVEL)
init_log_format(self.__log_level)
+ else:
+ init_log_format(self.__log_level)
+ logging.warning(f"log section parameter not found, it will be set to default value.")
+
+ if con.has_section("common"):
+ items_common = dict(con.items("common"))
self.__read_absolute_threshold(items_common)
self.__read__slow_io_detect_frequency(items_common)
self.__read__disks_to_detect(items_common)
+ self.__read__stage(items_common)
+ self.__read__iotype(items_common)
else:
- init_log_format(self.__log_level)
- logging.warning("common section parameter not found, it will be set to default value.")
+ logging.warning(
+ "common section parameter not found, it will be set to default value."
+ )
- if con.has_section('algorithm'):
- items_algorithm = dict(con.items('algorithm'))
+ if con.has_section("algorithm"):
+ items_algorithm = dict(con.items("algorithm"))
self.__read__train_data_duration(items_algorithm)
self.__read__train_update_duration(items_algorithm)
self.__read__algorithm_type_and_parameter(items_algorithm)
else:
- logging.warning("algorithm section parameter not found, it will be set to default value.")
-
- if con.has_section('sliding_window'):
- items_sliding_window = dict(con.items('sliding_window'))
- sliding_window_type = items_sliding_window.get('sliding_window_type',
- ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE)
- self.__sliding_window_type = get_sliding_window_type_enum(sliding_window_type)
+ logging.warning(
+ "algorithm section parameter not found, it will be set to default value."
+ )
+
+ if con.has_section("sliding_window"):
+ items_sliding_window = dict(con.items("sliding_window"))
+ sliding_window_type = items_sliding_window.get(
+ "sliding_window_type", ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE
+ )
+ self.__sliding_window_type = get_sliding_window_type_enum(
+ sliding_window_type
+ )
self.__read__window_size(items_sliding_window)
self.__read__window_minimum_threshold(items_sliding_window)
else:
- logging.warning("sliding_window section parameter not found, it will be set to default value.")
+ logging.warning(
+ "sliding_window section parameter not found, it will be set to default value."
+ )
self.__print_all_config_value()
+ def __repr__(self):
+ config_str = {
+ 'log.level': self.__log_level,
+ 'common.absolute_threshold': self.__absolute_threshold,
+ 'common.slow_io_detect_frequency': self.__slow_io_detect_frequency,
+ 'common.disk': self.__disks_to_detection,
+ 'common.stage': self.__stage,
+ 'common.iotype': self.__iotype,
+ 'algorithm.train_data_duration': self.__train_data_duration,
+ 'algorithm.train_update_duration': self.__train_update_duration,
+ 'algorithm.algorithm_type': self.__algorithm_type,
+ 'algorithm.boxplot_parameter': self.__boxplot_parameter,
+ 'algorithm.n_sigma_parameter': self.__n_sigma_parameter,
+ 'sliding_window.sliding_window_type': self.__sliding_window_type,
+ 'sliding_window.window_size': self.__window_size,
+ 'sliding_window.window_minimum_threshold': self.__window_minimum_threshold
+ }
+ return str(config_str)
+
def __print_all_config_value(self):
- pass
+ logging.info(f"all config is follow:\n {self}")
+
+ def get_train_data_duration_and_train_update_duration(self):
+ return self.__train_data_duration, self.__train_update_duration
- def get_slow_io_detect_frequency(self):
+ def get_window_size_and_window_minimum_threshold(self):
+ return self.__window_size, self.__window_minimum_threshold
+
+ @property
+ def slow_io_detect_frequency(self):
return self.__slow_io_detect_frequency
- def get_algorithm_type(self):
+ @property
+ def algorithm_type(self):
return self.__algorithm_type
- def get_sliding_window_type(self):
+ @property
+ def sliding_window_type(self):
return self.__sliding_window_type
- def get_train_data_duration_and_train_update_duration(self):
- return self.__train_data_duration, self.__train_update_duration
+ @property
+ def train_data_duration(self):
+ return self.__train_data_duration
- def get_window_size_and_window_minimum_threshold(self):
- return self.__window_size, self.__window_minimum_threshold
+ @property
+ def train_update_duration(self):
+ return self.__train_update_duration
+
+ @property
+ def window_size(self):
+ return self.__window_size
- def get_absolute_threshold(self):
+ @property
+ def window_minimum_threshold(self):
+ return self.__window_minimum_threshold
+
+ @property
+ def absolute_threshold(self):
return self.__absolute_threshold
- def get_log_level(self):
+ @property
+ def log_level(self):
return self.__log_level
- def get_disks_to_detection(self):
+ @property
+ def disks_to_detection(self):
return self.__disks_to_detection
- def get_boxplot_parameter(self):
+ @property
+ def stage(self):
+ return self.__stage
+
+ @property
+ def iotype(self):
+ return self.__iotype
+
+ @property
+ def boxplot_parameter(self):
return self.__boxplot_parameter
- def get_n_sigma_parameter(self):
+ @property
+ def n_sigma_parameter(self):
return self.__n_sigma_parameter
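
With the getter-to-property refactor, call sites now read the configuration as attributes; a rough sketch (the config path is hypothetical):

    config = ConfigParser("/etc/sysSentry/plugins/ai_block_io.ini")  # hypothetical path
    config.read_config_from_file()
    period = config.slow_io_detect_frequency      # previously config.get_slow_io_detect_frequency()
    disks = config.disks_to_detection             # None means "all available disks"
    stages, iotypes = config.stage, config.iotype
    # the two tuple-returning getters remain plain methods:
    data_h, update_h = config.get_train_data_duration_and_train_update_duration()
    size, min_threshold = config.get_window_size_and_window_minimum_threshold()
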
diff --git a/src/python/sentryPlugins/ai_block_io/data_access.py b/src/python/sentryPlugins/ai_block_io/data_access.py
index c7679cd..ed997e6 100644
--- a/src/python/sentryPlugins/ai_block_io/data_access.py
+++ b/src/python/sentryPlugins/ai_block_io/data_access.py
@@ -41,11 +41,14 @@ def check_collect_valid(period):
try:
data = json.loads(data_raw["message"])
except Exception as e:
- logging.warning(f"get io data failed, {e}")
+ logging.warning(f"get valid devices failed, occur exception: {e}")
+ return None
+ if not data:
+ logging.warning(f"get valid devices failed, return {data_raw}")
return None
return [k for k in data.keys()]
else:
- logging.warning(f"get io data failed, return {data_raw}")
+ logging.warning(f"get valid devices failed, return {data_raw}")
return None
@@ -60,7 +63,7 @@ def _get_raw_data(period, disk_list):
def _get_io_stage_data(data):
io_stage_data = IOStageData()
- for data_type in ('read', 'write', 'flush', 'discard'):
+ for data_type in ("read", "write", "flush", "discard"):
if data_type in data:
getattr(io_stage_data, data_type).latency = data[data_type][0]
getattr(io_stage_data, data_type).io_dump = data[data_type][1]
@@ -87,7 +90,7 @@ def get_io_data_from_collect_plug(period, disk_list):
getattr(disk_ret, k)
setattr(disk_ret, k, _get_io_stage_data(v))
except AttributeError:
- logging.debug(f'no attr {k}')
+ logging.debug(f"no attr {k}")
continue
ret[disk] = disk_ret
return ret
diff --git a/src/python/sentryPlugins/ai_block_io/detector.py b/src/python/sentryPlugins/ai_block_io/detector.py
index 0ed282b..e710ddd 100644
--- a/src/python/sentryPlugins/ai_block_io/detector.py
+++ b/src/python/sentryPlugins/ai_block_io/detector.py
@@ -53,3 +53,28 @@ class Detector:
f' io_type_name: {self._metric_name.get_io_access_type_name()},'
f' metric_name: {self._metric_name.get_metric_name()}, threshold_type: {self._threshold},'
f' sliding_window_type: {self._slidingWindow}')
+
+
+class DiskDetector:
+
+ def __init__(self, disk_name: str):
+ self._disk_name = disk_name
+ self._detector_list = []
+
+ def add_detector(self, detector: Detector):
+ self._detector_list.append(detector)
+
+ def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
+ # a slow IO event is raised as soon as the bio stage shows an anomaly
+ # TODO: root-cause diagnosis
+ for detector in self._detector_list:
+ result = detector.is_slow_io_event(io_data_dict_with_disk_name)
+ if result[0] and detector.get_metric_name().get_stage_name() == 'bio':
+ return result[0], detector.get_metric_name(), result[1], result[2]
+ return False, None, None, None
+
+ def __repr__(self):
+ msg = f'disk: {self._disk_name}, '
+ for detector in self._detector_list:
+ msg += f'\n detector: [{detector}]'
+ return msg
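
A rough usage sketch for the new DiskDetector, with the threshold and sliding-window construction omitted (names follow the hunks above; the disk name and reporting helper are hypothetical):

    disk_detector = DiskDetector("sda")
    for metric_name in metric_name_list:     # MetricName entries built for "sda"
        disk_detector.add_detector(Detector(metric_name, threshold, sliding_window))
    hit, metric_name, window, threshold_value = disk_detector.is_slow_io_event(io_data_dict)
    if hit:
        # as implemented, only an anomaly on the 'bio' stage yields a hit; the other
        # stages are collected for the root-cause diagnosis noted in the TODO
        report_slow_io(metric_name, window, threshold_value)   # hypothetical reporting helper
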
diff --git a/src/python/sentryPlugins/ai_block_io/utils.py b/src/python/sentryPlugins/ai_block_io/utils.py
index 8dbba06..0ed37b9 100644
--- a/src/python/sentryPlugins/ai_block_io/utils.py
+++ b/src/python/sentryPlugins/ai_block_io/utils.py
@@ -25,8 +25,7 @@ def get_threshold_type_enum(algorithm_type: str):
return ThresholdType.BoxplotThreshold
if algorithm_type.lower() == 'n_sigma':
return ThresholdType.NSigmaThreshold
- logging.warning(f"the algorithm type: {algorithm_type} you set is invalid, use default value: boxplot")
- return ThresholdType.BoxplotThreshold
+ return None
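
Since get_threshold_type_enum no longer falls back to boxplot, callers must treat None as a configuration error, mirroring the check added in config_parser.py (raw_value stands in for the string read from the ini file):

    algorithm_type = get_threshold_type_enum(raw_value)
    if algorithm_type is None:
        logging.critical("the algorithm_type: %s you set is invalid. ai_block_io plug will exit.", raw_value)
        Report.report_pass(f"the algorithm_type: {raw_value} you set is invalid. ai_block_io plug will exit.")
        exit(1)
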
def get_sliding_window_type_enum(sliding_window_type: str):
--
2.23.0


@@ -4,7 +4,7 @@
Summary: System Inspection Framework
Name: sysSentry
Version: 1.0.2
Release: 33
Release: 34
License: Mulan PSL v2
Group: System Environment/Daemons
Source0: https://gitee.com/openeuler/sysSentry/releases/download/v%{version}/%{name}-%{version}.tar.gz
@@ -50,6 +50,7 @@ Patch37: fix-xalarm_Report-function-not-refuse-alarm-msg-exce.patch
Patch38: fix-xalarm_upgrade-not-return-val-and-fail-when-thre.patch
Patch39: add-log-for-xalarm-when-sending-msg-and-clean-invali.patch
Patch40: add-xalarm-cleanup-invalid-server-socket-peroidly.patch
Patch41: ai_block_io-support-stage-and-iotype.patch
BuildRequires: cmake gcc-c++
BuildRequires: python3 python3-setuptools
@@ -294,6 +295,12 @@ rm -rf %{buildroot}
%attr(0550,root,root) %{python3_sitelib}/sentryPlugins/ai_block_io
%changelog
* Fri Oct 11 2024 heyouzhi <heyouzhi@huawei.com> - 1.0.2-34
- Type:requirement
- CVE:NA
- SUG:NA
- DESC:ai_block_io support stage and iotype
* Fri Oct 11 2024 caixiaomeng <caixiaomeng2@huawei.com> - 1.0.2-33
- Type:bugfix
- CVE:NA