From ccb733aa3c4eceb596e8f9a9d39cec78a16a6320 Mon Sep 17 00:00:00 2001
From: wangfenglai1 <wangfenglai1@huawei.com>
Date: Mon, 20 Jan 2025 09:42:49 +0800
Subject: [PATCH] disruption case refactoring
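
Refactor the container disruption case:

- widen the realtime detection window from 1 to 20 minutes and log
  per-run machine and container counts;
- remember the query window (start_time/end_time) so extra container
  info can be fetched for the same period;
- run root-cause analysis and extra-info collection only when the SPOT
  output reaches 3, and attach the collected info (appkey, cpu_num,
  trend of the configured extra_metrics) to the anomaly details;
- keep only candidate causes with correlation above 0.5 and rank them
  by container cpu_num and correlation score, returning the top 3;
- lower the max-value prefilter from 1e8 to 1e3;
- switch the KPI metric to sli.cpu.schedlat_rundelay, retune look_back,
  obs_size and outlier_ratio_th, add extra_metrics, and enable the job.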
---
 .../model/detector/disruption_detector.py    | 130 +++++++++++++-----
 config/module/container_disruption.job.json  |  13 +-
 2 files changed, 109 insertions(+), 34 deletions(-)
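
Reviewer note (illustration only, not part of the patch): the new
cal_trend helper reports the relative change of the observation
window's mean against the preceding baseline mean; its result is
attached per extra metric in the anomaly's 'info' details. A minimal
standalone sketch of the same computation, assuming a plain 1-D list
of samples with the last obs_size points under observation:

    import numpy as np

    def cal_trend(metric_values: list, obs_size: int) -> float:
        # Illustration only: mirrors the helper added in this patch.
        pre = metric_values[:-obs_size]    # baseline window
        check = metric_values[-obs_size:]  # observation window
        pre_mean = np.mean(pre)
        check_mean = np.mean(check)
        # relative change vs. the baseline; 0.0 when the baseline is idle
        trend = (check_mean - pre_mean) / pre_mean if pre_mean > 0 else 0.0
        return round(trend, 3)

    # cal_trend([100] * 10 + [130] * 5, obs_size=5) -> 0.3, i.e. a 30% rise
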
diff --git a/anteater/model/detector/disruption_detector.py b/anteater/model/detector/disruption_detector.py
index 7a5a998..90ccfd3 100644
--- a/anteater/model/detector/disruption_detector.py
+++ b/anteater/model/detector/disruption_detector.py
@@ -10,15 +10,17 @@
 # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 # See the Mulan PSL v2 for more details.
 # ******************************************************************************/
+from typing import Dict, List, Tuple
 from abc import ABC
+from datetime import datetime
+import pytz
+from math import floor
+from itertools import groupby
 
+import requests
 import numpy as np
 import pandas as pd
 
-from itertools import groupby
-from typing import List, Tuple
-from math import floor
-
 from anteater.core.ts import TimeSeries
 from anteater.core.anomaly import Anomaly, RootCause
 from anteater.core.kpi import KPI, ModelConfig, Feature
@@ -42,7 +44,11 @@ class ContainerDisruptionDetector(Detector):
         self.q = 1e-3
         self.level = 0.98
         self.smooth_win = 3
-
+
+        self.container_num = 0
+        self.start_time = None
+        self.end_time = None
+
     @timer
     def _execute(self, kpis: List[KPI], features: List[Feature], **kwargs) \
             -> List[Anomaly]:
@@ -52,13 +58,17 @@ class ContainerDisruptionDetector(Detector):
         return anomalies
 
     def detect_and_rca(self, kpis: List[KPI]):
-        start, end = dt.last(minutes=1)
+        start, end = dt.last(minutes=20)
         machine_ids = self.get_unique_machine_id(start, end, kpis)
         anomalies = []
         for _id in machine_ids:
             for kpi in kpis:
                 anomalies.extend(self.detect_signal_kpi(kpi, _id))
 
+        logger.info('total machine number: %d, container number: %d.',
+                    len(machine_ids), self.container_num)
+        self.container_num = 0
+
         return anomalies
 
     def detect_signal_kpi(self, kpi, machine_id: str) -> List[Anomaly]:
@@ -74,14 +84,19 @@ class ContainerDisruptionDetector(Detector):
     def get_kpi_ts_list(self, metric, machine_id: str, look_back):
 
         if GlobalVariable.is_test_model:
-            strat_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
+            start_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
+            self.start_time = start_time
+            self.end_time = end_time
 
             ts_list = self.data_loader.get_metric(
-                strat_time, end_time, metric, machine_id=machine_id)
-            point_count = self.data_loader.expected_point_length(strat_time, end_time)
+                start_time, end_time, metric, machine_id=machine_id)
+            point_count = self.data_loader.expected_point_length(start_time, end_time)
 
         else:
             start, end = dt.last(minutes=look_back)
+            self.start_time = start
+            self.end_time = end
+
             point_count = self.data_loader.expected_point_length(start, end)
             ts_list = self.data_loader.get_metric(
                 start, end, metric, machine_id=machine_id)
@@ -106,14 +121,14 @@ class ContainerDisruptionDetector(Detector):
                 score=float(_score),
                 entity_name=kpi.entity_name,
                 root_causes=_root_causes,
-                details={'event_source': 'spot'})
-            for _ts, _score, _root_causes in ts_scores
+                details={'event_source': 'spot', 'info': _extra_info})
+            for _ts, _score, _extra_info, _root_causes in ts_scores
         ]
 
         return anomalies
 
     def cal_spot_score(self, metric, machine_id: str, **kwargs) \
-            -> List[Tuple[TimeSeries, int, List[RootCause]]]:
+            -> List[Tuple[TimeSeries, int, Dict, List[RootCause]]]:
         """Calculates metrics' ab score based on n-sigma method"""
         look_back = kwargs.get('look_back')
         obs_size = kwargs.get('obs_size')
@@ -121,7 +136,12 @@
 
         point_count, ts_list = self.get_kpi_ts_list(metric, machine_id, look_back)
         ts_scores = []
+        root_causes = []
+        extra_info = {}
+        logger.info('machine %s: detected %d containers in total.',
+                    machine_id, len(ts_list))
+        self.container_num += len(ts_list)
         for _ts in ts_list:
             detect_result = ts_dbscan_detector.detect(_ts.values)
             if len(detect_result) != len(_ts.values):
-                raise ""
+                raise ValueError('detect result length mismatches the time series length')
@@ -130,7 +150,7 @@
             dedup_values = [k for k, g in groupby(test_data)]
             train_dedup_values = [k for k, g in groupby(train_data)]
             if sum(_ts.values) == 0 or \
-                    np.max(_ts.values) < 1e8 or \
+                    np.max(_ts.values) < 1e3 or \
                     len(_ts.values) < point_count * 0.6 or \
                     len(_ts.values) > point_count * 1.5 or \
                     all(x == _ts.values[0] for x in _ts.values) or \
@@ -157,7 +177,7 @@
                 noise_ratio = np.random.randint(-1e5, 1e5, size=_ts_series_train.shape) / 1e6
                 noise_data = noise_ratio * _ts_series_train
                 _ts_series_train += noise_data
-                logger.warning("peak data are same.")
+                # logger.warning("peak data are same.")
 
             _ts_series_train, mean, std = Normalization.clip_transform(
                 _ts_series_train[np.newaxis, :], is_clip=False)
@@ -177,23 +197,38 @@
             bound_result = np.array(_ts_series_test > thr_with_alarms["thresholds"], dtype=np.int32)
             result += bound_result
             output = np.sum(result)
-            print('data: ', _ts.values)
-            print('spot result: ', result)
-            logger.info('spot detected: %d , total: %d , metric: %s, image: %s',
-                        output, obs_size, _ts.metric, _ts.labels['container_id'])
+            if output >= 3:
+                logger.debug('data: %s', _ts.values)
+                logger.debug('spot result: %s', result)
+                container_hostname = _ts.labels.get('container_name', '')
+                machine_id = _ts.labels.get('machine_id', '')
+                logger.info('spot detected: %d, total: %d, metric: %s, host_name: %s',
+                            output, obs_size, _ts.metric, container_hostname)
+
+                extra_info = self.get_container_extra_info(machine_id,
+                                                           container_hostname,
+                                                           self.start_time,
+                                                           self.end_time,
+                                                           obs_size)
+                logger.debug('extra_info: %s', extra_info)
+                root_causes = self.find_discruption_source(_ts, ts_list)
 
             score = divide(output, obs_size)
-
-            root_causes = self.find_discruption_source(_ts, ts_list)
-            ts_scores.append((_ts, score, root_causes))
+
+            ts_scores.append((_ts, score, extra_info, root_causes))
 
         return ts_scores
 
     def find_discruption_source(self, victim_ts: TimeSeries, all_ts: List[TimeSeries]) \
             -> List[RootCause]:
         root_causes = []
+        tmp_causes = []
         for ts in all_ts:
-            if ts is victim_ts:
+            # container_hostname = ts.labels.get('container_hostname', '')
+            # info = self.queryContainerInfo(container_hostname) if container_hostname else {}
+            # cpu_num = info.get('cpu', 0)
+            cpu_num = int(ts.labels.get('cpu_num', '0'))
+            if ts is victim_ts or cpu_num < 5:
                 continue
 
             agg_data = []
@@ -213,15 +248,45 @@
 
             sorted_metrics_correlation = abs(metrics_correlation.iloc[0]).sort_values(ascending=False)
             # print("sorted_metrics_correlation:", sorted_metrics_correlation)
+
+            causes = {
+                'score': round(sorted_metrics_correlation.values[-1], 3),
+                'cpu_num': cpu_num,
+                'metric': ts.metric,
+                'labels': ts.labels
+            }
+            causes['labels']['cpu_num'] = cpu_num
+
+            if causes['score'] > 0.5:
+                tmp_causes.append(causes)
 
-            root_causes.append(RootCause(
-                metric=ts.metric,
-                labels=ts.labels,
-                score=round(sorted_metrics_correlation.values[-1], 3)))
+        tmp_causes.sort(key=lambda x: (x['labels']['cpu_num'], x['score']), reverse=True)
 
-        root_causes.sort(key=lambda x: x.score, reverse=True)
+        root_causes = [RootCause(metric=causes['metric'], labels=causes['labels'], score=causes['score'])
+                       for causes in tmp_causes]
 
         return root_causes[:3]
+
+    def get_container_extra_info(self, machine_id: str,
+                                 container_name: str,
+                                 start_time: datetime,
+                                 end_time: datetime,
+                                 obs_size: int) -> Dict:
+        extra_metrics = self.config.params.get('extra_metrics', '').split(',')
+        result = {'container_name': container_name,
+                  'machine_id': machine_id}
+        for metric in extra_metrics:
+            ts_list = self.data_loader.get_metric(start_time, end_time, metric, machine_id=machine_id)
+            for _ts in ts_list:
+                if container_name == _ts.labels.get('container_name', ''):
+                    values = _ts.values
+                    trend = self.cal_trend(values, obs_size)
+                    result[metric] = trend
+                    result['appkey'] = _ts.labels.get('appkey', '')
+                    result['cpu_num'] = int(_ts.labels.get('cpu_num', '0'))
+                    break
+        return result
+
 
     @staticmethod
     def _normalize_df(df):
@@ -276,3 +341,12 @@
         smooth_win = self.smooth_win
 
         return smooth_win
+
+    @staticmethod
+    def cal_trend(metric_values: list, obs_size: int) -> float:
+        pre = metric_values[:-obs_size]
+        check = metric_values[-obs_size:]
+        pre_mean = np.mean(pre)
+        check_mean = np.mean(check)
+        trend = (check_mean - pre_mean) / pre_mean if pre_mean > 0 else 0.0
+        return round(trend, 3)
diff --git a/config/module/container_disruption.job.json b/config/module/container_disruption.job.json
index c96f06f..c627b33 100644
--- a/config/module/container_disruption.job.json
+++ b/config/module/container_disruption.job.json
@@ -1,6 +1,6 @@
 {
     "name": "ContainerDisruptionDetector",
-    "enable": false,
+    "enable": true,
     "job_type": "anomaly_detection",
     "keywords": [
         "app"
@@ -10,15 +10,15 @@
     "template": "simple",
     "kpis": [
         {
-            "metric": "gala_gopher_sli_container_cpu_rundelay",
+            "metric": "sli.cpu.schedlat_rundelay",
             "entity_name": "sli",
             "atrend": "rise",
             "enable": true,
             "params": {
                 "method": "max",
-                "look_back": 15,
-                "obs_size": 5,
-                "outlier_ratio_th": 0.6,
+                "look_back": 20,
+                "obs_size": 10,
+                "outlier_ratio_th": 0.5,
                 "smooth_params": {
                     "method": "conv_smooth",
                     "box_pts": 3
@@ -34,6 +34,7 @@
     "min_train_hours": 24,
     "min_predict_minutes": 5,
     "top_n": 20,
+    "extra_metrics": "cpu.busy,load.1min,octo_service.tp999,octo_service.tp99",
     "preprocessor": {
         "smooth_type": "rolling",
         "smooth_window": 18,
@@ -119,4 +120,4 @@
         "metric": "gala_gopher_container_fs_inodes_total"
     }
 ]
-}
\ No newline at end of file
+}
--
Gitee