sysSentry/add-root-cause-analysis.patch

1254 lines
55 KiB
Diff
Raw Normal View History

From 24f8eddad364e83cfc5b6b1607462ffe524b59f1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B4=BA=E6=9C=89=E5=BF=97?= <1037617413@qq.com>
Date: Sat, 12 Oct 2024 21:59:18 +0800
Subject: [PATCH] add root cause analysis
---
config/plugins/ai_block_io.ini | 15 +-
.../sentryPlugins/ai_block_io/ai_block_io.py | 133 +++--
.../ai_block_io/config_parser.py | 465 +++++++++++-------
.../sentryPlugins/ai_block_io/data_access.py | 1 +
.../sentryPlugins/ai_block_io/detector.py | 54 +-
.../sentryPlugins/ai_block_io/io_data.py | 32 +-
.../ai_block_io/sliding_window.py | 57 ++-
src/python/sentryPlugins/ai_block_io/utils.py | 44 +-
8 files changed, 491 insertions(+), 310 deletions(-)
diff --git a/config/plugins/ai_block_io.ini b/config/plugins/ai_block_io.ini
index a814d52..422cfa3 100644
--- a/config/plugins/ai_block_io.ini
+++ b/config/plugins/ai_block_io.ini
@@ -2,7 +2,6 @@
level=info
[common]
-absolute_threshold=40
slow_io_detect_frequency=1
disk=default
stage=bio
@@ -18,4 +17,16 @@ n_sigma_parameter=3
[sliding_window]
sliding_window_type=not_continuous
window_size=30
-window_minimum_threshold=6
\ No newline at end of file
+window_minimum_threshold=6
+
+[latency_sata_ssd]
+read_tot_lim=50000
+write_tot_lim=50000
+
+[latency_nvme_ssd]
+read_tot_lim=500
+write_tot_lim=500
+
+[latency_sata_hdd]
+read_tot_lim=50000
+write_tot_lim=50000
\ No newline at end of file
diff --git a/src/python/sentryPlugins/ai_block_io/ai_block_io.py b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
index e1052ec..dd661a1 100644
--- a/src/python/sentryPlugins/ai_block_io/ai_block_io.py
+++ b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
@@ -12,13 +12,18 @@
import time
import signal
import logging
+from collections import defaultdict
from .detector import Detector, DiskDetector
-from .threshold import ThresholdFactory, AbsoluteThreshold
+from .threshold import ThresholdFactory
from .sliding_window import SlidingWindowFactory
from .utils import get_data_queue_size_and_update_size
from .config_parser import ConfigParser
-from .data_access import get_io_data_from_collect_plug, check_collect_valid
+from .data_access import (
+ get_io_data_from_collect_plug,
+ check_collect_valid,
+ get_disk_type,
+)
from .io_data import MetricName
from .alarm_report import Xalarm, Report
@@ -34,7 +39,7 @@ def sig_handler(signum, frame):
class SlowIODetection:
_config_parser = None
_disk_list = None
- _detector_name_list = {}
+ _detector_name_list = defaultdict(list)
_disk_detectors = {}
def __init__(self, config_parser: ConfigParser):
@@ -43,9 +48,13 @@ class SlowIODetection:
self.__init_detector()
def __init_detector_name_list(self):
- self._disk_list = check_collect_valid(self._config_parser.slow_io_detect_frequency)
+ self._disk_list = check_collect_valid(
+ self._config_parser.slow_io_detect_frequency
+ )
if self._disk_list is None:
- Report.report_pass("get available disk error, please check if the collector plug is enable. exiting...")
+ Report.report_pass(
+ "get available disk error, please check if the collector plug is enable. exiting..."
+ )
exit(1)
logging.info(f"ai_block_io plug has found disks: {self._disk_list}")
@@ -56,27 +65,45 @@ class SlowIODetection:
# 情况2is not None and len = 0则不启动任何磁盘检测
# 情况3len = 0则取交集
if disks is None:
- logging.warning("you not specify any disk or use default, so ai_block_io will enable all available disk.")
- for disk in self._disk_list:
- for stage in stages:
- for iotype in iotypes:
- if disk not in self._detector_name_list:
- self._detector_name_list[disk] = []
- self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency"))
- else:
- for disk in disks:
- if disk in self._disk_list:
- for stage in stages:
- for iotype in iotypes:
- if disk not in self._detector_name_list:
- self._detector_name_list[disk] = []
- self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency"))
- else:
- logging.warning("disk: [%s] not in available disk list, so it will be ignored.", disk)
- if len(self._detector_name_list) == 0:
- logging.critical("the disks to detection is empty, ai_block_io will exit.")
- Report.report_pass("the disks to detection is empty, ai_block_io will exit.")
- exit(1)
+ logging.warning(
+ "you not specify any disk or use default, so ai_block_io will enable all available disk."
+ )
+ for disk in self._disk_list:
+ if disks is not None:
+ if disk not in disks:
+ continue
+ disks.remove(disk)
+
+ disk_type_result = get_disk_type(disk)
+ if disk_type_result["ret"] == 0 and disk_type_result["message"] in (
+ '0',
+ '1',
+ '2',
+ ):
+ disk_type = int(disk_type_result["message"])
+ else:
+ logging.warning(
+ "%s get disk type error, return %s, so it will be ignored.",
+ disk,
+ disk_type_result,
+ )
+ continue
+ for stage in stages:
+ for iotype in iotypes:
+ self._detector_name_list[disk].append(
+ MetricName(disk, disk_type, stage, iotype, "latency")
+ )
+ if disks:
+ logging.warning(
+ "disks: %s not in available disk list, so they will be ignored.",
+ disks,
+ )
+ if not self._detector_name_list:
+ logging.critical("the disks to detection is empty, ai_block_io will exit.")
+ Report.report_pass(
+ "the disks to detection is empty, ai_block_io will exit."
+ )
+ exit(1)
def __init_detector(self):
train_data_duration, train_update_duration = (
@@ -88,26 +115,39 @@ class SlowIODetection:
train_data_duration, train_update_duration, slow_io_detection_frequency
)
sliding_window_type = self._config_parser.sliding_window_type
- window_size, window_threshold = (self._config_parser.get_window_size_and_window_minimum_threshold())
+ window_size, window_threshold = (
+ self._config_parser.get_window_size_and_window_minimum_threshold()
+ )
for disk, metric_name_list in self._detector_name_list.items():
- threshold = ThresholdFactory().get_threshold(
- threshold_type,
- boxplot_parameter=self._config_parser.boxplot_parameter,
- n_sigma_paramter=self._config_parser.n_sigma_parameter,
- data_queue_size=data_queue_size,
- data_queue_update_size=update_size,
- )
- sliding_window = SlidingWindowFactory().get_sliding_window(
- sliding_window_type,
- queue_length=window_size,
- threshold=window_threshold,
- )
disk_detector = DiskDetector(disk)
for metric_name in metric_name_list:
+ threshold = ThresholdFactory().get_threshold(
+ threshold_type,
+ boxplot_parameter=self._config_parser.boxplot_parameter,
+ n_sigma_paramter=self._config_parser.n_sigma_parameter,
+ data_queue_size=data_queue_size,
+ data_queue_update_size=update_size,
+ )
+ abs_threshold = self._config_parser.get_tot_lim(
+ metric_name.disk_type, metric_name.io_access_type_name
+ )
+ if abs_threshold is None:
+ logging.warning(
+ "disk %s, disk type %s, io type %s, get tot lim error, so it will be ignored.",
+ disk,
+ metric_name.disk_type,
+ metric_name.io_access_type_name,
+ )
+ sliding_window = SlidingWindowFactory().get_sliding_window(
+ sliding_window_type,
+ queue_length=window_size,
+ threshold=window_threshold,
+ abs_threshold=abs_threshold,
+ )
detector = Detector(metric_name, threshold, sliding_window)
disk_detector.add_detector(detector)
- logging.info(f'disk: [{disk}] add detector:\n [{disk_detector}]')
+ logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]")
self._disk_detectors[disk] = disk_detector
def launch(self):
@@ -138,14 +178,17 @@ class SlowIODetection:
logging.debug("step3. Report slow io event to sysSentry.")
for slow_io_event in slow_io_event_list:
metric_name: MetricName = slow_io_event[1]
+ window_info = slow_io_event[2]
+ root_cause = slow_io_event[3]
alarm_content = {
- "driver_name": f"{metric_name.get_disk_name()}",
- "reason": "disk_slow",
- "block_stack": f"{metric_name.get_stage_name()}",
- "io_type": f"{metric_name.get_io_access_type_name()}",
+ "driver_name": f"{metric_name.disk_name}",
+ "reason": root_cause,
+ "block_stack": f"{metric_name.stage_name}",
+ "io_type": f"{metric_name.io_access_type_name}",
"alarm_source": "ai_block_io",
"alarm_type": "latency",
- "details": f"current window is: {slow_io_event[2]}, threshold is: {slow_io_event[3]}.",
+ "details": f"disk type: {metric_name.disk_type}, current window: {window_info[1]}, "
+ f"ai threshold: {window_info[2]}, abs threshold: {window_info[3]}.",
}
Xalarm.major(alarm_content)
logging.warning(alarm_content)
diff --git a/src/python/sentryPlugins/ai_block_io/config_parser.py b/src/python/sentryPlugins/ai_block_io/config_parser.py
index a357766..3388cd4 100644
--- a/src/python/sentryPlugins/ai_block_io/config_parser.py
+++ b/src/python/sentryPlugins/ai_block_io/config_parser.py
@@ -20,59 +20,62 @@ from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_lo
LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
-ALL_STAGE_LIST = ['throtl', 'wbt', 'gettag', 'plug', 'deadline', 'hctx', 'requeue', 'rq_driver', 'bio']
-ALL_IOTPYE_LIST = ['read', 'write']
+ALL_STAGE_LIST = [
+ "throtl",
+ "wbt",
+ "gettag",
+ "plug",
+ "deadline",
+ "hctx",
+ "requeue",
+ "rq_driver",
+ "bio",
+]
+ALL_IOTPYE_LIST = ["read", "write"]
+DISK_TYPE_MAP = {
+ 0: "nvme_ssd",
+ 1: "sata_ssd",
+ 2: "sata_hdd",
+}
def init_log_format(log_level: str):
logging.basicConfig(level=get_log_level(log_level.lower()), format=LOG_FORMAT)
if log_level.lower() not in ("info", "warning", "error", "debug"):
logging.warning(
- f"the log_level: {log_level} you set is invalid, use default value: info."
+ "the log_level: %s you set is invalid, use default value: info.", log_level
)
class ConfigParser:
- DEFAULT_ABSOLUTE_THRESHOLD = 40
- DEFAULT_SLOW_IO_DETECTION_FREQUENCY = 1
- DEFAULT_LOG_LEVEL = "info"
-
- DEFAULT_STAGE = 'throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio'
- DEFAULT_IOTYPE = 'read,write'
-
- DEFAULT_ALGORITHM_TYPE = "boxplot"
- DEFAULT_TRAIN_DATA_DURATION = 24
- DEFAULT_TRAIN_UPDATE_DURATION = 2
- DEFAULT_BOXPLOT_PARAMETER = 1.5
- DEFAULT_N_SIGMA_PARAMETER = 3
-
- DEFAULT_SLIDING_WINDOW_TYPE = "not_continuous"
- DEFAULT_WINDOW_SIZE = 30
- DEFAULT_WINDOW_MINIMUM_THRESHOLD = 6
+ DEFAULT_CONF = {
+ "log": {"level": "info"},
+ "common": {
+ "slow_io_detect_frequency": 1,
+ "disk": None,
+ "stage": "throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio",
+ "iotype": "read,write",
+ },
+ "algorithm": {
+ "train_data_duration": 24.0,
+ "train_update_duration": 2.0,
+ "algorithm_type": get_threshold_type_enum("boxplot"),
+ "boxplot_parameter": 1.5,
+ "n_sigma_parameter": 3.0,
+ },
+ "sliding_window": {
+ "sliding_window_type": get_sliding_window_type_enum("not_continuous"),
+ "window_size": 30,
+ "window_minimum_threshold": 6,
+ },
+ "latency_sata_ssd": {"read_tot_lim": 50000, "write_tot_lim": 50000},
+ "latency_nvme_ssd": {"read_tot_lim": 500, "write_tot_lim": 500},
+ "latency_sata_hdd": {"read_tot_lim": 50000, "write_tot_lim": 50000},
+ }
def __init__(self, config_file_name):
- self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD
- self.__slow_io_detect_frequency = (
- ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY
- )
- self.__log_level = ConfigParser.DEFAULT_LOG_LEVEL
- self.__disks_to_detection = None
- self.__stage = ConfigParser.DEFAULT_STAGE
- self.__iotype = ConfigParser.DEFAULT_IOTYPE
-
- self.__algorithm_type = get_threshold_type_enum(
- ConfigParser.DEFAULT_ALGORITHM_TYPE
- )
- self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
- self.__train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
- self.__boxplot_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER
- self.__n_sigma_parameter = ConfigParser.DEFAULT_N_SIGMA_PARAMETER
-
- self.__sliding_window_type = ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE
- self.__window_size = ConfigParser.DEFAULT_WINDOW_SIZE
- self.__window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD
-
- self.__config_file_name = config_file_name
+ self._conf = ConfigParser.DEFAULT_CONF
+ self._config_file_name = config_file_name
def _get_config_value(
self,
@@ -156,30 +159,21 @@ class ConfigParser:
return value
- def __read_absolute_threshold(self, items_common: dict):
- self.__absolute_threshold = self._get_config_value(
- items_common,
- "absolute_threshold",
- float,
- ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD,
- gt=0,
- )
-
- def __read__slow_io_detect_frequency(self, items_common: dict):
- self.__slow_io_detect_frequency = self._get_config_value(
+ def _read_slow_io_detect_frequency(self, items_common: dict):
+ self._conf["common"]["slow_io_detect_frequency"] = self._get_config_value(
items_common,
"slow_io_detect_frequency",
int,
- ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY,
+ self.DEFAULT_CONF["common"]["slow_io_detect_frequency"],
gt=0,
le=300,
)
- def __read__disks_to_detect(self, items_common: dict):
+ def _read_disks_to_detect(self, items_common: dict):
disks_to_detection = items_common.get("disk")
if disks_to_detection is None:
logging.warning("config of disk not found, the default value will be used.")
- self.__disks_to_detection = None
+ self._conf["common"]["disk"] = None
return
disks_to_detection = disks_to_detection.strip()
if not disks_to_detection:
@@ -189,40 +183,46 @@ class ConfigParser:
)
exit(1)
disk_list = disks_to_detection.split(",")
+ disk_list = [disk.strip() for disk in disk_list]
if len(disk_list) == 1 and disk_list[0] == "default":
- self.__disks_to_detection = None
+ self._conf["common"]["disk"] = None
return
- self.__disks_to_detection = disk_list
+ self._conf["common"]["disk"] = disk_list
- def __read__train_data_duration(self, items_algorithm: dict):
- self.__train_data_duration = self._get_config_value(
+ def _read_train_data_duration(self, items_algorithm: dict):
+ self._conf["common"]["train_data_duration"] = self._get_config_value(
items_algorithm,
"train_data_duration",
float,
- ConfigParser.DEFAULT_TRAIN_DATA_DURATION,
+ self.DEFAULT_CONF["algorithm"]["train_data_duration"],
gt=0,
le=720,
)
- def __read__train_update_duration(self, items_algorithm: dict):
- default_train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION
- if default_train_update_duration > self.__train_data_duration:
- default_train_update_duration = self.__train_data_duration / 2
- self.__train_update_duration = self._get_config_value(
+ def _read_train_update_duration(self, items_algorithm: dict):
+ default_train_update_duration = self.DEFAULT_CONF["algorithm"][
+ "train_update_duration"
+ ]
+ if default_train_update_duration > self._conf["common"]["train_data_duration"]:
+ default_train_update_duration = (
+ self._conf["common"]["train_data_duration"] / 2
+ )
+ self._conf["common"]["train_update_duration"] = self._get_config_value(
items_algorithm,
"train_update_duration",
float,
default_train_update_duration,
gt=0,
- le=self.__train_data_duration,
+ le=self._conf["common"]["train_data_duration"],
)
- def __read__algorithm_type_and_parameter(self, items_algorithm: dict):
- algorithm_type = items_algorithm.get(
- "algorithm_type", ConfigParser.DEFAULT_ALGORITHM_TYPE
- )
- self.__algorithm_type = get_threshold_type_enum(algorithm_type)
- if self.__algorithm_type is None:
+ def _read_algorithm_type_and_parameter(self, items_algorithm: dict):
+ algorithm_type = items_algorithm.get("algorithm_type")
+ if algorithm_type is not None:
+ self._conf["algorithm"]["algorithm_type"] = get_threshold_type_enum(
+ algorithm_type
+ )
+ if self._conf["algorithm"]["algorithm_type"] is None:
logging.critical(
"the algorithm_type: %s you set is invalid. ai_block_io plug will exit.",
algorithm_type,
@@ -231,129 +231,175 @@ class ConfigParser:
f"the algorithm_type: {algorithm_type} you set is invalid. ai_block_io plug will exit."
)
exit(1)
-
- if self.__algorithm_type == ThresholdType.NSigmaThreshold:
- self.__n_sigma_parameter = self._get_config_value(
+ elif self._conf["algorithm"]["algorithm_type"] == ThresholdType.NSigmaThreshold:
+ self._conf["algorithm"]["n_sigma_parameter"] = self._get_config_value(
items_algorithm,
"n_sigma_parameter",
float,
- ConfigParser.DEFAULT_N_SIGMA_PARAMETER,
+ self.DEFAULT_CONF["algorithm"]["n_sigma_parameter"],
gt=0,
le=10,
)
- elif self.__algorithm_type == ThresholdType.BoxplotThreshold:
- self.__boxplot_parameter = self._get_config_value(
+ elif (
+ self._conf["algorithm"]["algorithm_type"] == ThresholdType.BoxplotThreshold
+ ):
+ self._conf["algorithm"]["boxplot_parameter"] = self._get_config_value(
items_algorithm,
"boxplot_parameter",
float,
- ConfigParser.DEFAULT_BOXPLOT_PARAMETER,
+ self.DEFAULT_CONF["algorithm"]["boxplot_parameter"],
gt=0,
le=10,
)
- def __read__stage(self, items_algorithm: dict):
- stage_str = items_algorithm.get('stage', ConfigParser.DEFAULT_STAGE)
- stage_list = stage_str.split(',')
- if len(stage_list) == 1 and stage_list[0] == '':
- logging.critical('stage value not allow is empty, exiting...')
+ def _read_stage(self, items_algorithm: dict):
+ stage_str = items_algorithm.get(
+ "stage", self.DEFAULT_CONF["common"]["stage"]
+ ).strip()
+ stage_list = stage_str.split(",")
+ stage_list = [stage.strip() for stage in stage_list]
+ if len(stage_list) == 1 and stage_list[0] == "":
+ logging.critical("stage value not allow is empty, exiting...")
exit(1)
- if len(stage_list) == 1 and stage_list[0] == 'default':
- logging.warning(f'stage will enable default value: {ConfigParser.DEFAULT_STAGE}')
- self.__stage = ALL_STAGE_LIST
+ if len(stage_list) == 1 and stage_list[0] == "default":
+ logging.warning(
+ "stage will enable default value: %s",
+ self.DEFAULT_CONF["common"]["stage"],
+ )
+ self._conf["common"]["stage"] = ALL_STAGE_LIST
return
for stage in stage_list:
if stage not in ALL_STAGE_LIST:
- logging.critical(f'stage: {stage} is not valid stage, ai_block_io will exit...')
+ logging.critical(
+ "stage: %s is not valid stage, ai_block_io will exit...", stage
+ )
exit(1)
dup_stage_list = set(stage_list)
- if 'bio' not in dup_stage_list:
- logging.critical('stage must contains bio stage, exiting...')
+ if "bio" not in dup_stage_list:
+ logging.critical("stage must contains bio stage, exiting...")
exit(1)
- self.__stage = dup_stage_list
-
- def __read__iotype(self, items_algorithm: dict):
- iotype_str = items_algorithm.get('iotype', ConfigParser.DEFAULT_IOTYPE)
- iotype_list = iotype_str.split(',')
- if len(iotype_list) == 1 and iotype_list[0] == '':
- logging.critical('iotype value not allow is empty, exiting...')
+ self._conf["common"]["stage"] = dup_stage_list
+
+ def _read_iotype(self, items_algorithm: dict):
+ iotype_str = items_algorithm.get(
+ "iotype", self.DEFAULT_CONF["common"]["iotype"]
+ ).strip()
+ iotype_list = iotype_str.split(",")
+ iotype_list = [iotype.strip() for iotype in iotype_list]
+ if len(iotype_list) == 1 and iotype_list[0] == "":
+ logging.critical("iotype value not allow is empty, exiting...")
exit(1)
- if len(iotype_list) == 1 and iotype_list[0] == 'default':
- logging.warning(f'iotype will enable default value: {ConfigParser.DEFAULT_IOTYPE}')
- self.__iotype = ALL_IOTPYE_LIST
+ if len(iotype_list) == 1 and iotype_list[0] == "default":
+ logging.warning(
+ "iotype will enable default value: %s",
+ self.DEFAULT_CONF["common"]["iotype"],
+ )
+ self._conf["common"]["iotype"] = ALL_IOTPYE_LIST
return
for iotype in iotype_list:
if iotype not in ALL_IOTPYE_LIST:
- logging.critical(f'iotype: {iotype} is not valid iotype, ai_block_io will exit...')
+ logging.critical(
+ "iotype: %s is not valid iotype, ai_block_io will exit...", iotype
+ )
exit(1)
dup_iotype_list = set(iotype_list)
- self.__iotype = dup_iotype_list
+ self._conf["common"]["iotype"] = dup_iotype_list
+
+ def _read_sliding_window_type(self, items_sliding_window: dict):
+ sliding_window_type = items_sliding_window.get("sliding_window_type")
+ if sliding_window_type is not None:
+ self._conf["sliding_window"]["sliding_window_type"] = (
+ get_sliding_window_type_enum(sliding_window_type)
+ )
+ if self._conf["sliding_window"]["sliding_window_type"] is None:
+ logging.critical(
+ "the sliding_window_type: %s you set is invalid. ai_block_io plug will exit.",
+ sliding_window_type,
+ )
+ Report.report_pass(
+ f"the sliding_window_type: {sliding_window_type} you set is invalid. ai_block_io plug will exit."
+ )
+ exit(1)
- def __read__window_size(self, items_sliding_window: dict):
- self.__window_size = self._get_config_value(
+ def _read_window_size(self, items_sliding_window: dict):
+ self._conf["sliding_window"]["window_size"] = self._get_config_value(
items_sliding_window,
"window_size",
int,
- ConfigParser.DEFAULT_WINDOW_SIZE,
+ self.DEFAULT_CONF["sliding_window"]["window_size"],
gt=0,
le=3600,
)
- def __read__window_minimum_threshold(self, items_sliding_window: dict):
- default_window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD
- if default_window_minimum_threshold > self.__window_size:
- default_window_minimum_threshold = self.__window_size / 2
- self.__window_minimum_threshold = self._get_config_value(
- items_sliding_window,
- "window_minimum_threshold",
- int,
- default_window_minimum_threshold,
- gt=0,
- le=self.__window_size,
+ def _read_window_minimum_threshold(self, items_sliding_window: dict):
+ default_window_minimum_threshold = self.DEFAULT_CONF["sliding_window"][
+ "window_minimum_threshold"
+ ]
+ if (
+ default_window_minimum_threshold
+ > self._conf["sliding_window"]["window_size"]
+ ):
+ default_window_minimum_threshold = (
+ self._conf["sliding_window"]["window_size"] / 2
+ )
+ self._conf["sliding_window"]["window_minimum_threshold"] = (
+ self._get_config_value(
+ items_sliding_window,
+ "window_minimum_threshold",
+ int,
+ default_window_minimum_threshold,
+ gt=0,
+ le=self._conf["sliding_window"]["window_size"],
+ )
)
def read_config_from_file(self):
- if not os.path.exists(self.__config_file_name):
- init_log_format(self.__log_level)
+ if not os.path.exists(self._config_file_name):
+ init_log_format(self._conf["log"]["level"])
logging.critical(
"config file %s not found, ai_block_io plug will exit.",
- self.__config_file_name,
+ self._config_file_name,
)
Report.report_pass(
- f"config file {self.__config_file_name} not found, ai_block_io plug will exit."
+ f"config file {self._config_file_name} not found, ai_block_io plug will exit."
)
exit(1)
con = configparser.ConfigParser()
try:
- con.read(self.__config_file_name, encoding="utf-8")
+ con.read(self._config_file_name, encoding="utf-8")
except configparser.Error as e:
- init_log_format(self.__log_level)
+ init_log_format(self._conf["log"]["level"])
logging.critical(
- f"config file read error: %s, ai_block_io plug will exit.", e
+ "config file read error: %s, ai_block_io plug will exit.", e
)
Report.report_pass(
f"config file read error: {e}, ai_block_io plug will exit."
)
exit(1)
- if con.has_section('log'):
- items_log = dict(con.items('log'))
+ if con.has_section("log"):
+ items_log = dict(con.items("log"))
# 情况一没有log则使用默认值
# 情况二有log值为空或异常使用默认值
# 情况三有log值正常则使用该值
- self.__log_level = items_log.get('level', ConfigParser.DEFAULT_LOG_LEVEL)
- init_log_format(self.__log_level)
+ self._conf["log"]["level"] = items_log.get(
+ "level", self.DEFAULT_CONF["log"]["level"]
+ )
+ init_log_format(self._conf["log"]["level"])
else:
- init_log_format(self.__log_level)
- logging.warning(f"log section parameter not found, it will be set to default value.")
+ init_log_format(self._conf["log"]["level"])
+ logging.warning(
+ "log section parameter not found, it will be set to default value."
+ )
if con.has_section("common"):
items_common = dict(con.items("common"))
- self.__read_absolute_threshold(items_common)
- self.__read__slow_io_detect_frequency(items_common)
- self.__read__disks_to_detect(items_common)
- self.__read__stage(items_common)
- self.__read__iotype(items_common)
+
+ self._read_slow_io_detect_frequency(items_common)
+ self._read_disks_to_detect(items_common)
+ self._read_stage(items_common)
+ self._read_iotype(items_common)
else:
logging.warning(
"common section parameter not found, it will be set to default value."
@@ -361,9 +407,9 @@ class ConfigParser:
if con.has_section("algorithm"):
items_algorithm = dict(con.items("algorithm"))
- self.__read__train_data_duration(items_algorithm)
- self.__read__train_update_duration(items_algorithm)
- self.__read__algorithm_type_and_parameter(items_algorithm)
+ self._read_train_data_duration(items_algorithm)
+ self._read_train_update_duration(items_algorithm)
+ self._read_algorithm_type_and_parameter(items_algorithm)
else:
logging.warning(
"algorithm section parameter not found, it will be set to default value."
@@ -371,101 +417,162 @@ class ConfigParser:
if con.has_section("sliding_window"):
items_sliding_window = dict(con.items("sliding_window"))
- sliding_window_type = items_sliding_window.get(
- "sliding_window_type", ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE
+
+ self._read_window_size(items_sliding_window)
+ self._read_window_minimum_threshold(items_sliding_window)
+ else:
+ logging.warning(
+ "sliding_window section parameter not found, it will be set to default value."
+ )
+
+ if con.has_section("latency_sata_ssd"):
+ items_latency_sata_ssd = dict(con.items("latency_sata_ssd"))
+ self._conf["latency_sata_ssd"]["read_tot_lim"] = self._get_config_value(
+ items_latency_sata_ssd,
+ "read_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_sata_ssd"]["read_tot_lim"],
+ gt=0,
)
- self.__sliding_window_type = get_sliding_window_type_enum(
- sliding_window_type
+ self._conf["latency_sata_ssd"]["write_tot_lim"] = self._get_config_value(
+ items_latency_sata_ssd,
+ "write_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_sata_ssd"]["write_tot_lim"],
+ gt=0,
)
- self.__read__window_size(items_sliding_window)
- self.__read__window_minimum_threshold(items_sliding_window)
else:
logging.warning(
- "sliding_window section parameter not found, it will be set to default value."
+ "latency_sata_ssd section parameter not found, it will be set to default value."
+ )
+ if con.has_section("latency_nvme_ssd"):
+ items_latency_nvme_ssd = dict(con.items("latency_nvme_ssd"))
+ self._conf["latency_nvme_ssd"]["read_tot_lim"] = self._get_config_value(
+ items_latency_nvme_ssd,
+ "read_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_nvme_ssd"]["read_tot_lim"],
+ gt=0,
+ )
+ self._conf["latency_nvme_ssd"]["write_tot_lim"] = self._get_config_value(
+ items_latency_nvme_ssd,
+ "write_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_nvme_ssd"]["write_tot_lim"],
+ gt=0,
+ )
+ else:
+ logging.warning(
+ "latency_nvme_ssd section parameter not found, it will be set to default value."
+ )
+ if con.has_section("latency_sata_hdd"):
+ items_latency_sata_hdd = dict(con.items("latency_sata_hdd"))
+ self._conf["latency_sata_hdd"]["read_tot_lim"] = self._get_config_value(
+ items_latency_sata_hdd,
+ "read_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_sata_hdd"]["read_tot_lim"],
+ gt=0,
+ )
+ self._conf["latency_sata_hdd"]["write_tot_lim"] = self._get_config_value(
+ items_latency_sata_hdd,
+ "write_tot_lim",
+ int,
+ self.DEFAULT_CONF["latency_sata_hdd"]["write_tot_lim"],
+ gt=0,
+ )
+ else:
+ logging.warning(
+ "latency_sata_hdd section parameter not found, it will be set to default value."
)
self.__print_all_config_value()
- def __repr__(self):
- config_str = {
- 'log.level': self.__log_level,
- 'common.absolute_threshold': self.__absolute_threshold,
- 'common.slow_io_detect_frequency': self.__slow_io_detect_frequency,
- 'common.disk': self.__disks_to_detection,
- 'common.stage': self.__stage,
- 'common.iotype': self.__iotype,
- 'algorithm.train_data_duration': self.__train_data_duration,
- 'algorithm.train_update_duration': self.__train_update_duration,
- 'algorithm.algorithm_type': self.__algorithm_type,
- 'algorithm.boxplot_parameter': self.__boxplot_parameter,
- 'algorithm.n_sigma_parameter': self.__n_sigma_parameter,
- 'sliding_window.sliding_window_type': self.__sliding_window_type,
- 'sliding_window.window_size': self.__window_size,
- 'sliding_window.window_minimum_threshold': self.__window_minimum_threshold
- }
- return str(config_str)
+ def __repr__(self) -> str:
+ return str(self._conf)
+
+ def __str__(self) -> str:
+ return str(self._conf)
def __print_all_config_value(self):
- logging.info(f"all config is follow:\n {self}")
+ logging.info("all config is follow:\n %s", self)
+
+ def get_tot_lim(self, disk_type, io_type):
+ if io_type == "read":
+ return self._conf.get(
+ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {}
+ ).get("read_tot_lim", None)
+ elif io_type == "write":
+ return self._conf.get(
+ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {}
+ ).get("write_tot_lim", None)
+ else:
+ return None
def get_train_data_duration_and_train_update_duration(self):
- return self.__train_data_duration, self.__train_update_duration
+ return (
+ self._conf["common"]["train_data_duration"],
+ self._conf["common"]["train_update_duration"],
+ )
def get_window_size_and_window_minimum_threshold(self):
- return self.__window_size, self.__window_minimum_threshold
+ return (
+ self._conf["sliding_window"]["window_size"],
+ self._conf["sliding_window"]["window_minimum_threshold"],
+ )
@property
def slow_io_detect_frequency(self):
- return self.__slow_io_detect_frequency
+ return self._conf["common"]["slow_io_detect_frequency"]
@property
def algorithm_type(self):
- return self.__algorithm_type
+ return self._conf["algorithm"]["algorithm_type"]
@property
def sliding_window_type(self):
- return self.__sliding_window_type
+ return self._conf["sliding_window"]["sliding_window_type"]
@property
def train_data_duration(self):
- return self.__train_data_duration
+ return self._conf["common"]["train_data_duration"]
@property
def train_update_duration(self):
- return self.__train_update_duration
+ return self._conf["common"]["train_update_duration"]
@property
def window_size(self):
- return self.__window_size
+ return self._conf["sliding_window"]["window_size"]
@property
def window_minimum_threshold(self):
- return self.__window_minimum_threshold
+ return self._conf["sliding_window"]["window_minimum_threshold"]
@property
def absolute_threshold(self):
- return self.__absolute_threshold
+ return self._conf["common"]["absolute_threshold"]
@property
def log_level(self):
- return self.__log_level
+ return self._conf["log"]["level"]
@property
def disks_to_detection(self):
- return self.__disks_to_detection
+ return self._conf["common"]["disk"]
@property
def stage(self):
- return self.__stage
+ return self._conf["common"]["stage"]
@property
def iotype(self):
- return self.__iotype
+ return self._conf["common"]["iotype"]
@property
def boxplot_parameter(self):
- return self.__boxplot_parameter
+ return self._conf["algorithm"]["boxplot_parameter"]
@property
def n_sigma_parameter(self):
- return self.__n_sigma_parameter
+ return self._conf["algorithm"]["n_sigma_parameter"]
diff --git a/src/python/sentryPlugins/ai_block_io/data_access.py b/src/python/sentryPlugins/ai_block_io/data_access.py
index ed997e6..1bc5ed8 100644
--- a/src/python/sentryPlugins/ai_block_io/data_access.py
+++ b/src/python/sentryPlugins/ai_block_io/data_access.py
@@ -16,6 +16,7 @@ from sentryCollector.collect_plugin import (
Result_Messages,
get_io_data,
is_iocollect_valid,
+ get_disk_type
)
diff --git a/src/python/sentryPlugins/ai_block_io/detector.py b/src/python/sentryPlugins/ai_block_io/detector.py
index e710ddd..87bd1dd 100644
--- a/src/python/sentryPlugins/ai_block_io/detector.py
+++ b/src/python/sentryPlugins/ai_block_io/detector.py
@@ -17,9 +17,6 @@ from .utils import get_metric_value_from_io_data_dict_by_metric_name
class Detector:
- _metric_name: MetricName = None
- _threshold: Threshold = None
- _slidingWindow: SlidingWindow = None
def __init__(self, metric_name: MetricName, threshold: Threshold, sliding_window: SlidingWindow):
self._metric_name = metric_name
@@ -40,18 +37,24 @@ class Detector:
metric_value = get_metric_value_from_io_data_dict_by_metric_name(io_data_dict_with_disk_name, self._metric_name)
if metric_value is None:
logging.debug('not found metric value, so return None.')
- return False, None, None
+ return (False, False), None, None, None
logging.debug(f'input metric value: {str(metric_value)}')
self._threshold.push_latest_data_to_queue(metric_value)
detection_result = self._slidingWindow.is_slow_io_event(metric_value)
- logging.debug(f'Detection result: {str(detection_result)}')
+ # 检测到慢周期由Detector负责打印info级别日志
+ if detection_result[0][1]:
+ logging.info(f'[abnormal period happen]: disk info: {self._metric_name}, window: {detection_result[1]}, '
+ f'current value: {metric_value}, ai threshold: {detection_result[2]}, '
+ f'absolute threshold: {detection_result[3]}')
+ else:
+ logging.debug(f'Detection result: {str(detection_result)}')
logging.debug(f'exit Detector: {self}')
return detection_result
def __repr__(self):
- return (f'disk_name: {self._metric_name.get_disk_name()}, stage_name: {self._metric_name.get_stage_name()},'
- f' io_type_name: {self._metric_name.get_io_access_type_name()},'
- f' metric_name: {self._metric_name.get_metric_name()}, threshold_type: {self._threshold},'
+ return (f'disk_name: {self._metric_name.disk_name}, stage_name: {self._metric_name.stage_name},'
+ f' io_type_name: {self._metric_name.io_access_type_name},'
+ f' metric_name: {self._metric_name.metric_name}, threshold_type: {self._threshold},'
f' sliding_window_type: {self._slidingWindow}')
@@ -65,13 +68,38 @@ class DiskDetector:
self._detector_list.append(detector)
def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
- # 只有bio阶段发生异常就认为发生了慢IO事件
- # todo根因诊断
+ """
+ 根因诊断逻辑只有bio阶段发生异常才认为发生了慢IO事件即bio阶段异常是慢IO事件的必要条件
+ 情况一bio异常rq_driver也异常则慢盘
+ 情况二bio异常rq_driver无异常且有内核IO栈任意阶段异常则IO栈异常
+ 情况三bio异常rq_driver无异常且无内核IO栈任意阶段异常则IO压力大
+ 情况四bio异常则UNKNOWN
+ """
+ diagnosis_info = {"bio": [], "rq_driver": [], "io_stage": []}
for detector in self._detector_list:
+ # result返回内容(是否检测到慢IO是否检测到慢周期)、窗口、ai阈值、绝对阈值
+ # 示例: (False, False), self._io_data_queue, self._ai_threshold, self._abs_threshold
result = detector.is_slow_io_event(io_data_dict_with_disk_name)
- if result[0] and detector.get_metric_name().get_stage_name() == 'bio':
- return result[0], detector.get_metric_name(), result[1], result[2]
- return False, None, None, None
+ if result[0][0]:
+ if detector.get_metric_name().stage_name == "bio":
+ diagnosis_info["bio"].append((detector.get_metric_name(), result))
+ elif detector.get_metric_name().stage_name == "rq_driver":
+ diagnosis_info["rq_driver"].append((detector.get_metric_name(), result))
+ else:
+ diagnosis_info["io_stage"].append((detector.get_metric_name(), result))
+
+ # 返回内容1是否检测到慢IO事件、2MetricName、3滑动窗口及阈值、4慢IO事件根因
+ root_cause = None
+ if len(diagnosis_info["bio"]) == 0:
+ return False, None, None, None
+ elif len(diagnosis_info["rq_driver"]) != 0:
+        root_cause = "[Root Cause: disk slow]"
+ elif len(diagnosis_info["io_stage"]) != 0:
+            stage = diagnosis_info["io_stage"][0][0].stage_name
+            root_cause = f"[Root Cause: io stage slow, stage: {stage}]"
+ if root_cause is None:
+            root_cause = "[Root Cause: high io pressure]"
+ return True, diagnosis_info["bio"][0][0], diagnosis_info["bio"][0][1], root_cause
def __repr__(self):
msg = f'disk: {self._disk_name}, '
diff --git a/src/python/sentryPlugins/ai_block_io/io_data.py b/src/python/sentryPlugins/ai_block_io/io_data.py
index 0e17051..d341b55 100644
--- a/src/python/sentryPlugins/ai_block_io/io_data.py
+++ b/src/python/sentryPlugins/ai_block_io/io_data.py
@@ -45,30 +45,10 @@ class IOData:
time_stamp: float = field(default_factory=lambda: datetime.now().timestamp())
+@dataclass(frozen=True)
class MetricName:
- _disk_name: str = None
- _stage_name: str = None
- _io_access_type_name: str = None
- _metric_name: str = None
-
- def __init__(self, disk_name: str, stage_name: str, io_access_type_name: str, metric_name: str):
- self._disk_name = disk_name
- self._stage_name = stage_name
- self._io_access_type_name = io_access_type_name
- self._metric_name = metric_name
-
- def get_disk_name(self):
- return self._disk_name
-
- def get_stage_name(self):
- return self._stage_name
-
- def get_io_access_type_name(self):
- return self._io_access_type_name
-
- def get_metric_name(self):
- return self._metric_name
-
- def __repr__(self):
- return (f'disk: {self._disk_name}, stage: {self._stage_name}, io_access_type: {self._io_access_type_name},'
- f'metric: {self._metric_name}')
+ disk_name: str
+ disk_type: str
+ stage_name: str
+ io_access_type_name: str
+ metric_name: str
diff --git a/src/python/sentryPlugins/ai_block_io/sliding_window.py b/src/python/sentryPlugins/ai_block_io/sliding_window.py
index 89191e5..d7c402a 100644
--- a/src/python/sentryPlugins/ai_block_io/sliding_window.py
+++ b/src/python/sentryPlugins/ai_block_io/sliding_window.py
@@ -21,15 +21,11 @@ class SlidingWindowType(Enum):
class SlidingWindow:
- _ai_threshold = None
- _queue_length = None
- _queue_threshold = None
- _io_data_queue: list = None
- _io_data_queue_abnormal_tag: list = None
-
- def __init__(self, queue_length: int, threshold: int):
+ def __init__(self, queue_length: int, threshold: int, abs_threshold: int = None):
self._queue_length = queue_length
self._queue_threshold = threshold
+ self._ai_threshold = None
+ self._abs_threshold = abs_threshold
self._io_data_queue = []
self._io_data_queue_abnormal_tag = []
@@ -38,7 +34,12 @@ class SlidingWindow:
self._io_data_queue.pop(0)
self._io_data_queue_abnormal_tag.pop(0)
self._io_data_queue.append(data)
- self._io_data_queue_abnormal_tag.append(data >= self._ai_threshold if self._ai_threshold is not None else False)
+ tag = False
+ if ((self._ai_threshold is not None and data >= self._ai_threshold) or
+ (self._abs_threshold is not None and data >= self._abs_threshold)):
+ tag = True
+ self._io_data_queue_abnormal_tag.append(tag)
+ return tag
def update(self, threshold):
if self._ai_threshold == threshold:
@@ -49,7 +50,7 @@ class SlidingWindow:
self._io_data_queue_abnormal_tag.append(data >= self._ai_threshold)
def is_slow_io_event(self, data):
- return False, None, None
+ return False, None, None, None
def __repr__(self):
return "[SlidingWindow]"
@@ -57,12 +58,13 @@ class SlidingWindow:
class NotContinuousSlidingWindow(SlidingWindow):
def is_slow_io_event(self, data):
- super().push(data)
- if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
- return False, self._io_data_queue, self._ai_threshold
+ is_abnormal_period = super().push(data)
+ is_slow_io_event = False
+ if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
+            return (False, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
if self._io_data_queue_abnormal_tag.count(True) >= self._queue_threshold:
- return True, self._io_data_queue, self._ai_threshold
- return False, self._io_data_queue, self._ai_threshold
+ is_slow_io_event = True
+ return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
def __repr__(self):
return f"[NotContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"
@@ -70,18 +72,20 @@ class NotContinuousSlidingWindow(SlidingWindow):
class ContinuousSlidingWindow(SlidingWindow):
def is_slow_io_event(self, data):
- super().push(data)
- if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
- return False, self._io_data_queue, self._ai_threshold
+ is_abnormal_period = super().push(data)
+ is_slow_io_event = False
+ if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
+            return (False, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
consecutive_count = 0
for tag in self._io_data_queue_abnormal_tag:
if tag:
consecutive_count += 1
if consecutive_count >= self._queue_threshold:
- return True, self._io_data_queue, self._ai_threshold
+ is_slow_io_event = True
+ break
else:
consecutive_count = 0
- return False, self._io_data_queue, self._ai_threshold
+ return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
def __repr__(self):
return f"[ContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"
@@ -89,20 +93,23 @@ class ContinuousSlidingWindow(SlidingWindow):
class MedianSlidingWindow(SlidingWindow):
def is_slow_io_event(self, data):
- super().push(data)
- if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
- return False, self._io_data_queue, self._ai_threshold
+ is_abnormal_period = super().push(data)
+ is_slow_io_event = False
+        if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None:
+            return (False, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
median = np.median(self._io_data_queue)
if median >= self._ai_threshold:
- return True, self._io_data_queue, self._ai_threshold
- return False, self._io_data_queue, self._ai_threshold
+ is_slow_io_event = True
+ return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold
def __repr__(self):
return f"[MedianSlidingWindow, window size: {self._queue_length}]"
class SlidingWindowFactory:
- def get_sliding_window(self, sliding_window_type: SlidingWindowType, *args, **kwargs):
+ def get_sliding_window(
+ self, sliding_window_type: SlidingWindowType, *args, **kwargs
+ ):
if sliding_window_type == SlidingWindowType.NotContinuousSlidingWindow:
return NotContinuousSlidingWindow(*args, **kwargs)
elif sliding_window_type == SlidingWindowType.ContinuousSlidingWindow:
diff --git a/src/python/sentryPlugins/ai_block_io/utils.py b/src/python/sentryPlugins/ai_block_io/utils.py
index 0ed37b9..d6f4067 100644
--- a/src/python/sentryPlugins/ai_block_io/utils.py
+++ b/src/python/sentryPlugins/ai_block_io/utils.py
@@ -19,53 +19,57 @@ from .io_data import MetricName, IOData
def get_threshold_type_enum(algorithm_type: str):
- if algorithm_type.lower() == 'absolute':
+ if algorithm_type.lower() == "absolute":
return ThresholdType.AbsoluteThreshold
- if algorithm_type.lower() == 'boxplot':
+ if algorithm_type.lower() == "boxplot":
return ThresholdType.BoxplotThreshold
- if algorithm_type.lower() == 'n_sigma':
+ if algorithm_type.lower() == "n_sigma":
return ThresholdType.NSigmaThreshold
return None
def get_sliding_window_type_enum(sliding_window_type: str):
- if sliding_window_type.lower() == 'not_continuous':
+ if sliding_window_type.lower() == "not_continuous":
return SlidingWindowType.NotContinuousSlidingWindow
- if sliding_window_type.lower() == 'continuous':
+ if sliding_window_type.lower() == "continuous":
return SlidingWindowType.ContinuousSlidingWindow
- if sliding_window_type.lower() == 'median':
+ if sliding_window_type.lower() == "median":
return SlidingWindowType.MedianSlidingWindow
- logging.warning(f"the sliding window type: {sliding_window_type} you set is invalid, use default value: not_continuous")
- return SlidingWindowType.NotContinuousSlidingWindow
+ return None
-def get_metric_value_from_io_data_dict_by_metric_name(io_data_dict: dict, metric_name: MetricName):
+def get_metric_value_from_io_data_dict_by_metric_name(
+ io_data_dict: dict, metric_name: MetricName
+):
try:
- io_data: IOData = io_data_dict[metric_name.get_disk_name()]
- io_stage_data = asdict(io_data)[metric_name.get_stage_name()]
- base_data = io_stage_data[metric_name.get_io_access_type_name()]
- metric_value = base_data[metric_name.get_metric_name()]
+ io_data: IOData = io_data_dict[metric_name.disk_name]
+ io_stage_data = asdict(io_data)[metric_name.stage_name]
+ base_data = io_stage_data[metric_name.io_access_type_name]
+ metric_value = base_data[metric_name.metric_name]
return metric_value
except KeyError:
return None
-def get_data_queue_size_and_update_size(training_data_duration: float, train_update_duration: float,
- slow_io_detect_frequency: int):
+def get_data_queue_size_and_update_size(
+ training_data_duration: float,
+ train_update_duration: float,
+ slow_io_detect_frequency: int,
+):
data_queue_size = int(training_data_duration * 60 * 60 / slow_io_detect_frequency)
update_size = int(train_update_duration * 60 * 60 / slow_io_detect_frequency)
return data_queue_size, update_size
def get_log_level(log_level: str):
- if log_level.lower() == 'debug':
+ if log_level.lower() == "debug":
return logging.DEBUG
- elif log_level.lower() == 'info':
+ elif log_level.lower() == "info":
return logging.INFO
- elif log_level.lower() == 'warning':
+ elif log_level.lower() == "warning":
return logging.WARNING
- elif log_level.lower() == 'error':
+ elif log_level.lower() == "error":
return logging.ERROR
- elif log_level.lower() == 'critical':
+ elif log_level.lower() == "critical":
return logging.CRITICAL
return logging.INFO
--
2.23.0