From ccb733aa3c4eceb596e8f9a9d39cec78a16a6320 Mon Sep 17 00:00:00 2001
From: wangfenglai1
Date: Mon, 20 Jan 2025 09:42:49 +0800
Subject: [PATCH] disruption case refactoring

---
 .../model/detector/disruption_detector.py    | 134 ++++++++++++++----
 config/module/container_disruption.job.json  |  13 +-
 2 files changed, 112 insertions(+), 35 deletions(-)

diff --git a/anteater/model/detector/disruption_detector.py b/anteater/model/detector/disruption_detector.py
index 7a5a998..90ccfd3 100644
--- a/anteater/model/detector/disruption_detector.py
+++ b/anteater/model/detector/disruption_detector.py
@@ -10,15 +10,15 @@
 # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 # See the Mulan PSL v2 for more details.
 # ******************************************************************************/
+from typing import Dict, List, Tuple
 from abc import ABC
+from datetime import datetime
+from math import floor
+from itertools import groupby
 
 import numpy as np
 import pandas as pd
-from itertools import groupby
-from typing import List, Tuple
-from math import floor
-
 
 from anteater.core.ts import TimeSeries
 from anteater.core.anomaly import Anomaly, RootCause
 from anteater.core.kpi import KPI, ModelConfig, Feature
@@ -42,7 +44,11 @@ class ContainerDisruptionDetector(Detector):
         self.q = 1e-3
         self.level = 0.98
         self.smooth_win = 3
 
+        self.container_num = 0
+        self.start_time = None
+        self.end_time = None
+
     @timer
     def _execute(self, kpis: List[KPI], features: List[Feature], **kwargs) \
             -> List[Anomaly]:
@@ -52,13 +58,17 @@ class ContainerDisruptionDetector(Detector):
         return anomalies
 
     def detect_and_rca(self, kpis: List[KPI]):
-        start, end = dt.last(minutes=1)
+        start, end = dt.last(minutes=20)
         machine_ids = self.get_unique_machine_id(start, end, kpis)
         anomalies = []
         for _id in machine_ids:
             for kpi in kpis:
                 anomalies.extend(self.detect_signal_kpi(kpi, _id))
 
+        logger.info('total machine number is %d, container number is %d.',
+                    len(machine_ids), self.container_num)
+        self.container_num = 0
+
         return anomalies
 
     def detect_signal_kpi(self, kpi, machine_id: str) -> List[Anomaly]:
@@ -74,14 +84,19 @@ class ContainerDisruptionDetector(Detector):
     def get_kpi_ts_list(self, metric, machine_id: str, look_back):
         if GlobalVariable.is_test_model:
-            strat_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
+            start_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
+            self.start_time = start_time
+            self.end_time = end_time
             ts_list = self.data_loader.get_metric(
-                strat_time, end_time, metric, machine_id=machine_id)
-            point_count = self.data_loader.expected_point_length(strat_time, end_time)
+                start_time, end_time, metric, machine_id=machine_id)
+            point_count = self.data_loader.expected_point_length(start_time, end_time)
         else:
             start, end = dt.last(minutes=look_back)
+            self.start_time = start
+            self.end_time = end
+
             point_count = self.data_loader.expected_point_length(start, end)
             ts_list = self.data_loader.get_metric(
                 start, end, metric, machine_id=machine_id)
 
         return point_count, ts_list
 
@@ -106,14 +121,17 @@ class ContainerDisruptionDetector(Detector):
                     score=float(_score),
                     entity_name=kpi.entity_name,
                     root_causes=_root_causes,
-                    details={'event_source': 'spot'})
-            for _ts, _score, _root_causes in ts_scores
+                    details={'event_source': 'spot', 'info': _extra_info})
+            for _ts, _score, _extra_info, _root_causes in ts_scores
         ]
 
         return anomalies
 
     def cal_spot_score(self, metric, machine_id: str, **kwargs) \
-            -> List[Tuple[TimeSeries, int, List[RootCause]]]:
-        """Calculates metrics' ab score based on n-sigma method"""
+            -> List[Tuple[TimeSeries, float, Dict, List[RootCause]]]:
+        """Calculates each container's abnormality score with the SPOT detector"""
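+        # Rough per-container flow: a DBSCAN pass screens each series, SPOT fits
+        # an extreme-value threshold on the normalized training window, and the
+        # score is the fraction of alarmed points in the observation window.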
         look_back = kwargs.get('look_back')
         obs_size = kwargs.get('obs_size')
@@ -121,7 +136,12 @@ class ContainerDisruptionDetector(Detector):
         point_count, ts_list = self.get_kpi_ts_list(metric, machine_id, look_back)
 
         ts_scores = []
+        logger.info('machine %s: detected %d containers in total.',
+                    machine_id, len(ts_list))
+        self.container_num += len(ts_list)
         for _ts in ts_list:
+            root_causes = []
+            extra_info = {}
             detect_result = ts_dbscan_detector.detect(_ts.values)
             if len(detect_result) != len(_ts.values):
-                raise ""
+                raise ValueError('dbscan result length does not match the series length')
@@ -130,7 +151,7 @@ class ContainerDisruptionDetector(Detector):
             dedup_values = [k for k, g in groupby(test_data)]
             train_dedup_values = [k for k, g in groupby(train_data)]
             if sum(_ts.values) == 0 or \
-                    np.max(_ts.values) < 1e8 or \
+                    np.max(_ts.values) < 1e3 or \
                     len(_ts.values) < point_count * 0.6 or \
                     len(_ts.values) > point_count * 1.5 or \
                     all(x == _ts.values[0] for x in _ts.values) or\
@@ -157,7 +178,7 @@ class ContainerDisruptionDetector(Detector):
                 noise_ratio = np.random.randint(-1e5, 1e5, size=_ts_series_train.shape) / 1e6
                 noise_data = noise_ratio * _ts_series_train
                 _ts_series_train += noise_data
-                logger.warning("peak data are same.")
+                logger.debug("peak values are identical; jitter added.")
 
             _ts_series_train, mean, std = Normalization.clip_transform(
                 _ts_series_train[np.newaxis, :], is_clip=False)
@@ -177,23 +198,35 @@ class ContainerDisruptionDetector(Detector):
             bound_result = np.array(_ts_series_test > thr_with_alarms["thresholds"], dtype=np.int32)
             result += bound_result
             output = np.sum(result)
-            print('data: ', _ts.values)
-            print('spot result: ', result)
-            logger.info('spot detected: %d , total: %d , metric: %s, image: %s',
-                        output, obs_size, _ts.metric, _ts.labels['container_id'])
+            if output >= 3:
+                logger.debug('data: %s', _ts.values)
+                logger.debug('spot result: %s', result)
+                container_hostname = _ts.labels.get('container_name', '')
+                machine_id = _ts.labels.get('machine_id', '')
+                logger.info('spot detected: %d, total: %d, metric: %s, host_name: %s',
+                            output, obs_size, _ts.metric, container_hostname)
+
+                extra_info = self.get_container_extra_info(machine_id,
+                                                           container_hostname,
+                                                           self.start_time,
+                                                           self.end_time,
+                                                           obs_size)
+                logger.debug('extra_info: %s', extra_info)
+                root_causes = self.find_discruption_source(_ts, ts_list)
             score = divide(output, obs_size)
-
-            root_causes = self.find_discruption_source(_ts, ts_list)
-            ts_scores.append((_ts, score, root_causes))
+
+            ts_scores.append((_ts, score, extra_info, root_causes))
 
         return ts_scores
 
     def find_discruption_source(self, victim_ts: TimeSeries, all_ts: List[TimeSeries]) \
             -> List[RootCause]:
         root_causes = []
+        tmp_causes = []
         for ts in all_ts:
-            if ts is victim_ts:
+            cpu_num = int(ts.labels.get('cpu_num', '0'))
+            if ts is victim_ts or cpu_num < 5:
                 continue
 
             agg_data = []
 
@@ -213,15 +249,48 @@ class ContainerDisruptionDetector(Detector):
             sorted_metrics_correlation = abs(metrics_correlation.iloc[0]).sort_values(ascending=False)
             # print("sorted_metrics_correlation:", sorted_metrics_correlation)
 
+            causes = {
+                'score': round(sorted_metrics_correlation.values[-1], 3),
+                'cpu_num': cpu_num,
+                'metric': ts.metric,
+                'labels': dict(ts.labels)
+            }
+            causes['labels']['cpu_num'] = cpu_num
+
+            if causes['score'] > 0.5:
+                tmp_causes.append(causes)
+
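+        # Only candidates whose correlation score exceeds 0.5 are kept above; the
+        # sort prefers larger containers (cpu_num), then the correlation score,
+        # treating big co-located containers as the likelier disruption sources.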
-            root_causes.append(RootCause(
-                metric=ts.metric,
-                labels=ts.labels,
-                score=round(sorted_metrics_correlation.values[-1], 3)))
+        tmp_causes.sort(key=lambda x: (x['cpu_num'], x['score']), reverse=True)
 
-        root_causes.sort(key=lambda x: x.score, reverse=True)
+        root_causes = [RootCause(metric=causes['metric'], labels=causes['labels'], score=causes['score'])
+                       for causes in tmp_causes]
 
         return root_causes[:3]
+
+    def get_container_extra_info(self, machine_id: str,
+                                 container_name: str,
+                                 start_time: datetime,
+                                 end_time: datetime,
+                                 obs_size: int) -> Dict:
+        extra_metrics = [m for m in self.config.params.get('extra_metrics', '').split(',') if m]
+        result = {'container_name': container_name,
+                  'machine_id': machine_id}
+        for metric in extra_metrics:
+            ts_list = self.data_loader.get_metric(start_time, end_time, metric, machine_id=machine_id)
+            for _ts in ts_list:
+                if container_name == _ts.labels.get('container_name', ''):
+                    values = _ts.values
+                    trend = self.cal_trend(values, obs_size)
+                    result[metric] = trend
+                    result['appkey'] = _ts.labels.get('appkey', '')
+                    result['cpu_num'] = int(_ts.labels.get('cpu_num', '0'))
+                    break
+        return result
+
 
     @staticmethod
     def _normalize_df(df):
@@ -276,3 +351,13 @@ class ContainerDisruptionDetector(Detector):
             smooth_win = self.smooth_win
 
         return smooth_win
+
+    @staticmethod
+    def cal_trend(metric_values: list, obs_size: int) -> float:
+        pre = metric_values[:-obs_size]
+        check = metric_values[-obs_size:]
+        pre_mean = np.mean(pre)
+        check_mean = np.mean(check)
+        trend = (check_mean - pre_mean) / pre_mean if pre_mean > 0 else 0.0
+
+        return round(trend, 3)
diff --git a/config/module/container_disruption.job.json b/config/module/container_disruption.job.json
index c96f06f..c627b33 100644
--- a/config/module/container_disruption.job.json
+++ b/config/module/container_disruption.job.json
@@ -1,6 +1,6 @@
 {
     "name": "ContainerDisruptionDetector",
-    "enable": false,
+    "enable": true,
     "job_type": "anomaly_detection",
     "keywords": [
         "app"
@@ -10,15 +10,15 @@
     "template": "simple",
     "kpis": [
         {
-            "metric": "gala_gopher_sli_container_cpu_rundelay",
+            "metric": "sli.cpu.schedlat_rundelay",
             "entity_name": "sli",
             "atrend": "rise",
             "enable": true,
             "params": {
                 "method": "max",
-                "look_back": 15,
-                "obs_size": 5,
-                "outlier_ratio_th": 0.6,
+                "look_back": 20,
+                "obs_size": 10,
+                "outlier_ratio_th": 0.5,
                 "smooth_params": {
                     "method": "conv_smooth",
                     "box_pts": 3
@@ -34,6 +34,7 @@
         "min_train_hours": 24,
         "min_predict_minutes": 5,
         "top_n": 20,
+        "extra_metrics": "cpu.busy,load.1min,octo_service.tp999,octo_service.tp99",
         "preprocessor": {
             "smooth_type": "rolling",
             "smooth_window": 18,
@@ -119,4 +120,4 @@
             "metric": "gala_gopher_container_fs_inodes_total"
         }
     ]
-}
\ No newline at end of file
+}
-- 
Gitee