gala-anteater/0007-discruption-case-refactoring.patch
MaSkingx dd60181b3f
disruption case refactoring
Signed-off-by: MaSkingx <maxin@xfusion.com>
2025-04-21 01:47:26 +00:00

312 lines
13 KiB
Diff

From ccb733aa3c4eceb596e8f9a9d39cec78a16a6320 Mon Sep 17 00:00:00 2001
From: wangfenglai1 <wangfenglai1@huawei.com>
Date: Mon, 20 Jan 2025 09:42:49 +0800
Subject: [PATCH] disruption case refactoring
---
.../model/detector/disruption_detector.py | 138 ++++++++++++++----
config/module/container_disruption.job.json | 13 +-
2 files changed, 119 insertions(+), 32 deletions(-)
diff --git a/anteater/model/detector/disruption_detector.py b/anteater/model/detector/disruption_detector.py
index 7a5a998..90ccfd3 100644
--- a/anteater/model/detector/disruption_detector.py
+++ b/anteater/model/detector/disruption_detector.py
@@ -10,15 +10,17 @@
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ******************************************************************************/
+from typing import Dict, List, Tuple
from abc import ABC
+from datetime import datetime
+import pytz
+from math import floor
+from itertools import groupby
+import requests
import numpy as np
import pandas as pd
-from itertools import groupby
-from typing import List, Tuple
-from math import floor
-
from anteater.core.ts import TimeSeries
from anteater.core.anomaly import Anomaly, RootCause
from anteater.core.kpi import KPI, ModelConfig, Feature
@@ -42,7 +44,11 @@ class ContainerDisruptionDetector(Detector):
self.q = 1e-3
self.level = 0.98
self.smooth_win = 3
-
+
+ self.container_num = 0
+ self.start_time = None
+ self.end_time = None
+
@timer
def _execute(self, kpis: List[KPI], features: List[Feature], **kwargs) \
-> List[Anomaly]:
@@ -52,13 +58,17 @@ class ContainerDisruptionDetector(Detector):
return anomalies
def detect_and_rca(self, kpis: List[KPI]):
- start, end = dt.last(minutes=1)
+ start, end = dt.last(minutes=20)
machine_ids = self.get_unique_machine_id(start, end, kpis)
anomalies = []
for _id in machine_ids:
for kpi in kpis:
anomalies.extend(self.detect_signal_kpi(kpi, _id))
+ logger.info('total machine number is %d, container number is %d .',
+ len(machine_ids), self.container_num)
+ self.container_num = 0
+
return anomalies
def detect_signal_kpi(self, kpi, machine_id: str) -> List[Anomaly]:
@@ -74,14 +84,19 @@ class ContainerDisruptionDetector(Detector):
def get_kpi_ts_list(self, metric, machine_id: str, look_back):
if GlobalVariable.is_test_model:
- strat_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
+ start_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
+ self.start_time = start_time
+ self.end_time = end_time
ts_list = self.data_loader.get_metric(
- strat_time, end_time, metric, machine_id=machine_id)
+ start_time, end_time, metric, machine_id=machine_id)
- point_count = self.data_loader.expected_point_length(strat_time, end_time)
+ point_count = self.data_loader.expected_point_length(start_time, end_time)
else:
start, end = dt.last(minutes=look_back)
+ self.start_time = start
+ self.end_time = end
+
point_count = self.data_loader.expected_point_length(start, end)
ts_list = self.data_loader.get_metric(
start, end, metric, machine_id=machine_id)
@@ -106,14 +121,14 @@ class ContainerDisruptionDetector(Detector):
score=float(_score),
entity_name=kpi.entity_name,
root_causes=_root_causes,
- details={'event_source': 'spot'})
- for _ts, _score, _root_causes in ts_scores
+ details={'event_source': 'spot', 'info': _extra_info})
+ for _ts, _score, _extra_info, _root_causes in ts_scores
]
return anomalies
def cal_spot_score(self, metric, machine_id: str, **kwargs) \
- -> List[Tuple[TimeSeries, int, List[RootCause]]]:
+ -> List[Tuple[TimeSeries, int, Dict, List[RootCause]]]:
"""Calculates metrics' ab score based on n-sigma method"""
look_back = kwargs.get('look_back')
obs_size = kwargs.get('obs_size')
@@ -121,7 +136,13 @@ class ContainerDisruptionDetector(Detector):
point_count, ts_list = self.get_kpi_ts_list(metric, machine_id, look_back)
ts_scores = []
+ root_causes = []
+ extra_info = {}
+ logger.info('machine %s, total detected %d containers.',
+ machine_id, len(ts_list))
+ self.container_num += len(ts_list)
for _ts in ts_list:
+ root_causes, extra_info = [], {}  # reset per container so one container's RCA never leaks into the next
detect_result = ts_dbscan_detector.detect(_ts.values)
if len(detect_result) != len(_ts.values):
- raise ""
+ raise ValueError('detect result length does not match time series length')
@@ -130,7 +151,7 @@ class ContainerDisruptionDetector(Detector):
dedup_values = [k for k, g in groupby(test_data)]
train_dedup_values = [k for k, g in groupby(train_data)]
if sum(_ts.values) == 0 or \
- np.max(_ts.values) < 1e8 or \
+ np.max(_ts.values) < 1e3 or \
len(_ts.values) < point_count * 0.6 or \
len(_ts.values) > point_count * 1.5 or \
all(x == _ts.values[0] for x in _ts.values) or\
@@ -157,7 +178,7 @@ class ContainerDisruptionDetector(Detector):
noise_ratio = np.random.randint(-1e5, 1e5, size=_ts_series_train.shape) / 1e6
noise_data = noise_ratio * _ts_series_train
_ts_series_train += noise_data
- logger.warning("peak data are same.")
+ # logger.warning("peak data are same.")
_ts_series_train, mean, std = Normalization.clip_transform(
_ts_series_train[np.newaxis, :], is_clip=False)
@@ -177,23 +198,38 @@ class ContainerDisruptionDetector(Detector):
bound_result = np.array(_ts_series_test > thr_with_alarms["thresholds"], dtype=np.int32)
result += bound_result
output = np.sum(result)
- print('data: ', _ts.values)
- print('spot result: ', result)
- logger.info('spot detected: %d , total: %d , metric: %s, image: %s',
- output, obs_size, _ts.metric, _ts.labels['container_id'])
+ if output >= 3:
+ logger.debug('data: %s', _ts.values)
+ logger.debug('spot result: %s', result)
+ container_hostname = _ts.labels.get('container_name', '')
+ ts_machine_id = _ts.labels.get('machine_id', machine_id)
+ logger.info('spot detected: %d , total: %d , metric: %s, host_name: %s',
+ output, obs_size, _ts.metric, container_hostname)
+
+ extra_info = self.get_container_extra_info(ts_machine_id,
+ container_hostname,
+ self.start_time,
+ self.end_time,
+ obs_size)
+ logger.debug('extra_info: %s', extra_info)
+ root_causes = self.find_discruption_source(_ts, ts_list)
score = divide(output, obs_size)
-
- root_causes = self.find_discruption_source(_ts, ts_list)
- ts_scores.append((_ts, score, root_causes))
+
+ ts_scores.append((_ts, score, extra_info, root_causes))
return ts_scores
def find_discruption_source(self, victim_ts: TimeSeries, all_ts: List[TimeSeries]) \
-> List[RootCause]:
root_causes = []
+ tmp_causes = []
for ts in all_ts:
- if ts is victim_ts:
+ # container_hostname = ts.labels.get('container_hostname', '')
+ # info = self.queryContainerInfo(container_hostname) if container_hostname else {}
+ # cpu_num = info.get('cpu', 0)
+ cpu_num = int(ts.labels.get('cpu_num', '0'))
+ if ts is victim_ts and cpu_num < 5:
continue
agg_data = []
@@ -213,15 +249,54 @@ class ContainerDisruptionDetector(Detector):
sorted_metrics_correlation = abs(metrics_correlation.iloc[0]).sort_values(ascending=False)
# print("sorted_metrics_correlation:", sorted_metrics_correlation)
+
+ causes = {
+ 'score': round(sorted_metrics_correlation.values[-1], 3),
+ 'cpu_num': cpu_num,
+ 'metric': ts.metric,
+ 'labels': dict(ts.labels)  # copy: never mutate the shared TimeSeries labels
+ }
+ causes['labels']['cpu_num'] = cpu_num
+
+ if causes['score'] > 0.5:
+ tmp_causes.append(causes)
+
+ # root_causes.append(RootCause(
+ # metric=ts.metric,
+ # labels=ts.labels,
+ # score=round(sorted_metrics_correlation.values[-1], 3)))
- root_causes.append(RootCause(
- metric=ts.metric,
- labels=ts.labels,
- score=round(sorted_metrics_correlation.values[-1], 3)))
+ tmp_causes.sort(key=lambda x: (x['labels']['cpu_num'], x['score']), reverse=True)
- root_causes.sort(key=lambda x: x.score, reverse=True)
+ root_causes = [RootCause(metric=causes['metric'], labels=causes['labels'], score=causes['score'])
+ for causes in tmp_causes]
+
+ # print("root_causes:", root_causes)
return root_causes[:3]
+
+ def get_container_extra_info(self, machine_id: str,
+ container_name: str,
+ start_time: datetime,
+ end_time: datetime,
+ obs_size: int) -> Dict:
+ extra_metrics = [m.strip() for m in self.config.params.get('extra_metrics', '').split(',') if m.strip()]
+ # ''.split(',') yields [''], so blank entries are dropped to avoid querying an empty metric name
+ result = {'container_name': container_name,
+ 'machine_id': machine_id}
+ for metric in extra_metrics:
+ ts_list = self.data_loader.get_metric(start_time, end_time, metric, machine_id=machine_id)
+ for _ts in ts_list:
+ if container_name == _ts.labels.get('container_name', ''):
+ values = _ts.values
+ trend = self.cal_trend(values, obs_size)
+ result[metric] = trend
+ result['appkey'] = _ts.labels.get('appkey', '')
+ result['cpu_num'] = int(_ts.labels.get('cpu_num', '0'))
+ # print("***", container_name, metric, _ts.values, trend)
+ break
+ return result
+
@staticmethod
def _normalize_df(df):
@@ -276,3 +351,14 @@ class ContainerDisruptionDetector(Detector):
smooth_win = self.smooth_win
return smooth_win
+
+ @staticmethod
+ def cal_trend(metric_values: list, obs_size: int) -> float:
+ pre = metric_values[:-obs_size]
+ check = metric_values[-obs_size:]
+ pre_mean = np.mean(pre) if len(pre) else 0.0
+ check_mean = np.mean(check) if len(check) else 0.0
+ trend = (check_mean - pre_mean) / pre_mean if pre_mean > 0 else 0.0
+ # empty window (obs_size == 0 or obs_size >= len(values)) -> 0.0 instead of np.mean's nan + RuntimeWarning
+
+ return round(trend, 3)
diff --git a/config/module/container_disruption.job.json b/config/module/container_disruption.job.json
index c96f06f..c627b33 100644
--- a/config/module/container_disruption.job.json
+++ b/config/module/container_disruption.job.json
@@ -1,6 +1,6 @@
{
"name": "ContainerDisruptionDetector",
- "enable": false,
+ "enable": true,
"job_type": "anomaly_detection",
"keywords": [
"app"
@@ -10,15 +10,15 @@
"template": "simple",
"kpis": [
{
- "metric": "gala_gopher_sli_container_cpu_rundelay",
+ "metric": "sli.cpu.schedlat_rundelay",
"entity_name": "sli",
"atrend": "rise",
"enable": true,
"params": {
"method": "max",
- "look_back": 15,
- "obs_size": 5,
- "outlier_ratio_th": 0.6,
+ "look_back": 20,
+ "obs_size": 10,
+ "outlier_ratio_th": 0.5,
"smooth_params": {
"method": "conv_smooth",
"box_pts": 3
@@ -34,6 +34,7 @@
"min_train_hours": 24,
"min_predict_minutes": 5,
"top_n": 20,
+ "extra_metrics": "cpu.busy,load.1min,octo_service.tp999,octo_service.tp99",
"preprocessor": {
"smooth_type": "rolling",
"smooth_window": 18,
@@ -119,4 +120,4 @@
"metric": "gala_gopher_container_fs_inodes_total"
}
]
-}
\ No newline at end of file
+}
--
Gitee