From dd60181b3f81772383247fa71863e58f9e68d373 Mon Sep 17 00:00:00 2001
From: MaSkingx
Date: Mon, 21 Apr 2025 01:47:26 +0000
Subject: [PATCH] discruption case refactoring

Signed-off-by: MaSkingx
---
 0007-discruption-case-refactoring.patch | 311 ++++++++++++++++++++++++
 gala-anteater.spec                      |   6 +-
 2 files changed, 316 insertions(+), 1 deletion(-)
 create mode 100644 0007-discruption-case-refactoring.patch

diff --git a/0007-discruption-case-refactoring.patch b/0007-discruption-case-refactoring.patch
new file mode 100644
index 0000000..43500e8
--- /dev/null
+++ b/0007-discruption-case-refactoring.patch
@@ -0,0 +1,311 @@
+From ccb733aa3c4eceb596e8f9a9d39cec78a16a6320 Mon Sep 17 00:00:00 2001
+From: wangfenglai1
+Date: Mon, 20 Jan 2025 09:42:49 +0800
+Subject: [PATCH] discruption case refactoring
+
+---
+ .../model/detector/disruption_detector.py   | 138 ++++++++----
+ config/module/container_disruption.job.json |  13 +-
+ 2 files changed, 119 insertions(+), 32 deletions(-)
+
+diff --git a/anteater/model/detector/disruption_detector.py b/anteater/model/detector/disruption_detector.py
+index 7a5a998..90ccfd3 100644
+--- a/anteater/model/detector/disruption_detector.py
++++ b/anteater/model/detector/disruption_detector.py
+@@ -10,15 +10,17 @@
+ #     MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ #     See the Mulan PSL v2 for more details.
+ # ******************************************************************************/
++from typing import Dict, List, Tuple
+ from abc import ABC
++from datetime import datetime
++import pytz
++from math import floor
++from itertools import groupby
+ 
++import requests
+ import numpy as np
+ import pandas as pd
+ 
+-from itertools import groupby
+-from typing import List, Tuple
+-from math import floor
+-
+ from anteater.core.ts import TimeSeries
+ from anteater.core.anomaly import Anomaly, RootCause
+ from anteater.core.kpi import KPI, ModelConfig, Feature
+@@ -42,7 +44,11 @@ class ContainerDisruptionDetector(Detector):
+         self.q = 1e-3
+         self.level = 0.98
+         self.smooth_win = 3
+-
++
++        self.container_num = 0
++        self.start_time = None
++        self.end_time = None
++
+     @timer
+     def _execute(self, kpis: List[KPI], features: List[Feature], **kwargs) \
+             -> List[Anomaly]:
+@@ -52,13 +58,17 @@
+         return anomalies
+ 
+     def detect_and_rca(self, kpis: List[KPI]):
+-        start, end = dt.last(minutes=1)
++        start, end = dt.last(minutes=20)
+         machine_ids = self.get_unique_machine_id(start, end, kpis)
+         anomalies = []
+         for _id in machine_ids:
+             for kpi in kpis:
+                 anomalies.extend(self.detect_signal_kpi(kpi, _id))
+ 
++        logger.info('total machine number is %d, container number is %d.',
++                    len(machine_ids), self.container_num)
++        self.container_num = 0
++
+         return anomalies
+ 
+     def detect_signal_kpi(self, kpi, machine_id: str) -> List[Anomaly]:
+@@ -74,14 +84,19 @@
+     def get_kpi_ts_list(self, metric, machine_id: str, look_back):
+ 
+         if GlobalVariable.is_test_model:
+-            strat_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
++            start_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
++            self.start_time = start_time
++            self.end_time = end_time
+ 
+             ts_list = self.data_loader.get_metric(
+-                strat_time, end_time, metric, machine_id=machine_id)
+-            point_count = self.data_loader.expected_point_length(strat_time, end_time)
++                start_time, end_time, metric, machine_id=machine_id)
++            point_count = self.data_loader.expected_point_length(start_time, end_time)
+ 
+         else:
+             start, end = dt.last(minutes=look_back)
++            self.start_time = start
++            self.end_time = end
++
+             point_count = self.data_loader.expected_point_length(start, end)
+             ts_list = self.data_loader.get_metric(
+                 start, end, metric, machine_id=machine_id)
+@@ -106,14 +121,14 @@
+                 score=float(_score),
+                 entity_name=kpi.entity_name,
+                 root_causes=_root_causes,
+-                details={'event_source': 'spot'})
+-            for _ts, _score, _root_causes in ts_scores
++                details={'event_source': 'spot', 'info': _extra_info})
++            for _ts, _score, _extra_info, _root_causes in ts_scores
+         ]
+ 
+         return anomalies
+ 
+     def cal_spot_score(self, metric, machine_id: str, **kwargs) \
+-            -> List[Tuple[TimeSeries, int, List[RootCause]]]:
++            -> List[Tuple[TimeSeries, float, Dict, List[RootCause]]]:
+         """Calculates metrics' ab score based on n-sigma method"""
+         look_back = kwargs.get('look_back')
+         obs_size = kwargs.get('obs_size')
+@@ -121,7 +136,13 @@
+ 
+         point_count, ts_list = self.get_kpi_ts_list(metric, machine_id, look_back)
+         ts_scores = []
++        logger.info('machine %s: detected %d containers in total.',
++                    machine_id, len(ts_list))
++        self.container_num += len(ts_list)
+         for _ts in ts_list:
++            # reset per-container results so stale RCA data never leaks between containers
++            root_causes = []
++            extra_info = {}
+             detect_result = ts_dbscan_detector.detect(_ts.values)
+             if len(detect_result) != len(_ts.values):
+                 raise ""
+@@ -130,7 +151,7 @@
+             dedup_values = [k for k, g in groupby(test_data)]
+             train_dedup_values = [k for k, g in groupby(train_data)]
+             if sum(_ts.values) == 0 or \
+-                    np.max(_ts.values) < 1e8 or \
++                    np.max(_ts.values) < 1e3 or \
+                     len(_ts.values) < point_count * 0.6 or \
+                     len(_ts.values) > point_count * 1.5 or \
+                     all(x == _ts.values[0] for x in _ts.values) or\
+@@ -157,7 +178,7 @@
+                 noise_ratio = np.random.randint(-1e5, 1e5, size=_ts_series_train.shape) / 1e6
+                 noise_data = noise_ratio * _ts_series_train
+                 _ts_series_train += noise_data
+-                logger.warning("peak data are same.")
++                logger.debug("peak data are same.")
+ 
+             _ts_series_train, mean, std = Normalization.clip_transform(
+                 _ts_series_train[np.newaxis, :], is_clip=False)
+@@ -177,23 +198,38 @@
+             bound_result = np.array(_ts_series_test > thr_with_alarms["thresholds"], dtype=np.int32)
+             result += bound_result
+             output = np.sum(result)
+-            print('data: ', _ts.values)
+-            print('spot result: ', result)
+-            logger.info('spot detected: %d , total: %d , metric: %s, image: %s',
+-                        output, obs_size, _ts.metric, _ts.labels['container_id'])
++            if output >= 3:  # run RCA only when enough windows exceed the threshold
++                logger.debug('data: %s', _ts.values)
++                logger.debug('spot result: %s', result)
++                container_hostname = _ts.labels.get('container_name', '')
++                machine_id = _ts.labels.get('machine_id', '')
++                logger.info('spot detected: %d, total: %d, metric: %s, host_name: %s',
++                            output, obs_size, _ts.metric, container_hostname)
++
++                extra_info = self.get_container_extra_info(machine_id,
++                                                           container_hostname,
++                                                           self.start_time,
++                                                           self.end_time,
++                                                           obs_size)
++                logger.debug('extra_info: %s', extra_info)
++                root_causes = self.find_discruption_source(_ts, ts_list)
+ 
+             score = divide(output, obs_size)
+-
+-            root_causes = self.find_discruption_source(_ts, ts_list)
+-            ts_scores.append((_ts, score, root_causes))
++
++            ts_scores.append((_ts, score, extra_info, root_causes))
+ 
+         return ts_scores
+ 
+     def find_discruption_source(self, victim_ts: TimeSeries, all_ts: List[TimeSeries]) \
+             -> List[RootCause]:
+         root_causes = []
++        tmp_causes = []
+         for ts in all_ts:
+-            if ts is victim_ts:
++            # the container's CPU allotment now comes from the series labels
++            # instead of a per-container info lookup, so no extra query is needed
++            cpu_num = int(ts.labels.get('cpu_num', '0'))
++            # skip the victim itself and containers too small to cause disruption
++            if ts is victim_ts or cpu_num < 5:
+                 continue
+ 
+             agg_data = []
+@@ -213,15 +249,54 @@
+ 
+             sorted_metrics_correlation = abs(metrics_correlation.iloc[0]).sort_values(ascending=False)
+             # print("sorted_metrics_correlation:", sorted_metrics_correlation)
++
++            causes = {
++                'score': round(sorted_metrics_correlation.values[-1], 3),
++                'cpu_num': cpu_num,
++                'metric': ts.metric,
++                # copy the labels so the shared TimeSeries dict is not mutated
++                'labels': dict(ts.labels)
++            }
++            causes['labels']['cpu_num'] = cpu_num
++
++            if causes['score'] > 0.5:
++                tmp_causes.append(causes)
+ 
+-            root_causes.append(RootCause(
+-                metric=ts.metric,
+-                labels=ts.labels,
+-                score=round(sorted_metrics_correlation.values[-1], 3)))
++        tmp_causes.sort(key=lambda x: (x['cpu_num'], x['score']), reverse=True)
+ 
+-        root_causes.sort(key=lambda x: x.score, reverse=True)
++        root_causes = [RootCause(metric=c['metric'], labels=c['labels'], score=c['score'])
++                       for c in tmp_causes]
++
++        # report the three most suspicious containers only
+ 
+         return root_causes[:3]
++
++    def get_container_extra_info(self, machine_id: str,
++                                 container_name: str,
++                                 start_time: datetime,
++                                 end_time: datetime,
++                                 obs_size: int) -> Dict:
++        """Collect trend values of the configured extra metrics for one
++        container, plus its appkey and cpu_num labels, so they can be
++        attached to the anomaly report as details.
++        """
++        # drop empty entries so an unset 'extra_metrics' key queries nothing
++        extra_metrics = [m for m in self.config.params.get('extra_metrics', '').split(',') if m]
++        result = {'container_name': container_name,
++                  'machine_id': machine_id}
++        for metric in extra_metrics:
++            ts_list = self.data_loader.get_metric(start_time, end_time, metric, machine_id=machine_id)
++            for _ts in ts_list:
++                if container_name == _ts.labels.get('container_name', ''):
++                    values = _ts.values
++                    trend = self.cal_trend(values, obs_size)
++                    result[metric] = trend
++                    result['appkey'] = _ts.labels.get('appkey', '')
++                    result['cpu_num'] = int(_ts.labels.get('cpu_num', '0'))
++                    # the first series matching this container is enough
++                    break
++        return result
++
+ 
+     @staticmethod
+     def _normalize_df(df):
+@@ -276,3 +351,14 @@
+             smooth_win = self.smooth_win
+ 
+         return smooth_win
++
++    @staticmethod
++    def cal_trend(metric_values: list, obs_size: int) -> float:
++        pre = metric_values[:-obs_size]
++        check = metric_values[-obs_size:]
++        pre_mean = np.mean(pre) if pre else 0.0
++        check_mean = np.mean(check)
++        trend = (check_mean - pre_mean) / pre_mean if pre_mean > 0 else 0.0
++        # a positive trend means the metric rose during the observation window
++
++        return round(trend, 3)
+diff --git a/config/module/container_disruption.job.json b/config/module/container_disruption.job.json
+index c96f06f..c627b33 100644
+--- a/config/module/container_disruption.job.json
++++ b/config/module/container_disruption.job.json
+@@ -1,6 +1,6 @@
+ {
+     "name": "ContainerDisruptionDetector",
+-    "enable": false,
++    "enable": true,
+     "job_type": "anomaly_detection",
+     "keywords": [
+         "app"
+@@ -10,15 +10,15 @@
+     "template": "simple",
+     "kpis": [
+         {
+-            "metric": "gala_gopher_sli_container_cpu_rundelay",
++            "metric": "sli.cpu.schedlat_rundelay",
+             "entity_name": "sli",
+             "atrend": "rise",
+             "enable": true,
+             "params": {
+                 "method": "max",
+-                "look_back": 15,
+-                "obs_size": 5,
+-                "outlier_ratio_th": 0.6,
++                "look_back": 20,
"obs_size": 10, ++ "outlier_ratio_th": 0.5, + "smooth_params": { + "method": "conv_smooth", + "box_pts": 3 +@@ -34,6 +34,7 @@ + "min_train_hours": 24, + "min_predict_minutes": 5, + "top_n": 20, ++ "extra_metrics": "cpu.busy,load.1min,octo_service.tp999,octo_service.tp99", + "preprocessor": { + "smooth_type": "rolling", + "smooth_window": 18, +@@ -119,4 +120,4 @@ + "metric": "gala_gopher_container_fs_inodes_total" + } + ] +-} +\ No newline at end of file ++} +-- +Gitee diff --git a/gala-anteater.spec b/gala-anteater.spec index 78cbf77..33aa3cd 100644 --- a/gala-anteater.spec +++ b/gala-anteater.spec @@ -2,7 +2,7 @@ Name: gala-anteater Version: 1.2.1 -Release: 7 +Release: 8 Summary: A time-series anomaly detection platform for operating system. License: MulanPSL2 URL: https://gitee.com/openeuler/gala-anteater @@ -17,6 +17,7 @@ patch2: 0003-remove-unuse-code.patch patch3: 0004-fixbug-wrong-label-for-dbscan.patch patch4: 0005-add-detect-type-for-usad-model.patch patch5: 0006-add-local-file-test-case.patch +patch6: 0007-discruption-case-refactoring.patch %description Abnormal detection module for A-Ops project @@ -84,6 +85,9 @@ fi %changelog +* Mon Apr 21 2025 maxin - 1.2.1-8 +- discruption case refactoring + * Fri Apr 18 2025 maxin - 1.2.1-7 - add local file test case