diff --git a/0001-add-new-feature-slow-node-detection.patch b/0001-add-new-feature-slow-node-detection.patch
new file mode 100644
index 0000000..bc127db
--- /dev/null
+++ b/0001-add-new-feature-slow-node-detection.patch
@@ -0,0 +1,777 @@
+From 2e30b68154f5d0b6a68eab1bf8408363bbf28a76 Mon Sep 17 00:00:00 2001
+From: huangbin
+Date: Tue, 5 Nov 2024 15:37:00 +0800
+Subject: [PATCH] add new feature slow node detection
+
+---
+ anteater/model/detector/slow_node_detector.py | 397 ++++++++++++++++++
+ config/module/slow_node_detection.job.json    | 352 ++++++++++++++++
+ 2 files changed, 749 insertions(+)
+ create mode 100644 anteater/model/detector/slow_node_detector.py
+ create mode 100644 config/module/slow_node_detection.job.json
+
+diff --git a/anteater/model/detector/slow_node_detector.py b/anteater/model/detector/slow_node_detector.py
+new file mode 100644
+index 0000000..15a6cee
+--- /dev/null
++++ b/anteater/model/detector/slow_node_detector.py
+@@ -0,0 +1,397 @@
++#!/usr/bin/python3
++# ******************************************************************************
++# Copyright (c) 2023 Huawei Technologies Co., Ltd.
++# gala-anteater is licensed under Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++#     http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
++# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
++# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
++# See the Mulan PSL v2 for more details.
++# ******************************************************************************/
++import time
++import json
++import os.path
++import pprint
++import traceback
++from typing import List
++
++import numpy as np
++import pandas as pd
++
++from anteater.core.slow_node_response import AIJobDetectResult, ResultCode, NodeData
++from anteater.core.anomaly import Anomaly, RootCause
++from anteater.core.kpi import KPI, ModelConfig, Feature
++from anteater.utils.datetime import DateTimeManager as dt
++from anteater.utils.timer import timer
++from anteater.utils.log import logger
++from anteater.source.metric_loader import MetricLoader
++from anteater.model.detector.base import Detector
++from anteater.model.process.rank_table_loader import GroupDataLoader
++from anteater.model.algorithms.slow_node_algs import time_node_detectors, space_node_detectors
++
++
++class SlowNodeDetector(Detector):
++    def __init__(self, data_loader: MetricLoader, config: ModelConfig, **kwargs):
++        """The slow node detector initializer"""
++        super().__init__(data_loader, **kwargs)
++        self.config = config
++        self.max_num_normal_results = self.config.params.get("max_num_normal_results", 10)
++        self.record_kpi_value = self.config.params.get("record_kpi", False)
++        self.hccl_domain, self.rank_table = self._init_hccl_and_rank_table()
++
++    def _init_hccl_and_rank_table(self):
++        params = self.config.params
++        hccl_domain_path = params.get("hccl_domain_json")
++        rank_table_path = params.get("rank_table_json")
++
++        hccl_domain = {}
++        rank_table = {}
++
++        if os.path.exists(hccl_domain_path):
++            try:
++                with open(hccl_domain_path, 'r', encoding='utf-8') as f_out:
++                    hccl_domain = json.load(f_out)
++            except Exception:
++                logger.error("Failed to read hccl domain info!")
++        if os.path.exists(rank_table_path):
++            try:
++                with open(rank_table_path, 'r', encoding='utf-8') as f_out:
++                    rank_table = json.load(f_out)
++            except Exception:
++                logger.error("Failed to read rank table info!")
++
++        return hccl_domain, rank_table
++
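Note: the two JSON configs above are loaded with duplicated try/open/except blocks (the original even opened `rank_table_path` in the hccl branch, corrected here). A small helper would remove the duplication and also guard against a missing config key, which `os.path.exists(None)` would otherwise turn into a `TypeError`. A minimal sketch, not part of the patch:

```python
import json
import os
import logging

logger = logging.getLogger(__name__)

def load_json_config(path):
    """Return the parsed JSON at `path`, or {} if the file is absent or unreadable."""
    if not path or not os.path.exists(path):
        return {}
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError):
        logger.error("Failed to read JSON config: %s", path)
        return {}

# usage inside _init_hccl_and_rank_table:
#   hccl_domain = load_json_config(params.get("hccl_domain_json"))
#   rank_table = load_json_config(params.get("rank_table_json"))
```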
++    @staticmethod
++    def npu_id2host_id(machines2devices: dict):
++        npu_id2host_id_dict = {}
++        npu_ids = []
++        hosts_ids = []
++        for machine_ip, devices in machines2devices.items():
++            if devices == [""]:
++                hosts_ids.append(machine_ip)
++            else:
++                npu_ids.append(machine_ip)
++
++        for npu_id in npu_ids:
++            for host_id in hosts_ids:
++                if npu_id.split(":")[0] in host_id:
++                    npu_id2host_id_dict[npu_id] = host_id
++                    break
++
++        return npu_id2host_id_dict, hosts_ids
++
++    def get_host_ids_by_npu_ids(self, npu_ids: dict, npu_id2host_id_dict: dict, hosts_ids: list) -> list:
++        host_ids = []
++        if npu_ids:
++            for npu_id in npu_ids:
++                host_id = npu_id2host_id_dict.get(npu_id, "")
++                if host_id:
++                    host_ids.append(host_id)
++        else:
++            host_ids = hosts_ids
++
++        return host_ids
++
++    @timer
++    def _execute(self, kpis: List[KPI], features: List[Feature], **kwargs) \
++            -> List[Anomaly]:
++        # save to kafka response
++        anomalies = []
++
++        logger.info('Execute slow node detection model: %s.', self.__class__.__name__)
++        start, end = dt.last(minutes=30)
++        # collect machine_ids
++        machines_to_devices = self.get_machines_to_devices(start, end, kpis)
++        npu_id2host_id, hosts_ids = self.npu_id2host_id(machines_to_devices)
++
++        group_dataloader = GroupDataLoader(self.hccl_domain, self.rank_table, machines_to_devices)
++        group_ranks: list = group_dataloader.get_group_ranks()
++        all_results = []
++        for kpi in kpis:
++            for ranks in group_ranks:
++                machine_ids: dict = group_dataloader.rank_table_loader.get_group_nodes_by_ranks(ranks)
++                host_ids: list = self.get_host_ids_by_npu_ids(machine_ids, npu_id2host_id, hosts_ids)
++                group_result = self.group_detect_single_kpi(kpi, machine_ids, host_ids)
++                all_results.extend(group_result)
++
++        response, all_anomaly_nodes = self.gen_final_alarm(kpis, all_results)
++
++        if response.result_code == ResultCode.anomaly:
++            all_anomaly_nodes = sorted(list(set(all_anomaly_nodes)))
++            anomaly = Anomaly(
++                machine_id=json.dumps(all_anomaly_nodes),
++                metric="slow_node_metric",
++                labels={"instance": "node_ip"},
++                score=1.0,
++                entity_name="sli",
++                details={"detect_method": "slow_node_detection"},
++                description=response)
++            anomalies.append(anomaly)
++
++        return anomalies
++
++    def gen_final_alarm(self, kpis: List[KPI], detect_results: List):
++        response = AIJobDetectResult()
++        all_anomaly_nodes = []
++
++        for index, result in enumerate(detect_results):
++            try:
++                anomaly_devices = result.get("anomaly_devices")
++                all_anomaly_nodes.extend(anomaly_devices)
++                response = self.group_detect_ret_agg(response, result, kpis)
++            except Exception:
++                logger.error(traceback.format_exc())
++            logger.info("progress: %s/%s", index + 1, len(detect_results))
++
++        return response, all_anomaly_nodes
++
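`npu_id2host_id` splits exporter instances into NPU exporters (non-empty device lists) and host exporters (`devices == [""]`), then pairs them by bare IP. A toy run with hypothetical instance labels:

```python
machines2devices = {
    "192.168.0.11:8081": ["0", "1", "2", "3"],  # NPU exporter: instance carries device ids
    "192.168.0.11:9100": [""],                  # host exporter: empty device list
}

# "192.168.0.11:8081".split(":")[0] -> "192.168.0.11", which is contained in the
# host instance "192.168.0.11:9100", so the mapping becomes:
#   npu_id2host_id_dict == {"192.168.0.11:8081": "192.168.0.11:9100"}
#   hosts_ids           == ["192.168.0.11:9100"]
```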
++    def group_detect_single_kpi(self, kpi: KPI, machine_ids: dict, host_ids: list) -> list:
++        """Detects kpi based on single time series anomaly detection model"""
++        # Prometheus returns all series at once; the data has to be grouped by machine_id and device_id
++        # fetch data
++        metric_name: str = kpi.metric
++
++        all_machines_ts = []
++        for machine_id in machine_ids:
++            single_machine_ts_list = self.get_kpi_ts_list(metric_name, machine_id, kpi.params)
++            all_machines_ts.extend(single_machine_ts_list)
++        for host_id in host_ids:
++            single_machine_ts_list = self.get_kpi_ts_list(metric_name, host_id, kpi.params)
++            all_machines_ts.extend(single_machine_ts_list)
++
++        anomaly_devices = []
++        anomaly_locations = {}
++        space_anomaly_locations = {}
++
++        detect_data, min_data_len = self.preprocessing_data(metric_name, all_machines_ts)
++        detection_results = {
++            "anomaly_devices": anomaly_devices,
++            "anomaly_locations": anomaly_locations,
++            "detect_result_type": "TIME",
++            "metric_name": metric_name,
++            "group_data": detect_data,
++        }
++        if min_data_len == 0:
++            logger.warning("GROUP data contains EMPTY DATA. GROUP_DATA:%s", pprint.pformat(all_machines_ts))
++            return [detection_results]
++        logger.info("work on %s, %s start.", metric_name, "slow_node_detection")
++
++        # time-dimension detection
++        # logger.info("work on %s, %s started.", metric_name, "time_node_compare")
++        time_anomaly_locations = self.time_node_compare(kpi, detect_data)
++        logger.info(f"time_node_compare result: {self.output_anomaly_devices(metric_name, time_anomaly_locations)}.")
++        # logger.info("work on %s, %s finished.", metric_name, "time_node_compare")
++
++        # space-dimension comparison
++        # if no space detector is configured for this metric, skip the homogeneity comparison
++        if kpi.params.get("space_detector") is not None:
++            # homogeneity comparison needs at least four objects
++            if len(all_machines_ts) >= 4:
++                # compare across nodes and output the anomalous ones
++                space_anomaly_locations = self.space_nodes_compare(kpi, detect_data)
++                logger.info(
++                    f"space_nodes_compare result: {self.output_anomaly_devices(metric_name, space_anomaly_locations)}.")
++            else:
++                logger.info(f"Skip space nodes compare, due to nodes number{len(all_machines_ts)} is smaller than 4.")
++        else:
++            logger.info(f"Skip space nodes compare.")
++
++        # merge time- and space-dimension results
++        anomaly_locations, detect_result_type = self.time_space_agg(time_anomaly_locations, space_anomaly_locations,
++                                                                    metric_name)
++
++        anomaly_devices = self.output_anomaly_devices(metric_name, anomaly_locations)
++        detection_results["anomaly_devices"] = anomaly_devices
++        detection_results["anomaly_locations"] = anomaly_locations
++        detection_results["detect_result_type"] = detect_result_type
++
++        logger.info(f'''Time and space aggregated result: {anomaly_devices}.''')
++        logger.info("work on %s, %s end.\n", metric_name, "slow_node_detection")
++
++        return [detection_results]
++
++    @staticmethod
++    def output_anomaly_devices(metric: str, anomaly_location: dict):
++        anomaly_devices = []
++        for device_info in anomaly_location.keys():
++            # more than zero anomalous points marks the metric as anomalous on this device
++            if np.sum(anomaly_location[device_info][metric][1]) > 0:
++                anomaly_devices.append(device_info)
++
++        return anomaly_devices
++
++    @staticmethod
++    def preprocessing_data(metric_name: str, metric_data: list):
++        if len(metric_data) == 0:
++            return {}, 0
++
++        detect_data = {}
++        length = 0
++        for index, metric_ts in enumerate(metric_data):
++            time_stamps = metric_ts.time_stamps
++            length = len(time_stamps)
++            values = metric_ts.values
++            labels = metric_ts.labels
++            if labels.get("id"):
++                device_label = f'''{labels.get("instance")}*{labels.get("id")}'''
++            else:
++                device_label = f'''{labels.get("instance")}*-1'''
++            detect_data[device_label] = pd.DataFrame({"timestamp": time_stamps, metric_name: values})
++
++        return detect_data, length
++
++    def time_node_compare(self, kpi: KPI, detect_data: dict):
++        metric_name = kpi.metric
++        cfg = kpi.params.get("time_detector", {})
++        detector_class = time_node_detectors.get(cfg.get("type"))
++
++        time_node_detector = detector_class(metric_name=metric_name, cfg=cfg)
++        time_node_detector.fit(detect_data)
++        locations = time_node_detector.predict(detect_data)
++        expert_alarm_window_size = kpi.params.get("alarm_filter_window_size")
++
++        for device_info, anomaly_locations in locations.items():
++            filter_labels = self.alarm_filter(anomaly_locations[metric_name][1], expert_alarm_window_size)
++            locations[device_info][metric_name][1][:] = filter_labels
++
++        return locations
++
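`preprocessing_data` keys every series by `"instance*id"` (device id, or `-1` for host series) and stores one two-column DataFrame per device; both `time_node_compare` and `space_nodes_compare` consume this layout. Illustrative shape, with made-up labels and values:

```python
import pandas as pd

detect_data = {
    "192.168.0.11:8081*0": pd.DataFrame({
        "timestamp": [1730800000, 1730800005, 1730800010],
        "npu_chip_info_hbm_used_memory": [30517.0, 30520.0, 30498.0],
    }),
    "192.168.0.11:8081*1": pd.DataFrame({
        "timestamp": [1730800000, 1730800005, 1730800010],
        "npu_chip_info_hbm_used_memory": [30511.0, 30515.0, 30502.0],
    }),
}
```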
++    def space_nodes_compare(self, kpi: KPI, detect_data: dict):
++        metric_name = kpi.metric
++        cfg = kpi.params.get("space_detector", {})
++        detector_class = space_node_detectors.get(cfg.get("type"))
++        space_detector = detector_class(cfg)
++        df = pd.DataFrame()
++        column_list = []
++        for device_label, infer_data in detect_data.items():
++            df[device_label] = infer_data[metric_name]
++            column_list.append(device_label)
++
++        detect_node_data = df[column_list].values
++        labels = space_detector.detect(detect_node_data)
++
++        labels = np.swapaxes(labels, 0, 1)
++        space_detect_locations = {}
++
++        i = 0
++        for device_label in column_list:
++            space_detect_locations[device_label] = {}
++            space_detect_locations[device_label][metric_name] = detect_data[device_label]["timestamp"], labels[i]
++            i += 1
++        return space_detect_locations
++
++    def get_kpi_ts_list(self, metric, machine_id: str, kpi_params: dict):
++        look_back = self.config.params.get("look_back", 10)
++        metric_type = kpi_params.get("metric_type", "device")
++        start, end = dt.last(minutes=look_back)
++        # from datetime import timedelta
++        # # 6:40
++        # start = start - timedelta(hours=4.5)
++        # end = end - timedelta(hours=4.5)
++
++        if metric_type == "device":
++            # npu device
++            ts_list = self.data_loader.get_metric(start, end, metric, instance=machine_id)
++        else:
++            # host
++            op = kpi_params.get("method", "avg")
++            ts_list = self.data_loader.get_metric(start, end, metric, operator=op, keys="instance", instance=machine_id)
++
++        return ts_list
++
++    @staticmethod
++    def alarm_filter(labels, alarm_filter_window_size):
++        copy_labels = np.zeros(len(labels))
++        start_index = alarm_filter_window_size
++        alarm_points = set()
++        for i in range(start_index, len(labels) + 1):
++            is_sequential_alarm = (np.sum(labels[i - alarm_filter_window_size:i]) >= alarm_filter_window_size)
++            if not is_sequential_alarm:
++                if np.sum(labels[i - alarm_filter_window_size:i]) > 0:
++                    alarm_points.add(i - alarm_filter_window_size)
++            else:
++                copy_labels[i - alarm_filter_window_size:i] = labels[i - alarm_filter_window_size:i]
++        # if alarm_points:
++        #     logger.info(f"Alert Remove from point loc", list(alarm_points))
++
++        return copy_labels
++
++    @staticmethod
++    def time_space_agg(time_anomaly_locations, space_anomaly_locations, metric_name):
++        detect_result_type = {}
++
++        for node_id in time_anomaly_locations.keys():
++            time_ret = np.sum(time_anomaly_locations[node_id][metric_name][1])
++            if space_anomaly_locations:
++                space_ret = np.sum(space_anomaly_locations[node_id][metric_name][1])
++                # if the homogeneity comparison reported nothing, the time alarm is cleared
++                # if both space and time results are empty, the node stays normal
++                # if both dimensions report anomalies, the space result takes precedence
++                if space_ret == 0 or (space_ret > 0 and time_ret >= 0):
++                    time_anomaly_locations[node_id][metric_name] = space_anomaly_locations[node_id][metric_name]
++                    detect_result_type.setdefault(node_id, {}).setdefault(metric_name, "SPACE")
++                else:
++                    detect_result_type.setdefault(node_id, {}).setdefault(metric_name, "TIME")
++            else:
++                detect_result_type.setdefault(node_id, {}).setdefault(metric_name, "TIME")
++
++        return time_anomaly_locations, detect_result_type
++
++    @staticmethod
++    def _get_kpi_params(kpis: List[KPI], metric_name):
++        for kpi in kpis:
++            if kpi.metric == metric_name:
++                return kpi.params
++
++        return {}
++
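`alarm_filter` keeps an alarm point only when it belongs to a run of at least `alarm_filter_window_size` consecutive alarms; shorter bursts are zeroed out, which is what the per-KPI `alarm_filter_window_size` in the job config tunes. A worked example with window size 5 (illustrative input):

```python
import numpy as np

labels = np.array([0, 1, 1, 1, 1, 1, 0, 1, 0])  # five consecutive alarms, then a lone spike
filtered = SlowNodeDetector.alarm_filter(labels, 5)
print(filtered)  # [0. 1. 1. 1. 1. 1. 0. 0. 0.] -- the isolated alarm at index 7 is dropped
```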
++    def group_detect_ret_agg(self, response, detect_result, kpis: List[KPI]):
++        anomaly_device_labels = detect_result.get("anomaly_devices")
++        anomaly_locations = detect_result.get("anomaly_locations")
++        metric_name = detect_result.get("metric_name")
++        detect_result_type = detect_result.get("detect_result_type")
++        group_data = detect_result.get("group_data")
++        if len(anomaly_device_labels) == 0:
++            return response
++        else:
++            response.result_code = ResultCode.anomaly
++            kpi_params = self._get_kpi_params(kpis, metric_name)
++            response(kpi_params.get('type', "compute"))
++
++        keep_devices = []
++        omitted_devices = []
++        for device_label in anomaly_device_labels:
++            method_type = detect_result_type.get(device_label, {}).get(metric_name, "TIME")
++            if method_type == "SPACE":
++                normal_devices = sorted(set(group_data.keys()) - set(anomaly_device_labels))
++                keep_devices = normal_devices[:self.max_num_normal_results]
++                omitted_devices = normal_devices[self.max_num_normal_results:]
++            abnormal_node_data = NodeData(metric_name, device_label, method_type, keep_devices, omitted_devices)
++            time_stamp_data, values = anomaly_locations[device_label][metric_name]
++            label_dict = dict(zip(time_stamp_data.tolist(), values.tolist()))
++
++            # recording real kpi values is controlled by user configuration
++            if self.record_kpi_value:
++                # record anomalous kpi values
++                g_ts, g_value = group_data[device_label].values[:, 0], group_data[device_label].values[:, 1]
++                kpi_data = []
++                for key, value in sorted(zip(g_ts.tolist(), g_value.tolist()), key=lambda x: x[0]):
++                    kpi_data.append({str(key): str(value), "abnormal": label_dict.get(key, 0)})
++
++                abnormal_node_data.kpi_data = kpi_data
++            response.abnormal_detail.append(abnormal_node_data)
++
++        if keep_devices:
++            for device_label in keep_devices:
++                normal_node_data = NodeData(metric_name, device_label, "SPACE")
++                # recording real kpi values is controlled by user configuration
++                if self.record_kpi_value:
++                    # record normal kpi data for the space comparison
++                    g_ts, g_value = group_data[device_label].values[:, 0], group_data[device_label].values[:, 1]
++                    kpi_data = [{str(key): str(value)} for key, value in zip(g_ts.tolist(), g_value.tolist())]
++                    normal_node_data.kpi_data = kpi_data
++                response.normal_detail.append(normal_node_data)
++        return response
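For reference, each element appended to `all_results` by `group_detect_single_kpi` has this shape (keys from the code above, values illustrative):

```python
detection_results = {
    "anomaly_devices": ["192.168.0.11:8081*0"],
    "anomaly_locations": {},   # device_label -> {metric: (timestamps, 0/1 labels)}
    "detect_result_type": {"192.168.0.11:8081*0": {"npu_chip_info_hbm_used_memory": "SPACE"}},
    "metric_name": "npu_chip_info_hbm_used_memory",
    "group_data": {},          # device_label -> DataFrame(timestamp, metric), as built earlier
}
```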
"max_expert_upper_bound": 80 ++ } ++ }, ++ "type": "compute" ++ } ++ }, ++ { ++ "metric": "gala_gopher_mem_util", ++ "entity_name": "sli", ++ "atrend": "rise", ++ "enable": true, ++ "params": { ++ "metric_type": "host", ++ "method": "sum", ++ "priority": 20, ++ "alarm_filter_window_size": 5, ++ "space_detector": { ++ "first_gap_rate": 0.3, ++ "second_gap_rate": 0.2, ++ "base_threshold": 150, ++ "discrete_rate": 1.5, ++ "nsigma_coefficient": 2, ++ "discrete_point_suppression_ratio": 0.03, ++ "non_major_anomaly_suppression": 0.1, ++ "type": "OuterDataDetector" ++ }, ++ "time_detector": { ++ "preprocess_eps": 0.1, ++ "preprocess_min_samples": 10, ++ "type": "SlidingWindowKSigmaDetector", ++ "n_sigma_method": { ++ "type": "SlidingWindowNSigma", ++ "training_window_size": 40, ++ "min_update_window_size": 10, ++ "min_std_val": 0.0001, ++ "bias": 0.1, ++ "abs_bias": 5, ++ "nsigma_coefficient": 4, ++ "detect_type": "upper_bound", ++ "min_expert_lower_bound": null, ++ "max_expert_lower_bound": null, ++ "min_expert_upper_bound": 50, ++ "max_expert_upper_bound": null ++ } ++ }, ++ "type": "compute" ++ } ++ }, ++ { ++ "metric": "gala_gopher_disk_wspeed_kB", ++ "entity_name": "sli", ++ "atrend": "rise", ++ "enable": true, ++ "params": { ++ "metric_type": "host", ++ "method": "sum", ++ "priority": 5, ++ "alarm_filter_window_size": 30, ++ "space_detector": null, ++ "time_detector": { ++ "preprocess_eps": 0.1, ++ "preprocess_min_samples": 10, ++ "type": "SlidingWindowKSigmaDetector", ++ "n_sigma_method": { ++ "type": "SlidingWindowNSigma", ++ "training_window_size": 60, ++ "min_update_window_size": 10, ++ "min_std_val": 0.0001, ++ "bias": 0.3, ++ "abs_bias": 0, ++ "nsigma_coefficient": 3, ++ "detect_type": "lower_bound", ++ "min_expert_lower_bound": null, ++ "max_expert_lower_bound": null, ++ "min_expert_upper_bound": null, ++ "max_expert_upper_bound": null ++ } ++ }, ++ "type": "storage" ++ } ++ }, ++ { ++ "metric": "gala_gopher_nic_tx_dropped", ++ "entity_name": "sli", ++ "atrend": "rise", ++ "enable": true, ++ "params": { ++ "metric_type": "host", ++ "method": "sum", ++ "priority": 5, ++ "alarm_filter_window_size": 5, ++ "space_detector": null, ++ "time_detector": { ++ "preprocess_eps": 0.1, ++ "preprocess_min_samples": 10, ++ "type": "SlidingWindowKSigmaDetector", ++ "n_sigma_method": { ++ "type": "SlidingWindowNSigma", ++ "training_window_size": 40, ++ "min_update_window_size": 10, ++ "min_std_val": 0.0001, ++ "bias": 0.05, ++ "abs_bias": 0, ++ "nsigma_coefficient": 4, ++ "detect_type": "upper_bound", ++ "min_expert_lower_bound": null, ++ "max_expert_lower_bound": null, ++ "min_expert_upper_bound": 10, ++ "max_expert_upper_bound": null ++ } ++ }, ++ "type": "network" ++ } ++ }, ++ { ++ "metric": "gala_gopher_nic_tx_errs", ++ "entity_name": "sli", ++ "atrend": "rise", ++ "enable": true, ++ "params": { ++ "metric_type": "host", ++ "method": "sum", ++ "priority": 5, ++ "alarm_filter_window_size": 5, ++ "space_detector": null, ++ "time_detector": { ++ "preprocess_eps": 0.1, ++ "preprocess_min_samples": 10, ++ "type": "SlidingWindowKSigmaDetector", ++ "n_sigma_method": { ++ "type": "SlidingWindowNSigma", ++ "training_window_size": 40, ++ "min_update_window_size": 10, ++ "min_std_val": 0.0001, ++ "bias": 0.05, ++ "abs_bias": 0, ++ "nsigma_coefficient": 4, ++ "detect_type": "upper_bound", ++ "min_expert_lower_bound": null, ++ "max_expert_lower_bound": null, ++ "min_expert_upper_bound": 10, ++ "max_expert_upper_bound": null ++ } ++ }, ++ "type": "network" ++ } ++ }, ++ { ++ "metric": 
"npu_chip_info_temperature", ++ "entity_name": "sli", ++ "atrend": "rise", ++ "enable": true, ++ "params": { ++ "metric_type": "device", ++ "method": "max", ++ "priority": 25, ++ "alarm_filter_window_size": 12, ++ "space_detector": null, ++ "time_detector": { ++ "preprocess_eps": 0.1, ++ "preprocess_min_samples": 10, ++ "type": "SlidingWindowKSigmaDetector", ++ "n_sigma_method": { ++ "type": "SlidingWindowNSigma", ++ "training_window_size": 40, ++ "min_update_window_size": 10, ++ "min_std_val": 0.0001, ++ "bias": 0.01, ++ "abs_bias": 0, ++ "nsigma_coefficient": 4, ++ "detect_type": "upper_bound", ++ "min_expert_lower_bound": null, ++ "max_expert_lower_bound": null, ++ "min_expert_upper_bound": 70, ++ "max_expert_upper_bound": 100 ++ } ++ }, ++ "type": "compute" ++ } ++ }, ++ { ++ "metric": "npu_chip_info_hbm_used_memory", ++ "entity_name": "sli", ++ "atrend": "rise", ++ "enable": true, ++ "params": { ++ "metric_type": "device", ++ "method": "max", ++ "priority": 30, ++ "alarm_filter_window_size": 5, ++ "space_detector": { ++ "dist_metric": "euclidean", ++ "eps": 0.4, ++ "cv_threshold": 0.03, ++ "min_samples": 2, ++ "window_size": 100, ++ "scaling": false, ++ "type": "SlidingWindowDBSCAN" ++ }, ++ "time_detector": { ++ "preprocess_eps": 0.1, ++ "preprocess_min_samples": 10, ++ "type": "SlidingWindowKSigmaDetector", ++ "n_sigma_method": { ++ "type": "SlidingWindowNSigma", ++ "training_window_size": 40, ++ "min_update_window_size": 10, ++ "min_std_val": 0.0001, ++ "bias": 0.02, ++ "abs_bias": 5, ++ "nsigma_coefficient": 4, ++ "detect_type": "upper_bound", ++ "min_expert_lower_bound": null, ++ "max_expert_lower_bound": null, ++ "min_expert_upper_bound": null, ++ "max_expert_upper_bound": null ++ } ++ }, ++ "type": "compute" ++ } ++ }, ++ { ++ "metric": "npu_chip_info_aicore_current_freq", ++ "entity_name": "sli", ++ "atrend": "rise", ++ "enable": true, ++ "params": { ++ "metric_type": "device", ++ "method": "max", ++ "priority": 30, ++ "alarm_filter_window_size": 5, ++ "space_detector": { ++ "dist_metric": "euclidean", ++ "eps": 0.4, ++ "cv_threshold": 0.03, ++ "min_samples": 2, ++ "window_size": 100, ++ "scaling": true, ++ "type": "SlidingWindowDBSCAN" ++ }, ++ "time_detector": { ++ "preprocess_eps": 0.1, ++ "preprocess_min_samples": 10, ++ "type": "SlidingWindowKSigmaDetector", ++ "n_sigma_method": { ++ "type": "SlidingWindowNSigma", ++ "training_window_size": 40, ++ "min_update_window_size": 10, ++ "min_std_val": 0.0001, ++ "bias": 0.05, ++ "abs_bias": 0, ++ "nsigma_coefficient": 4, ++ "detect_type": "lower_bound", ++ "min_expert_lower_bound": null, ++ "max_expert_lower_bound": null, ++ "min_expert_upper_bound": 10, ++ "max_expert_upper_bound": null ++ } ++ }, ++ "type": "compute" ++ } ++ }, ++ { ++ "metric": "npu_chip_roce_tx_err_pkt_num", ++ "entity_name": "sli", ++ "atrend": "rise", ++ "enable": true, ++ "params": { ++ "metric_type": "device", ++ "method": "max", ++ "priority": 30, ++ "alarm_filter_window_size": 5, ++ "space_detector": null, ++ "time_detector": { ++ "preprocess_eps": 0.1, ++ "preprocess_min_samples": 10, ++ "type": "SlidingWindowKSigmaDetector", ++ "n_sigma_method": { ++ "type": "SlidingWindowNSigma", ++ "training_window_size": 40, ++ "min_update_window_size": 10, ++ "min_std_val": 0.0001, ++ "bias": 0.05, ++ "abs_bias": 0, ++ "nsigma_coefficient": 4, ++ "detect_type": "upper_bound", ++ "min_expert_lower_bound": null, ++ "max_expert_lower_bound": null, ++ "min_expert_upper_bound": 10, ++ "max_expert_upper_bound": null ++ } ++ }, ++ "type": "network" ++ } ++ } ++ ], ++ 
"features": [ ++ { ++ "metric": "gala_gopher_container_cpu_usage_seconds_total" ++ } ++ ] ++} +\ No newline at end of file +-- +2.33.0 + diff --git a/0002-configure-group-in-json.patch b/0002-configure-group-in-json.patch new file mode 100644 index 0000000..3ec9cad --- /dev/null +++ b/0002-configure-group-in-json.patch @@ -0,0 +1,121 @@ +From acefcbdbb4891aa0b3f1b7afe500b8fdef440806 Mon Sep 17 00:00:00 2001 +From: huangbin +Date: Fri, 8 Nov 2024 16:42:52 +0800 +Subject: [PATCH] configure-group-in-json + +--- + anteater/model/detector/slow_node_detector.py | 28 +++++++++++-------- + config/module/slow_node_detection.job.json | 9 ++++-- + 2 files changed, 24 insertions(+), 13 deletions(-) + +diff --git a/anteater/model/detector/slow_node_detector.py b/anteater/model/detector/slow_node_detector.py +index 15a6cee..d5d10e1 100644 +--- a/anteater/model/detector/slow_node_detector.py ++++ b/anteater/model/detector/slow_node_detector.py +@@ -55,6 +55,9 @@ class SlowNodeDetector(Detector): + hccl_domain = json.load(f_out) + except Exception: + logger.error(f"Read hccl domain info fail!") ++ if not hccl_domain: ++ # 增加手动设置hccl_domain ++ hccl_domain = params.get("hccl_domain", {}) + if os.path.exists(rank_table_path): + try: + with open(rank_table_path, 'r', encoding='utf-8') as f_out: +@@ -106,15 +109,15 @@ class SlowNodeDetector(Detector): + # 获取machine_ids, + machines_to_devices = self.get_machines_to_devices(start, end, kpis) + npu_id2host_id, hosts_ids = self.npu_id2host_id(machines_to_devices) +- + group_dataloader = GroupDataLoader(self.hccl_domain, self.rank_table, machines_to_devices) + group_ranks: list = group_dataloader.get_group_ranks() + all_results = [] + for kpi in kpis: +- for ranks in group_ranks: ++ for index, ranks in enumerate(group_ranks): ++ logger.info(f"Groups-{index}, metric: {kpi.metric}, start detection.") + machine_ids: dict = group_dataloader.rank_table_loader.get_group_nodes_by_ranks(ranks) + host_ids: list = self.get_host_ids_by_npu_ids(machine_ids, npu_id2host_id, hosts_ids) +- group_result = self.group_detect_single_kpi(kpi, machine_ids, host_ids) ++ group_result = self.group_detect_single_kpi(kpi, machine_ids, host_ids, ranks) + all_results.extend(group_result) + + response, all_anomaly_nodes = self.gen_final_alarm(kpis, all_results) +@@ -148,19 +151,26 @@ class SlowNodeDetector(Detector): + + return response, all_anomaly_nodes + +- def group_detect_single_kpi(self, kpi: KPI, machine_ids: dict, host_ids: list) -> list: ++ def group_detect_single_kpi(self, kpi: KPI, machine_ids: dict, host_ids: list, ranks) -> list: + """Detects kpi based on signal time series anomaly detection model""" + # 普罗会一次性抓到所有的数据,需要根据machine_id, device_id去对数据作分组 +- # get数据 + metric_name: str = kpi.metric + + all_machines_ts = [] + for machine_id in machine_ids: + single_machine_ts_list = self.get_kpi_ts_list(metric_name, machine_id, kpi.params) +- all_machines_ts.extend(single_machine_ts_list) ++ if single_machine_ts_list: ++ # 根据ranks匹配组内device的指标 ++ local_ranks = [int(rank) % 8 for rank in ranks] ++ for single_machine_ts in single_machine_ts_list: ++ ts_id = int(single_machine_ts.labels.get("id", -1)) ++ if ts_id in local_ranks: ++ all_machines_ts.append(single_machine_ts) ++ + for host_id in host_ids: + single_machine_ts_list = self.get_kpi_ts_list(metric_name, host_id, kpi.params) + all_machines_ts.extend(single_machine_ts_list) ++ logger.info(f"Metric-{metric_name} single group has data {len(all_machines_ts)}. 
diff --git a/0003-remove-unuse-code.patch b/0003-remove-unuse-code.patch
new file mode 100644
index 0000000..253965c
--- /dev/null
+++ b/0003-remove-unuse-code.patch
@@ -0,0 +1,41 @@
+From 74a86e20d09cbe3bba0a776f4a1c72941267f7f5 Mon Sep 17 00:00:00 2001
+From: huangbin
+Date: Sat, 9 Nov 2024 16:36:16 +0800
+Subject: [PATCH] remove-unuse-code
+
+---
+ .../model/rca/rca_graph/prune_bidirected_graph.py | 14 --------------
+ 1 file changed, 14 deletions(-)
+
+diff --git a/anteater/model/rca/rca_graph/prune_bidirected_graph.py b/anteater/model/rca/rca_graph/prune_bidirected_graph.py
+index 96f3e0b..57c4c8c 100644
+--- a/anteater/model/rca/rca_graph/prune_bidirected_graph.py
++++ b/anteater/model/rca/rca_graph/prune_bidirected_graph.py
+@@ -12,7 +12,6 @@ import os
+ import json
+ from typing import Dict
+ import networkx as nx
+-import matplotlib.pyplot as plt
+ from anteater.model.rca.rca_graph.utils import create_meta_graph
+ from anteater.utils.log import logger
+
+@@ -126,16 +125,3 @@ class PrunedMetaGraph:
+
+         return self._meta_graph
+
+-def plot_network(graph, front_end_metric=None):
+-    # draw the graph
+-    pos = nx.circular_layout(graph)  # circular layout, purely cosmetic
+-    color_values = []
+-    for node in graph.nodes():
+-        if node == front_end_metric:
+-            color_values.append(1.0)
+-        else:
+-            color_values.append(0.25)
+-    nx.draw(graph, pos, with_labels=True, font_weight="bold", node_color=color_values, cmap=plt.get_cmap('viridis'))
+-    plt.title("DiGraph")
+-    plt.show()
+-
+--
+2.43.0
+
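Deleting `plot_network` drops the package's matplotlib dependency. If the visualization is still wanted for ad-hoc debugging, it can live in a standalone script outside the package — a sketch reconstructed from the removed lines:

```python
import networkx as nx
import matplotlib.pyplot as plt

def plot_network(graph, front_end_metric=None):
    pos = nx.circular_layout(graph)  # circular layout, purely cosmetic
    # highlight the front-end metric node
    colors = [1.0 if node == front_end_metric else 0.25 for node in graph.nodes()]
    nx.draw(graph, pos, with_labels=True, font_weight="bold",
            node_color=colors, cmap=plt.get_cmap("viridis"))
    plt.title("DiGraph")
    plt.show()

if __name__ == "__main__":
    plot_network(nx.DiGraph([("a", "b"), ("b", "c"), ("c", "a")]), front_end_metric="a")
```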
diff --git a/gala-anteater-1.1.0.tar.gz b/gala-anteater-1.1.0.tar.gz
deleted file mode 100644
index bb9c79e..0000000
Binary files a/gala-anteater-1.1.0.tar.gz and /dev/null differ
diff --git a/gala-anteater-1.2.1.tar.gz b/gala-anteater-1.2.1.tar.gz
new file mode 100644
index 0000000..2f3de1f
Binary files /dev/null and b/gala-anteater-1.2.1.tar.gz differ
diff --git a/gala-anteater.spec b/gala-anteater.spec
index c5c821d..fe637df 100644
--- a/gala-anteater.spec
+++ b/gala-anteater.spec
@@ -1,8 +1,8 @@
 %define debug_package %{nil}
 
 Name: gala-anteater
-Version: 1.1.0
-Release: 1
+Version: 1.2.1
+Release: 4
 Summary: A time-series anomaly detection platform for operating system.
 License: MulanPSL2
 URL: https://gitee.com/openeuler/gala-anteater
@@ -11,14 +11,20 @@
 BuildRoot: %{_builddir}/%{name}-%{version}
 BuildRequires: procps-ng python3-setuptools
 Requires: python3-gala-anteater = %{version}-%{release}
 
+Patch0: 0001-add-new-feature-slow-node-detection.patch
+Patch1: 0002-configure-group-in-json.patch
+Patch2: 0003-remove-unuse-code.patch
+
 %description
 Abnormal detection module for A-Ops project
 
+
 %package -n python3-gala-anteater
 Summary: Python3 package of gala-anteater
 Requires: python3-APScheduler python3-kafka-python python3-joblib python3-numpy
 Requires: python3-pandas python3-requests python3-scikit-learn python3-pytorch
-Requires: python3-pyyaml
+Requires: python3-pyyaml python3-networkx python3-pyArango python3-statsmodels
+
 %description -n python3-gala-anteater
 Python3 package of gala-anteater
@@ -56,14 +62,17 @@ fi
 %config(noreplace) %{_sysconfdir}/%{name}/config/gala-anteater.yaml
 %config(noreplace) %{_sysconfdir}/%{name}/config/log.settings.ini
 %config(noreplace) %{_sysconfdir}/%{name}/module/app_sli_rtt.job.json
+%config(noreplace) %{_sysconfdir}/%{name}/module/container_disruption.job.json
 %config(noreplace) %{_sysconfdir}/%{name}/module/disk_throughput.job.json
 %config(noreplace) %{_sysconfdir}/%{name}/module/jvm_oom.job.json
 %config(noreplace) %{_sysconfdir}/%{name}/module/proc_io_latency.job.json
+%config(noreplace) %{_sysconfdir}/%{name}/module/rca.job.json
 %config(noreplace) %{_sysconfdir}/%{name}/module/sys_io_latency.job.json
 %config(noreplace) %{_sysconfdir}/%{name}/module/sys_nic_loss.job.json
 %config(noreplace) %{_sysconfdir}/%{name}/module/sys_tcp_establish.job.json
 %config(noreplace) %{_sysconfdir}/%{name}/module/sys_tcp_transmission_latency.job.json
 %config(noreplace) %{_sysconfdir}/%{name}/module/usad_model.job.json
+%config(noreplace) %{_sysconfdir}/%{name}/module/slow_node_detection.job.json
 /usr/lib/systemd/system/gala-anteater.service
 
 %files -n python3-gala-anteater
@@ -72,6 +81,27 @@
 %changelog
+* Mon Nov 11 2024 huangbin - 1.2.1-4
+- Solve patch conflicts.
+
+* Sat Nov 9 2024 huangbin - 1.2.1-3
+- Remove unused plot_network code.
+
+* Fri Nov 8 2024 huangbin - 1.2.1-2
+- Add new feature slow node detection.
+
+* Tue Nov 5 2024 huangbin - 1.2.1-1
+- Add new feature slow node detection.
+
+* Sat Sep 21 2024 huangbin - 1.2.0-3
+- Fix bug with RCA time range extension.
+
+* Wed Sep 18 2024 huangbin - 1.2.0-2
+- Fix bug with large values exceeding limits.
+
+* Sat Aug 31 2024 huangbin - 1.2.0-1
+- Upgrade anteater version to 1.2.0
+
 * Thu Aug 31 2023 Li Zhenxing - 1.1.0-1
 - Upgrade anteater version to 1.1.0
diff --git a/gala-anteater.yaml b/gala-anteater.yaml
index 668f0aa..e7dc027 100644
--- a/gala-anteater.yaml
+++ b/gala-anteater.yaml
@@ -1,4 +1,4 @@
-seperator: .
-src_repo: openeuler/gala
-tag_prefix: ^
-version_control: gitee.com
+seperator: .
+src_repo: openeuler/gala
+tag_prefix: ^
+version_control: gitee.com