sysSentry/add-root-cause-analysis.patch
贺有志 36a07c1468
add root cause analysis
Signed-off-by: 贺有志 <1037617413@qq.com>
2024-10-12 14:02:18 +00:00

1254 lines
55 KiB
Diff
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

From 24f8eddad364e83cfc5b6b1607462ffe524b59f1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B4=BA=E6=9C=89=E5=BF=97?= <1037617413@qq.com>
Date: Sat, 12 Oct 2024 21:59:18 +0800
Subject: [PATCH] add root cause analysis
---
config/plugins/ai_block_io.ini | 15 +-
.../sentryPlugins/ai_block_io/ai_block_io.py | 133 +++--
.../ai_block_io/config_parser.py | 465 +++++++++++-------
.../sentryPlugins/ai_block_io/data_access.py | 1 +
.../sentryPlugins/ai_block_io/detector.py | 54 +-
.../sentryPlugins/ai_block_io/io_data.py | 32 +-
.../ai_block_io/sliding_window.py | 57 ++-
src/python/sentryPlugins/ai_block_io/utils.py | 44 +-
8 files changed, 491 insertions(+), 310 deletions(-)
diff --git a/config/plugins/ai_block_io.ini b/config/plugins/ai_block_io.ini
index a814d52..422cfa3 100644
--- a/config/plugins/ai_block_io.ini
+++ b/config/plugins/ai_block_io.ini
@@ -2,7 +2,6 @@
level=info
[common]
-absolute_threshold=40
slow_io_detect_frequency=1
disk=default
stage=bio
@@ -18,4 +17,16 @@ n_sigma_parameter=3
[sliding_window]
sliding_window_type=not_continuous
window_size=30
-window_minimum_threshold=6
\ No newline at end of file
+window_minimum_threshold=6
+
+[latency_sata_ssd]
+read_tot_lim=50000
+write_tot_lim=50000
+
+[latency_nvme_ssd]
+read_tot_lim=500
+write_tot_lim=500
+
+[latency_sata_hdd]
+read_tot_lim=50000
+write_tot_lim=50000
\ No newline at end of file
diff --git a/src/python/sentryPlugins/ai_block_io/ai_block_io.py b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
index e1052ec..dd661a1 100644
--- a/src/python/sentryPlugins/ai_block_io/ai_block_io.py
+++ b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
@@ -12,13 +12,18 @@
import time
import signal
import logging
+from collections import defaultdict
from .detector import Detector, DiskDetector
-from .threshold import ThresholdFactory, AbsoluteThreshold
+from .threshold import ThresholdFactory
from .sliding_window import SlidingWindowFactory
from .utils import get_data_queue_size_and_update_size
from .config_parser import ConfigParser
-from .data_access import get_io_data_from_collect_plug, check_collect_valid
+from .data_access import (
+ get_io_data_from_collect_plug,
+ check_collect_valid,
+ get_disk_type,
+)
from .io_data import MetricName
from .alarm_report import Xalarm, Report
@@ -34,7 +39,7 @@ def sig_handler(signum, frame):
class SlowIODetection:
_config_parser = None
_disk_list = None
- _detector_name_list = {}
+ _detector_name_list = defaultdict(list)
_disk_detectors = {}
def __init__(self, config_parser: ConfigParser):
@@ -43,9 +48,13 @@ class SlowIODetection:
self.__init_detector()
def __init_detector_name_list(self):
- self._disk_list = check_collect_valid(self._config_parser.slow_io_detect_frequency)
+ self._disk_list = check_collect_valid(
+ self._config_parser.slow_io_detect_frequency
+ )
if self._disk_list is None:
- Report.report_pass("get available disk error, please check if the collector plug is enable. exiting...")
+ Report.report_pass(
+ "get available disk error, please check if the collector plug is enable. exiting..."
+ )
exit(1)
logging.info(f"ai_block_io plug has found disks: {self._disk_list}")
@@ -56,27 +65,45 @@ class SlowIODetection:
# 情况2is not None and len = 0则不启动任何磁盘检测
# 情况3len = 0则取交集
if disks is None:
- logging.warning("you not specify any disk or use default, so ai_block_io will enable all available disk.")
- for disk in self._disk_list:
- for stage in stages:
- for iotype in iotypes:
- if disk not in self._detector_name_list:
- self._detector_name_list[disk] = []
- self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency"))
- else:
- for disk in disks:
- if disk in self._disk_list:
- for stage in stages:
- for iotype in iotypes:
- if disk not in self._detector_name_list:
- self._detector_name_list[disk] = []
- self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency"))
- else:
- logging.warning("disk: [%s] not in available disk list, so it will be ignored.", disk)
- if len(self._detector_name_list) == 0:
- logging.critical("the disks to detection is empty, ai_block_io will exit.")
- Report.report_pass("the disks to detection is empty, ai_block_io will exit.")
- exit(1)
+ logging.warning(
+ "you not specify any disk or use default, so ai_block_io will enable all available disk."
+ )
+ for disk in self._disk_list:
+ if disks is not None:
+ if disk not in disks:
+ continue
+ disks.remove(disk)
+
+ disk_type_result = get_disk_type(disk)
+ if disk_type_result["ret"] == 0 and disk_type_result["message"] in (
+ '0',
+ '1',
+ '2',
+ ):
+ disk_type = int(disk_type_result["message"])
+ else:
+ logging.warning(
+ "%s get disk type error, return %s, so it will be ignored.",
+ disk,
+ disk_type_result,
+ )
+ continue
+ for stage in stages:
+ for iotype in iotypes:
+ self._detector_name_list[disk].append(
+ MetricName(disk, disk_type, stage, iotype, "latency")
+ )
+ if disks:
+ logging.warning(
+ "disks: %s not in available disk list, so they will be ignored.",
+ disks,
+ )
+ if not self._detector_name_list:
+ logging.critical("the disks to detection is empty, ai_block_io will exit.")
+ Report.report_pass(
+ "the disks to detection is empty, ai_block_io will exit."
+ )
+ exit(1)
def __init_detector(self):
train_data_duration, train_update_duration = (
@@ -88,26 +115,39 @@ class SlowIODetection:
train_data_duration, train_update_duration, slow_io_detection_frequency
)
sliding_window_type = self._config_parser.sliding_window_type
- window_size, window_threshold = (self._config_parser.get_window_size_and_window_minimum_threshold())
+ window_size, window_threshold = (
+ self._config_parser.get_window_size_and_window_minimum_threshold()
+ )
for disk, metric_name_list in self._detector_name_list.items():
- threshold = ThresholdFactory().get_threshold(
- threshold_type,
- boxplot_parameter=self._config_parser.boxplot_parameter,
- n_sigma_paramter=self._config_parser.n_sigma_parameter,
- data_queue_size=data_queue_size,
- data_queue_update_size=update_size,
- )
- sliding_window = SlidingWindowFactory().get_sliding_window(
- sliding_window_type,
- queue_length=window_size,
- threshold=window_threshold,
- )
disk_detector = DiskDetector(disk)
for metric_name in metric_name_list:
+ threshold = ThresholdFactory().get_threshold(
+ threshold_type,
+ boxplot_parameter=self._config_parser.boxplot_parameter,
+ n_sigma_paramter=self._config_parser.n_sigma_parameter,
+ data_queue_size=data_queue_size,
+ data_queue_update_size=update_size,
+ )
+ abs_threshold = self._config_parser.get_tot_lim(
+ metric_name.disk_type, metric_name.io_access_type_name
+ )
+ if abs_threshold is None:
+ logging.warning(
+ "disk %s, disk type %s, io type %s, get tot lim error, so it will be ignored.",
+ disk,
+ metric_name.disk_type,
+ metric_name.io_access_type_name,
+ )
+ sliding_window = SlidingWindowFactory().get_sliding_window(
+ sliding_window_type,
+ queue_length=window_size,
+ threshold=window_threshold,
+ abs_threshold=abs_threshold,
+ )
detector = Detector(metric_name, threshold, sliding_window)
disk_detector.add_detector(detector)
- logging.info(f'disk: [{disk}] add detector:\n [{disk_detector}]')
+ logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]")
self._disk_detectors[disk] = disk_detector
def launch(self):
@@ -138,14 +178,17 @@ class SlowIODetection:
logging.debug("step3. Report slow io event to sysSentry.")
for slow_io_event in slow_io_event_list:
metric_name: MetricName = slow_io_event[1]
+ window_info = slow_io_event[2]
+ root_cause = slow_io_event[3]
alarm_content = {
- "driver_name": f"{metric_name.get_disk_name()}",
- "reason": "disk_slow",
- "block_stack": f"{metric_name.get_stage_name()}",
- "io_type": f"{metric_name.get_io_access_type_name()}",
+ "driver_name": f"{metric_name.disk_name}",
+ "reason": root_cause,
+ "block_stack": f"{metric_name.stage_name}",
+ "io_type": f"{metric_name.io_access_type_name}",
"alarm_source": "ai_block_io",
"alarm_type": "latency",
- "details": f"current window is: {slow_io_event[2]}, threshold is: {slow_io_event[3]}.",
+ "details": f"disk type: {metric_name.disk_type}, current window: {window_info[1]}, "
+ f"ai threshold: {window_info[2]}, abs threshold: {window_info[3]}.",
}
Xalarm.major(alarm_content)
logging.warning(alarm_content)
diff --git a/src/python/sentryPlugins/ai_block_io/config_parser.py b/src/python/sentryPlugins/ai_block_io/config_parser.py
index a357766..3388cd4 100644
--- a/src/python/sentryPlugins/ai_block_io/config_parser.py
+++ b/src/python/sentryPlugins/ai_block_io/config_parser.py
@@ -20,59 +20,62 @@ from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_lo
LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
-ALL_STAGE_LIST = ['throtl', 'wbt', 'gettag', 'plug', 'deadline', 'hctx', 'requeue', 'rq_driver', 'bio']
-ALL_IOTPYE_LIST = ['read', 'write']
+ALL_STAGE_LIST = [
+ "throtl",
+ "wbt",
+ "gettag",
+ "plug",
+ "deadline",
+ "hctx",
+ "requeue",
+ "rq_driver",
+ "bio",
+]
+ALL_IOTPYE_LIST = ["read", "write"]
+DISK_TYPE_MAP = {
+ 0: "nvme_ssd",
+ 1: "sata_ssd",
+ 2: "sata_hdd",
+}
def init_log_format(log_level: str):
logging.basicConfig(level=get_log_level(log_level.lower()), format=LOG_FORMAT)
if log_level.lower() not in ("info", "warning", "error", "debug"):
logging.warning(
- f"the log_level: {log_level} you set is invalid, use default value: info."
+ "the log_level: %s you set is invalid, use default value: info.", log_level
)
class ConfigParser:
- DEFAULT_ABSOLUTE_THRESHOLD = 40
- DEFAULT_SLOW_IO_DETECTION_FREQUENCY = 1
- DEFAULT_LOG_LEVEL = "info"
-
- DEFAULT_STAGE = 'throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio'
- DEFAULT_IOTYPE = 'read,write'
-
- DEFAULT_ALGORITHM_TYPE = "boxplot"
- DEFAULT_TRAIN_DATA_DURATION = 24
- DEFAULT_TRAIN_UPDATE_DURATION = 2
- DEFAULT_BOXPLOT_PARAMETER = 1.5
- DEFAULT_N_SIGMA_PARAMETER = 3
-
- DEFAULT_SLIDING_WINDOW_TYPE = "not_continuous"
- DEFAULT_WINDOW_SIZE = 30
- DEFAULT_WINDOW_MINIMUM_THRESHOLD = 6
+ DEFAULT_CONF = {
+ "log": {"level": "info"},
+ "common": {
+ "slow_io_detect_frequency": 1,
+ "disk": None,
+ "stage": "throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio",
+ "iotype": "read,write",
+ },
+ "algorithm": {
+ "train_data_duration": 24.0,
+ "train_update_duration": 2.0,
+ "algorithm_type": get_threshold_type_enum("boxplot"),
+ "boxplot_parameter": 1.5,
+ "n_sigma_parameter": 3.0,
+ },
+ "sliding_window": {
+ "sliding_window_type": get_sliding_window_type_enum("not_continuous"),
+ "window_size": 30,
+ "window_minimum_threshold": 6,
+ },
+ "latency_sata_ssd": {"read_tot_lim": 50000, "write_tot_lim": 50000},
+ "latency_nvme_ssd": {"read_tot_lim": 500, "write_tot_lim": 500},
+ "latency_sata_hdd": {"read_tot_lim": 50000, "write_tot_lim": 50000},
+ }
def __init__(self, config_file_name):
- self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD
- self.__slow_io_detect_frequency = (
- ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY
- )
- self.__log_level = ConfigParser.DEFAULT_LOG_LEVEL
- self.__disks_to_detection = None
- self.__stage = ConfigParser.DEFAULT_STAGE
- self.__iotype = ConfigParser.DEFAULT_IOTYPE
-
- self.__algorithm_type = get_threshold_type_enum(
- ConfigParser.DEFAULT_ALGORITHM_TYPE
- )
- self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
- self.__train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
- self.__boxplot_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER
- self.__n_sigma_parameter = ConfigParser.DEFAULT_N_SIGMA_PARAMETER
-
- self.__sliding_window_type = ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE
- self.__window_size = ConfigParser.DEFAULT_WINDOW_SIZE
- self.__window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD
-
- self.__config_file_name = config_file_name
+ self._conf = ConfigParser.DEFAULT_CONF
+ self._config_file_name = config_file_name
def _get_config_value(
self,
@@ -156,30 +159,21 @@ class ConfigParser:
return value
- def __read_absolute_threshold(self, items_common: dict):
- self.__absolute_threshold = self._get_config_value(
- items_common,
- "absolute_threshold",
- float,
- ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD,
- gt=0,
- )
-
- def __read__slow_io_detect_frequency(self, items_common: dict):
- self.__slow_io_detect_frequency = self._get_config_value(
+ def _read_slow_io_detect_frequency(self, items_common: dict):
+ self._conf["common"]["slow_io_detect_frequency"] = self._get_config_value(
items_common,
"slow_io_detect_frequency",
int,
- ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY,
+ self.DEFAULT_CONF["common"]["slow_io_detect_frequency"],
gt=0,
le=300,
)
- def __read__disks_to_detect(self, items_common: dict):
+ def _read_disks_to_detect(self, items_common: dict):
disks_to_detection = items_common.get("disk")
if disks_to_detection is None:
logging.warning("config of disk not found, the default value will be used.")
- self.__disks_to_detection = None
+ self._conf["common"]["disk"] = None
return
disks_to_detection = disks_to_detection.strip()
if not disks_to_detection:
@@ -189,40 +183,46 @@ class ConfigParser:
)
exit(1)
disk_list = disks_to_detection.split(",")
+ disk_list = [disk.strip() for disk in disk_list]
if len(disk_list) == 1 and disk_list[0] == "default":
- self.__disks_to_detection = None
+ self._conf["common"]["disk"] = None
return
- self.__disks_to_detection = disk_list
+ self._conf["common"]["disk"] = disk_list
- def __read__train_data_duration(self, items_algorithm: dict):
- self.__train_data_duration = self._get_config_value(
+ def _read_train_data_duration(self, items_algorithm: dict):
+ self._conf["common"]["train_data_duration"] = self._get_config_value(
items_algorithm,
"train_data_duration",
float,
- ConfigParser.DEFAULT_TRAIN_DATA_DURATION,
+ self.DEFAULT_CONF["algorithm"]["train_data_duration"],
gt=0,
le=720,
)
- def __read__train_update_duration(self, items_algorithm: dict):
- default_train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
- if default_train_update_duration > self.__train_data_duration:
- default_train_update_duration = self.__train_data_duration / 2
- self.__train_update_duration = self._get_config_value(
+ def _read_train_update_duration(self, items_algorithm: dict):
+ default_train_update_duration = self.DEFAULT_CONF["algorithm"][
+ "train_update_duration"
+ ]
+ if default_train_update_duration > self._conf["common"]["train_data_duration"]:
+ default_train_update_duration = (
+ self._conf["common"]["train_data_duration"] / 2
+ )
+ self._conf["common"]["train_update_duration"] = self._get_config_value(
items_algorithm,
"train_update_duration",
float,
default_train_update_duration,
gt=0,
- le=self.__train_data_duration,
+ le=self._conf["common"]["train_data_duration"],
)
- def __read__algorithm_type_and_parameter(self, items_algorithm: dict):
- algorithm_type = items_algorithm.get(
- "algorithm_type", ConfigParser.DEFAULT_ALGORITHM_TYPE
- )
- self.__algorithm_type = get_threshold_type_enum(algorithm_type)
- if self.__algorithm_type is None:
+ def _read_algorithm_type_and_parameter(self, items_algorithm: dict):
+ algorithm_type = items_algorithm.get("algorithm_type")
+ if algorithm_type is not None:
+ self._conf["algorithm"]["algorithm_type"] = get_threshold_type_enum(
+ algorithm_type
+ )
+ if self._conf["algorithm"]["algorithm_type"] is None:
logging.critical(
"the algorithm_type: %s you set is invalid. ai_block_io plug will exit.",
algorithm_type,
@@ -231,129 +231,175 @@ class ConfigParser:
f"the algorithm_type: {algorithm_type} you set is invalid. ai_block_io plug will exit."
)
exit(1)
-
- if self.__algorithm_type == ThresholdType.NSigmaThreshold:
- self.__n_sigma_parameter = self._get_config_value(
+ elif self._conf["algorithm"]["algorithm_type"] == ThresholdType.NSigmaThreshold:
+ self._conf["algorithm"]["n_sigma_parameter"] = self._get_config_value(
items_algorithm,
"n_sigma_parameter",
float,
- ConfigParser.DEFAULT_N_SIGMA_PARAMETER,
+ self.DEFAULT_CONF["algorithm"]["n_sigma_parameter"],
gt=0,
le=10,
)
- elif self.__algorithm_type == ThresholdType.BoxplotThreshold:
- self.__boxplot_parameter = self._get_config_value(
+ elif (
+ self._conf["algorithm"]["algorithm_type"] == ThresholdType.BoxplotThreshold
+ ):
+ self._conf["algorithm"]["boxplot_parameter"] = self._get_config_value(
items_algorithm,
"boxplot_parameter",
float,
- ConfigParser.DEFAULT_BOXPLOT_PARAMETER,
+ self.DEFAULT_CONF["algorithm"]["boxplot_parameter"],
gt=0,
le=10,
)
- def __read__stage(self, items_algorithm: dict):
- stage_str = items_algorithm.get('stage', ConfigParser.DEFAULT_STAGE)
- stage_list = stage_str.split(',')
- if len(stage_list) == 1 and stage_list[0] == '':
- logging.critical('stage value not allow is empty, exiting...')
+ def _read_stage(self, items_algorithm: dict):
+ stage_str = items_algorithm.get(
+ "stage", self.DEFAULT_CONF["common"]["stage"]
+ ).strip()
+ stage_list = stage_str.split(",")
+ stage_list = [stage.strip() for stage in stage_list]
+ if len(stage_list) == 1 and stage_list[0] == "":
+ logging.critical("stage value not allow is empty, exiting...")
exit(1)
- if len(stage_list) == 1 and stage_list[0] == 'default':
- logging.warning(f'stage will enable default value: {ConfigParser.DEFAULT_STAGE}')
- self.__stage = ALL_STAGE_LIST
+ if len(stage_list) == 1 and stage_list[0] == "default":
+ logging.warning(
+ "stage will enable default value: %s",
+ self.DEFAULT_CONF["common"]["stage"],
+ )
+ self._conf["common"]["stage"] = ALL_STAGE_LIST
return
for stage in stage_list:
if stage not in ALL_STAGE_LIST:
- logging.critical(f'stage: {stage} is not valid stage, ai_block_io will exit...')
+ logging.critical(
+ "stage: %s is not valid stage, ai_block_io will exit...", stage
+ )
exit(1)
dup_stage_list = set(stage_list)
- if 'bio' not in dup_stage_list:
- logging.critical('stage must contains bio stage, exiting...')
+ if "bio" not in dup_stage_list:
+ logging.critical("stage must contains bio stage, exiting...")
exit(1)
- self.__stage = dup_stage_list
-
- def __read__iotype(self, items_algorithm: dict):
- iotype_str = items_algorithm.get('iotype', ConfigParser.DEFAULT_IOTYPE)
- iotype_list = iotype_str.split(',')
- if len(iotype_list) == 1 and iotype_list[0] == '':
- logging.critical('iotype value not allow is empty, exiting...')
+ self._conf["common"]["stage"] = dup_stage_list
+
+ def _read_iotype(self, items_algorithm: dict):
+ iotype_str = items_algorithm.get(
+ "iotype", self.DEFAULT_CONF["common"]["iotype"]
+ ).strip()
+ iotype_list = iotype_str.split(",")
+ iotype_list = [iotype.strip() for iotype in iotype_list]
+ if len(iotype_list) == 1 and iotype_list[0] == "":
+ logging.critical("iotype value not allow is empty, exiting...")
exit(1)
- if len(iotype_list) == 1 and iotype_list[0] == 'default':
- logging.warning(f'iotype will enable default value: {ConfigParser.DEFAULT_IOTYPE}')
- self.__iotype = ALL_IOTPYE_LIST
+ if len(iotype_list) == 1 and iotype_list[0] == "default":
+ logging.warning(
+ "iotype will enable default value: %s",
+ self.DEFAULT_CONF["common"]["iotype"],
+ )
+ self._conf["common"]["iotype"] = ALL_IOTPYE_LIST
return
for iotype in iotype_list:
if iotype not in ALL_IOTPYE_LIST:
- logging.critical(f'iotype: {iotype} is not valid iotype, ai_block_io will exit...')
+ logging.critical(
+ "iotype: %s is not valid iotype, ai_block_io will exit...", iotype
+ )
exit(1)
dup_iotype_list = set(iotype_list)
- self.__iotype = dup_iotype_list
+ self._conf["common"]["iotype"] = dup_iotype_list
+
+ def _read_sliding_window_type(self, items_sliding_window: dict):
+ sliding_window_type = items_sliding_window.get("sliding_window_type")
+ if sliding_window_type is not None:
+ self._conf["sliding_window"]["sliding_window_type"] = (
+ get_sliding_window_type_enum(sliding_window_type)
+ )
+ if self._conf["sliding_window"]["sliding_window_type"] is None:
+ logging.critical(
+ "the sliding_window_type: %s you set is invalid. ai_block_io plug will exit.",
+ sliding_window_type,
+ )
+ Report.report_pass(
+ f"the sliding_window_type: {sliding_window_type} you set is invalid. ai_block_io plug will exit."
+ )
+ exit(1)
- def __read__window_size(self, items_sliding_window: dict):
- self.__window_size = self._get_config_value(
+ def _read_window_size(self, items_sliding_window: dict):
+ self._conf["sliding_window"]["window_size"] = self._get_config_value(
items_sliding_window,
"window_size",
int,
- ConfigParser.DEFAULT_WINDOW_SIZE,
+ self.DEFAULT_CONF["sliding_window"]["window_size"],
gt=0,
le=3600,
)
- def __read__window_minimum_threshold(self, items_sliding_window: dict):
- default_window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD
- if default_window_minimum_threshold > self.__window_size:
- default_window_minimum_threshold = self.__window_size / 2
- self.__window_minimum_threshold = self._get_config_value(
- items_sliding_window,
- "window_minimum_threshold",
- int,
- default_window_minimum_threshold,
- gt=0,
- le=self.__window_size,
+ def _read_window_minimum_threshold(self, items_sliding_window: dict):
+ default_window_minimum_threshold = self.DEFAULT_CONF["sliding_window"][
+ "window_minimum_threshold"
+ ]
+ if (
+ default_window_minimum_threshold
+ > self._conf["sliding_window"]["window_size"]
+ ):
+ default_window_minimum_threshold = (
+ self._conf["sliding_window"]["window_size"] / 2
+ )
+ self._conf["sliding_window"]["window_minimum_threshold"] = (
+ self._get_config_value(
+ items_sliding_window,
+ "window_minimum_threshold",
+ int,
+ default_window_minimum_threshold,
+ gt=0,
+ le=self._conf["sliding_window"]["window_size"],
+ )
)
def read_config_from_file(self):
- if not os.path.exists(self.__config_file_name):
- init_log_format(self.__log_level)
+ if not os.path.exists(self._config_file_name):
+ init_log_format(self._conf["log"]["level"])
logging.critical(
"config file %s not found, ai_block_io plug will exit.",
- self.__config_file_name,
+ self._config_file_name,
)
Report.report_pass(
- f"config file {self.__config_file_name} not found, ai_block_io plug will exit."
+ f"config file {self._config_file_name} not found, ai_block_io plug will exit."
)
exit(1)
con = configparser.ConfigParser()
try:
- con.read(self.__config_file_name, encoding="utf-8")
+ con.read(self._config_file_name, encoding="utf-8")
except configparser.Error as e:
- init_log_format(self.__log_level)
+ init_log_format(self._conf["log"]["level"])
logging.critical(
- f"config file read error: %s, ai_block_io plug will exit.", e
+ "config file read error: %s, ai_block_io plug will exit.", e
)
Report.report_pass(
f"config file read error: {e}, ai_block_io plug will exit."
)
exit(1)
- if con.has_section('log'):
- items_log = dict(con.items('log'))
+ if con.has_section("log"):
+ items_log = dict(con.items("log"))
# 情况一没有log则使用默认值
# 情况二有log值为空或异常使用默认值
# 情况三有log值正常则使用该值
- self.__log_level = items_log.get('level', ConfigParser.DEFAULT_LOG_LEVEL)
- init_log_format(self.__log_level)
+ self._conf["log"]["level"] = items_log.get(
+ "level", self.DEFAULT_CONF["log"]["level"]
+ )
+ init_log_format(self._conf["log"]["level"])
else:
- init_log_format(self.__log_level)
- logging.warning(f"log section parameter not found, it will be set to default value.")
+ init_log_format(self._conf["log"]["level"])
+ logging.warning(
+ "log section parameter not found, it will be set to default value."
+ )
if con.has_section("common"):
items_common = dict(con.items("common"))
- self.__read_absolute_threshold(items_common)
- self.__read__slow_io_detect_frequency(items_common)
- self.__read__disks_to_detect(items_common)
- self.__read__stage(items_common)
- self.__read__iotype(items_common)
+
+ self._read_slow_io_detect_frequency(items_common)
+ self._read_disks_to_detect(items_common)
+ self._read_stage(items_common)
+ self._read_iotype(items_common)
else:
logging.warning(
"common section parameter not found, it will be set to default value."
@@ -361,9 +407,9 @@ class ConfigParser:
if con.has_section("algorithm"):
items_algorithm = dict(con.items("algorithm"))
- self.__read__train_data_duration(items_algorithm)
- self.__read__train_update_duration(items_algorithm)
- self.__read__algorithm_type_and_parameter(items_algorithm)
+ self._read_train_data_duration(items_algorithm)
+ self._read_train_update_duration(items_algorithm)
+ self._read_algorithm_type_and_parameter(items_algorithm)
else:
logging.warning(
"algorithm section parameter not found, it will be set to default value."
@@ -371,101 +417,162 @@ class ConfigParser:
if con.has_section("sliding_window"):
items_sliding_window = dict(con.items("sliding_window"))
- sliding_window_type = items_sliding_window.get(
- "sliding_window_type", ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE
+
+ self._read_window_size(items_sliding_window)
+ self._read_window_minimum_threshold(items_sliding_window)
+ else:
+ logging.warning(
+ "sliding_window section parameter not found, it will be set to default value."
+ )
+
+ if con.has_section("latency_sata_ssd"):
+ items_latency_sata_ssd = dict(con.items("latency_sata_ssd"))
+ self._conf["latency_sata_ssd"]["read_tot_lim"] = self._get_config_value(
+ items_latency_sata_ssd,
+ "read_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_sata_ssd"]["read_tot_lim"],
+ gt=0,
)
- self.__sliding_window_type = get_sliding_window_type_enum(
- sliding_window_type
+ self._conf["latency_sata_ssd"]["write_tot_lim"] = self._get_config_value(
+ items_latency_sata_ssd,
+ "write_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_sata_ssd"]["write_tot_lim"],
+ gt=0,
)
- self.__read__window_size(items_sliding_window)
- self.__read__window_minimum_threshold(items_sliding_window)
else:
logging.warning(
- "sliding_window section parameter not found, it will be set to default value."
+ "latency_sata_ssd section parameter not found, it will be set to default value."
+ )
+ if con.has_section("latency_nvme_ssd"):
+ items_latency_nvme_ssd = dict(con.items("latency_nvme_ssd"))
+ self._conf["latency_nvme_ssd"]["read_tot_lim"] = self._get_config_value(
+ items_latency_nvme_ssd,
+ "read_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_nvme_ssd"]["read_tot_lim"],
+ gt=0,
+ )
+ self._conf["latency_nvme_ssd"]["write_tot_lim"] = self._get_config_value(
+ items_latency_nvme_ssd,
+ "write_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_nvme_ssd"]["write_tot_lim"],
+ gt=0,
+ )
+ else:
+ logging.warning(
+ "latency_nvme_ssd section parameter not found, it will be set to default value."
+ )
+ if con.has_section("latency_sata_hdd"):
+ items_latency_sata_hdd = dict(con.items("latency_sata_hdd"))
+ self._conf["latency_sata_hdd"]["read_tot_lim"] = self._get_config_value(
+ items_latency_sata_hdd,
+ "read_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_sata_hdd"]["read_tot_lim"],
+ gt=0,
+ )
+ self._conf["latency_sata_hdd"]["write_tot_lim"] = self._get_config_value(
+ items_latency_sata_hdd,
+ "write_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_sata_hdd"]["write_tot_lim"],
+ gt=0,
+ )
+ else:
+ logging.warning(
+ "latency_sata_hdd section parameter not found, it will be set to default value."
)
self.__print_all_config_value()
- def __repr__(self):
- config_str = {
- 'log.level': self.__log_level,
- 'common.absolute_threshold': self.__absolute_threshold,
- 'common.slow_io_detect_frequency': self.__slow_io_detect_frequency,
- 'common.disk': self.__disks_to_detection,
- 'common.stage': self.__stage,
- 'common.iotype': self.__iotype,
- 'algorithm.train_data_duration': self.__train_data_duration,
- 'algorithm.train_update_duration': self.__train_update_duration,
- 'algorithm.algorithm_type': self.__algorithm_type,
- 'algorithm.boxplot_parameter': self.__boxplot_parameter,
- 'algorithm.n_sigma_parameter': self.__n_sigma_parameter,
- 'sliding_window.sliding_window_type': self.__sliding_window_type,
- 'sliding_window.window_size': self.__window_size,
- 'sliding_window.window_minimum_threshold': self.__window_minimum_threshold
- }
- return str(config_str)
+ def __repr__(self) -> str:
+ return str(self._conf)
+
+ def __str__(self) -> str:
+ return str(self._conf)
def __print_all_config_value(self):
- logging.info(f"all config is follow:\n {self}")
+ logging.info("all config is follow:\n %s", self)
+
+ def get_tot_lim(self, disk_type, io_type):
+ if io_type == "read":
+ return self._conf.get(
+ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {}
+ ).get("read_tot_lim", None)
+ elif io_type == "write":
+ return self._conf.get(
+ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {}
+ ).get("write_tot_lim", None)
+ else:
+ return None
def get_train_data_duration_and_train_update_duration(self):
- return self.__train_data_duration, self.__train_update_duration
+ return (
+ self._conf["common"]["train_data_duration"],
+ self._conf["common"]["train_update_duration"],
+ )
def get_window_size_and_window_minimum_threshold(self):
- return self.__window_size, self.__window_minimum_threshold
+ return (
+ self._conf["sliding_window"]["window_size"],
+ self._conf["sliding_window"]["window_minimum_threshold"],
+ )
@property
def slow_io_detect_frequency(self):
- return self.__slow_io_detect_frequency
+ return self._conf["common"]["slow_io_detect_frequency"]
@property
def algorithm_type(self):
- return self.__algorithm_type
+ return self._conf["algorithm"]["algorithm_type"]
@property
def sliding_window_type(self):
- return self.__sliding_window_type
+ return self._conf["sliding_window"]["sliding_window_type"]
@property
def train_data_duration(self):
- return self.__train_data_duration
+ return self._conf["common"]["train_data_duration"]
@property
def train_update_duration(self):
- return self.__train_update_duration
+ return self._conf["common"]["train_update_duration"]
@property
def window_size(self):
- return self.__window_size
+ return self._conf["sliding_window"]["window_size"]
@property
def window_minimum_threshold(self):
- return self.__window_minimum_threshold
+ return self._conf["sliding_window"]["window_minimum_threshold"]
@property
def absolute_threshold(self):
- return self.__absolute_threshold
+ return self._conf["common"]["absolute_threshold"]
@property
def log_level(self):
- return self.__log_level
+ return self._conf["log"]["level"]
@property
def disks_to_detection(self):
- return self.__disks_to_detection
+ return self._conf["common"]["disk"]
@property
def stage(self):
- return self.__stage
+ return self._conf["common"]["stage"]
@property
def iotype(self):
- return self.__iotype
+ return self._conf["common"]["iotype"]
@property
def boxplot_parameter(self):
- return self.__boxplot_parameter
+ return self._conf["algorithm"]["boxplot_parameter"]
@property
def n_sigma_parameter(self):
- return self.__n_sigma_parameter
+ return self._conf["algorithm"]["n_sigma_parameter"]
diff --git a/src/python/sentryPlugins/ai_block_io/data_access.py b/src/python/sentryPlugins/ai_block_io/data_access.py
index ed997e6..1bc5ed8 100644
--- a/src/python/sentryPlugins/ai_block_io/data_access.py
+++ b/src/python/sentryPlugins/ai_block_io/data_access.py
@@ -16,6 +16,7 @@ from sentryCollector.collect_plugin import (
Result_Messages,
get_io_data,
is_iocollect_valid,
+ get_disk_type
)
diff --git a/src/python/sentryPlugins/ai_block_io/detector.py b/src/python/sentryPlugins/ai_block_io/detector.py
index e710ddd..87bd1dd 100644
--- a/src/python/sentryPlugins/ai_block_io/detector.py
+++ b/src/python/sentryPlugins/ai_block_io/detector.py
@@ -17,9 +17,6 @@ from .utils import get_metric_value_from_io_data_dict_by_metric_name
class Detector:
- _metric_name: MetricName = None
- _threshold: Threshold = None
- _slidingWindow: SlidingWindow = None
def __init__(self, metric_name: MetricName, threshold: Threshold, sliding_window: SlidingWindow):
self._metric_name = metric_name
@@ -40,18 +37,24 @@ class Detector:
metric_value = get_metric_value_from_io_data_dict_by_metric_name(io_data_dict_with_disk_name, self._metric_name)
if metric_value is None:
logging.debug('not found metric value, so return None.')
- return False, None, None
+ return (False, False), None, None, None
logging.debug(f'input metric value: {str(metric_value)}')
self._threshold.push_latest_data_to_queue(metric_value)
detection_result = self._slidingWindow.is_slow_io_event(metric_value)
- logging.debug(f'Detection result: {str(detection_result)}')
+ # 检测到慢周期由Detector负责打印info级别日志
+ if detection_result[0][1]:
+ logging.info(f'[abnormal period happen]: disk info: {self._metric_name}, window: {detection_result[1]}, '
+ f'current value: {metric_value}, ai threshold: {detection_result[2]}, '
+ f'absolute threshold: {detection_result[3]}')
+ else:
+ logging.debug(f'Detection result: {str(detection_result)}')
logging.debug(f'exit Detector: {self}')
return detection_result
def __repr__(self):
- return (f'disk_name: {self._metric_name.get_disk_name()}, stage_name: {self._metric_name.get_stage_name()},'
- f' io_type_name: {self._metric_name.get_io_access_type_name()},'
- f' metric_name: {self._metric_name.get_metric_name()}, threshold_type: {self._threshold},'
+ return (f'disk_name: {self._metric_name.disk_name}, stage_name: {self._metric_name.stage_name},'
+ f' io_type_name: {self._metric_name.io_access_type_name},'
+ f' metric_name: {self._metric_name.metric_name}, threshold_type: {self._threshold},'
f' sliding_window_type: {self._slidingWindow}')
@@ -65,13 +68,38 @@ class DiskDetector:
self._detector_list.append(detector)
def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
- # 只有bio阶段发生异常就认为发生了慢IO事件
- # todo根因诊断
+ """
+ 根因诊断逻辑只有bio阶段发生异常才认为发生了慢IO事件即bio阶段异常是慢IO事件的必要条件
+ 情况一bio异常rq_driver也异常则慢盘
+ 情况二bio异常rq_driver无异常且有内核IO栈任意阶段异常则IO栈异常
+ 情况三bio异常rq_driver无异常且无内核IO栈任意阶段异常则IO压力大
+ 情况四bio异常则UNKNOWN
+ """
+ diagnosis_info = {"bio": [], "rq_driver": [], "io_stage": []}
for detector in self._detector_list:
+ # result返回内容(是否检测到慢IO是否检测到慢周期)、窗口、ai阈值、绝对阈值
+ # 示例: (False, False), self._io_data_queue, self._ai_threshold, self._abs_threshold
result = detector.is_slow_io_event(io_data_dict_with_disk_name)
- if result[0] and detector.get_metric_name().get_stage_name() == 'bio':
- return result[0], detector.get_metric_name(), result[1], result[2]
- return False, None, None, None
+ if result[0][0]:
+ if detector.get_metric_name().stage_name == "bio":
+ diagnosis_info["bio"].append((detector.get_metric_name(), result))
+ elif detector.get_metric_name().stage_name == "rq_driver":
+ diagnosis_info["rq_driver"].append((detector.get_metric_name(), result))
+ else:
+ diagnosis_info["io_stage"].append((detector.get_metric_name(), result))
+
+ # 返回内容1是否检测到慢IO事件、2MetricName、3滑动窗口及阈值、4慢IO事件根因
+ root_cause = None
+ if len(diagnosis_info["bio"]) == 0:
+ return False, None, None, None
+ elif len(diagnosis_info["rq_driver"]) != 0:
+ root_cause = "[Root Causedisk slow]"
+ elif len(diagnosis_info["io_stage"]) != 0:
+                stage = diagnosis_info["io_stage"][0][0].stage_name
+ root_cause = f"[Root Causeio stage slow, stage: {stage}]"
+ if root_cause is None:
+ root_cause = "[Root Causehigh io pressure]"
+ return True, diagnosis_info["bio"][0][0], diagnosis_info["bio"][0][1], root_cause
def __repr__(self):
msg = f'disk: {self._disk_name}, '
diff --git a/src/python/sentryPlugins/ai_block_io/io_data.py b/src/python/sentryPlugins/ai_block_io/io_data.py
index 0e17051..d341b55 100644
--- a/src/python/sentryPlugins/ai_block_io/io_data.py
+++ b/src/python/sentryPlugins/ai_block_io/io_data.py
@@ -45,30 +45,10 @@ class IOData:
time_stamp: float = field(default_factory=lambda: datetime.now().timestamp())
+@dataclass(frozen=True)
class MetricName:
- _disk_name: str = None
- _stage_name: str = None
- _io_access_type_name: str = None
- _metric_name: str = None
-
- def __init__(self, disk_name: str, stage_name: str, io_access_type_name: str, metric_name: str):
- self._disk_name = disk_name
- self._stage_name = stage_name
- self._io_access_type_name = io_access_type_name
- self._metric_name = metric_name
-
- def get_disk_name(self):
- return self._disk_name
-
- def get_stage_name(self):
- return self._stage_name
-
- def get_io_access_type_name(self):
- return self._io_access_type_name
-
- def get_metric_name(self):
- return self._metric_name
-
- def __repr__(self):
- return (f'disk: {self._disk_name}, stage: {self._stage_name}, io_access_type: {self._io_access_type_name},'
- f'metric: {self._metric_name}')
+ disk_name: str
+ disk_type: str
+ stage_name: str
+ io_access_type_name: str
+ metric_name: str
diff --git a/src/python/sentryPlugins/ai_block_io/sliding_window.py b/src/python/sentryPlugins/ai_block_io/sliding_window.py
index 89191e5..d7c402a 100644
--- a/src/python/sentryPlugins/ai_block_io/sliding_window.py
+++ b/src/python/sentryPlugins/ai_block_io/sliding_window.py
@@ -21,15 +21,11 @@ class SlidingWindowType(Enum):
class SlidingWindow:
- _ai_threshold = None
- _queue_length = None
- _queue_threshold = None
- _io_data_queue: list = None
- _io_data_queue_abnormal_tag: list = None
-
- def __init__(self, queue_length: int, threshold: int):
+ def __init__(self, queue_length: int, threshold: int, abs_threshold: int = None):
self._queue_length = queue_length
self._queue_threshold = threshold
+ self._ai_threshold = None
+ self._abs_threshold = abs_threshold
self._io_data_queue = []
self._io_data_queue_abnormal_tag = []
@@ -38,7 +34,12 @@ class SlidingWindow:
self._io_data_queue.pop(0)
self._io_data_queue_abnormal_tag.pop(0)
self._io_data_queue.append(data)
- self._io_data_queue_abnormal_tag.append(data >= self._ai_threshold if self._ai_threshold is not None else False)
+ tag = False
+ if ((self._ai_threshold is not None and data >= self._ai_threshold) or
+ (self._abs_threshold is not None and data >= self._abs_threshold)):
+ tag = True
+ self._io_data_queue_abnormal_tag.append(tag)
+ return tag
def update(self, threshold):
if self._ai_threshold == threshold:
@@ -49,7 +50,7 @@ class SlidingWindow:
self._io_data_queue_abnormal_tag.append(data >= self._ai_threshold)
def is_slow_io_event(self, data):
- return False, None, None
+ return False, None, None, None
def __repr__(self):
return "[SlidingWindow]"
@@ -57,12 +58,13 @@ class SlidingWindow:
class NotContinuousSlidingWindow(SlidingWindow):
def is_slow_io_event(self, data):
- super().push(data)
- if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
- return False, self._io_data_queue, self._ai_threshold
+ is_abnormal_period = super().push(data)
+ is_slow_io_event = False
+ if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
+ is_slow_io_event = False
if self._io_data_queue_abnormal_tag.count(True) >= self._queue_threshold:
- return True, self._io_data_queue, self._ai_threshold
- return False, self._io_data_queue, self._ai_threshold
+ is_slow_io_event = True
+ return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
def __repr__(self):
return f"[NotContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"
@@ -70,18 +72,20 @@ class NotContinuousSlidingWindow(SlidingWindow):
class ContinuousSlidingWindow(SlidingWindow):
def is_slow_io_event(self, data):
- super().push(data)
- if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
- return False, self._io_data_queue, self._ai_threshold
+ is_abnormal_period = super().push(data)
+ is_slow_io_event = False
+ if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
+ is_slow_io_event = False
consecutive_count = 0
for tag in self._io_data_queue_abnormal_tag:
if tag:
consecutive_count += 1
if consecutive_count >= self._queue_threshold:
- return True, self._io_data_queue, self._ai_threshold
+ is_slow_io_event = True
+ break
else:
consecutive_count = 0
- return False, self._io_data_queue, self._ai_threshold
+ return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
def __repr__(self):
return f"[ContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"
@@ -89,20 +93,23 @@ class ContinuousSlidingWindow(SlidingWindow):
class MedianSlidingWindow(SlidingWindow):
def is_slow_io_event(self, data):
- super().push(data)
- if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
- return False, self._io_data_queue, self._ai_threshold
+ is_abnormal_period = super().push(data)
+ is_slow_io_event = False
+ if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
+ is_slow_io_event = False
median = np.median(self._io_data_queue)
if median >= self._ai_threshold:
- return True, self._io_data_queue, self._ai_threshold
- return False, self._io_data_queue, self._ai_threshold
+ is_slow_io_event = True
+ return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
def __repr__(self):
return f"[MedianSlidingWindow, window size: {self._queue_length}]"
class SlidingWindowFactory:
- def get_sliding_window(self, sliding_window_type: SlidingWindowType, *args, **kwargs):
+ def get_sliding_window(
+ self, sliding_window_type: SlidingWindowType, *args, **kwargs
+ ):
if sliding_window_type == SlidingWindowType.NotContinuousSlidingWindow:
return NotContinuousSlidingWindow(*args, **kwargs)
elif sliding_window_type == SlidingWindowType.ContinuousSlidingWindow:
diff --git a/src/python/sentryPlugins/ai_block_io/utils.py b/src/python/sentryPlugins/ai_block_io/utils.py
index 0ed37b9..d6f4067 100644
--- a/src/python/sentryPlugins/ai_block_io/utils.py
+++ b/src/python/sentryPlugins/ai_block_io/utils.py
@@ -19,53 +19,57 @@ from .io_data import MetricName, IOData
def get_threshold_type_enum(algorithm_type: str):
- if algorithm_type.lower() == 'absolute':
+ if algorithm_type.lower() == "absolute":
return ThresholdType.AbsoluteThreshold
- if algorithm_type.lower() == 'boxplot':
+ if algorithm_type.lower() == "boxplot":
return ThresholdType.BoxplotThreshold
- if algorithm_type.lower() == 'n_sigma':
+ if algorithm_type.lower() == "n_sigma":
return ThresholdType.NSigmaThreshold
return None
def get_sliding_window_type_enum(sliding_window_type: str):
- if sliding_window_type.lower() == 'not_continuous':
+ if sliding_window_type.lower() == "not_continuous":
return SlidingWindowType.NotContinuousSlidingWindow
- if sliding_window_type.lower() == 'continuous':
+ if sliding_window_type.lower() == "continuous":
return SlidingWindowType.ContinuousSlidingWindow
- if sliding_window_type.lower() == 'median':
+ if sliding_window_type.lower() == "median":
return SlidingWindowType.MedianSlidingWindow
- logging.warning(f"the sliding window type: {sliding_window_type} you set is invalid, use default value: not_continuous")
- return SlidingWindowType.NotContinuousSlidingWindow
+ return None
-def get_metric_value_from_io_data_dict_by_metric_name(io_data_dict: dict, metric_name: MetricName):
+def get_metric_value_from_io_data_dict_by_metric_name(
+ io_data_dict: dict, metric_name: MetricName
+):
try:
- io_data: IOData = io_data_dict[metric_name.get_disk_name()]
- io_stage_data = asdict(io_data)[metric_name.get_stage_name()]
- base_data = io_stage_data[metric_name.get_io_access_type_name()]
- metric_value = base_data[metric_name.get_metric_name()]
+ io_data: IOData = io_data_dict[metric_name.disk_name]
+ io_stage_data = asdict(io_data)[metric_name.stage_name]
+ base_data = io_stage_data[metric_name.io_access_type_name]
+ metric_value = base_data[metric_name.metric_name]
return metric_value
except KeyError:
return None
-def get_data_queue_size_and_update_size(training_data_duration: float, train_update_duration: float,
- slow_io_detect_frequency: int):
+def get_data_queue_size_and_update_size(
+ training_data_duration: float,
+ train_update_duration: float,
+ slow_io_detect_frequency: int,
+):
data_queue_size = int(training_data_duration * 60 * 60 / slow_io_detect_frequency)
update_size = int(train_update_duration * 60 * 60 / slow_io_detect_frequency)
return data_queue_size, update_size
def get_log_level(log_level: str):
- if log_level.lower() == 'debug':
+ if log_level.lower() == "debug":
return logging.DEBUG
- elif log_level.lower() == 'info':
+ elif log_level.lower() == "info":
return logging.INFO
- elif log_level.lower() == 'warning':
+ elif log_level.lower() == "warning":
return logging.WARNING
- elif log_level.lower() == 'error':
+ elif log_level.lower() == "error":
return logging.ERROR
- elif log_level.lower() == 'critical':
+ elif log_level.lower() == "critical":
return logging.CRITICAL
return logging.INFO
--
2.23.0