discruption case refactoring
Signed-off-by: MaSkingx <maxin@xfusion.com>
parent 570e710b76
commit dd60181b3f
0007-discruption-case-refactoring.patch (new file, 311 lines added)
@@ -0,0 +1,311 @@
From ccb733aa3c4eceb596e8f9a9d39cec78a16a6320 Mon Sep 17 00:00:00 2001
From: wangfenglai1 <wangfenglai1@huawei.com>
Date: Mon, 20 Jan 2025 09:42:49 +0800
Subject: [PATCH] discruption case refactoring

---
 .../model/detector/disruption_detector.py   | 138 ++++++++++++++----
 config/module/container_disruption.job.json |  13 +-
 2 files changed, 119 insertions(+), 32 deletions(-)

diff --git a/anteater/model/detector/disruption_detector.py b/anteater/model/detector/disruption_detector.py
index 7a5a998..90ccfd3 100644
--- a/anteater/model/detector/disruption_detector.py
+++ b/anteater/model/detector/disruption_detector.py
@@ -10,15 +10,17 @@
 # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 # See the Mulan PSL v2 for more details.
 # ******************************************************************************/
+from typing import Dict, List, Tuple
 from abc import ABC
+from datetime import datetime
+import pytz
+from math import floor
+from itertools import groupby
 
+import requests
 import numpy as np
 import pandas as pd
 
-from itertools import groupby
-from typing import List, Tuple
-from math import floor
-
 from anteater.core.ts import TimeSeries
 from anteater.core.anomaly import Anomaly, RootCause
 from anteater.core.kpi import KPI, ModelConfig, Feature
@@ -42,7 +44,11 @@ class ContainerDisruptionDetector(Detector):
         self.q = 1e-3
         self.level = 0.98
         self.smooth_win = 3
-
+
+        self.container_num = 0
+        self.start_time = None
+        self.end_time = None
+
     @timer
     def _execute(self, kpis: List[KPI], features: List[Feature], **kwargs) \
             -> List[Anomaly]:
@@ -52,13 +58,17 @@
         return anomalies
 
     def detect_and_rca(self, kpis: List[KPI]):
-        start, end = dt.last(minutes=1)
+        start, end = dt.last(minutes=20)
         machine_ids = self.get_unique_machine_id(start, end, kpis)
         anomalies = []
         for _id in machine_ids:
             for kpi in kpis:
                 anomalies.extend(self.detect_signal_kpi(kpi, _id))
 
+        logger.info('total machine number is %d, container number is %d.',
+                    len(machine_ids), self.container_num)
+        self.container_num = 0
+
         return anomalies
 
     def detect_signal_kpi(self, kpi, machine_id: str) -> List[Anomaly]:
@@ -74,14 +84,19 @@
     def get_kpi_ts_list(self, metric, machine_id: str, look_back):
 
         if GlobalVariable.is_test_model:
-            strat_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
+            start_time, end_time = GlobalVariable.start_time, GlobalVariable.end_time
+            self.start_time = start_time
+            self.end_time = end_time
 
             ts_list = self.data_loader.get_metric(
-                strat_time, end_time, metric, machine_id=machine_id)
-            point_count = self.data_loader.expected_point_length(strat_time, end_time)
+                start_time, end_time, metric, machine_id=machine_id)
+            point_count = self.data_loader.expected_point_length(start_time, end_time)
 
         else:
             start, end = dt.last(minutes=look_back)
+            self.start_time = start
+            self.end_time = end
+
             point_count = self.data_loader.expected_point_length(start, end)
             ts_list = self.data_loader.get_metric(
                 start, end, metric, machine_id=machine_id)
@@ -106,14 +121,14 @@
                 score=float(_score),
                 entity_name=kpi.entity_name,
                 root_causes=_root_causes,
-                details={'event_source': 'spot'})
-            for _ts, _score, _root_causes in ts_scores
+                details={'event_source': 'spot', 'info': _extra_info})
+            for _ts, _score, _extra_info, _root_causes in ts_scores
         ]
 
         return anomalies
 
     def cal_spot_score(self, metric, machine_id: str, **kwargs) \
-            -> List[Tuple[TimeSeries, int, List[RootCause]]]:
+            -> List[Tuple[TimeSeries, int, Dict, List[RootCause]]]:
         """Calculates metrics' ab score based on n-sigma method"""
         look_back = kwargs.get('look_back')
         obs_size = kwargs.get('obs_size')
@@ -121,7 +136,13 @@
 
         point_count, ts_list = self.get_kpi_ts_list(metric, machine_id, look_back)
         ts_scores = []
+        root_causes = []
+        extra_info = {}
+        logger.info('machine %s, total detected %d containers.',
+                    machine_id, len(ts_list))
+        self.container_num += len(ts_list)
         for _ts in ts_list:
+            # import pdb;pdb.set_trace()
             detect_result = ts_dbscan_detector.detect(_ts.values)
             if len(detect_result) != len(_ts.values):
-                raise ""
+                raise ValueError('detect result length does not match input length')
@@ -130,7 +151,7 @@
             dedup_values = [k for k, g in groupby(test_data)]
             train_dedup_values = [k for k, g in groupby(train_data)]
             if sum(_ts.values) == 0 or \
-                    np.max(_ts.values) < 1e8 or \
+                    np.max(_ts.values) < 1e3 or \
                     len(_ts.values) < point_count * 0.6 or \
                     len(_ts.values) > point_count * 1.5 or \
                     all(x == _ts.values[0] for x in _ts.values) or \
@@ -157,7 +178,7 @@
                 noise_ratio = np.random.randint(-1e5, 1e5, size=_ts_series_train.shape) / 1e6
                 noise_data = noise_ratio * _ts_series_train
                 _ts_series_train += noise_data
-                logger.warning("peak data are same.")
+                # logger.warning("peak data are same.")
 
             _ts_series_train, mean, std = Normalization.clip_transform(
                 _ts_series_train[np.newaxis, :], is_clip=False)
@@ -177,23 +198,38 @@
             bound_result = np.array(_ts_series_test > thr_with_alarms["thresholds"], dtype=np.int32)
             result += bound_result
             output = np.sum(result)
-            print('data: ', _ts.values)
-            print('spot result: ', result)
-            logger.info('spot detected: %d , total: %d , metric: %s, image: %s',
-                        output, obs_size, _ts.metric, _ts.labels['container_id'])
+            if output >= 3:
+                print('data: ', _ts.values)
+                print('spot result: ', result)
+                container_hostname = _ts.labels.get('container_name', '')
+                machine_id = _ts.labels.get('machine_id', '')
+                logger.info('spot detected: %d , total: %d , metric: %s, host_name: %s',
+                            output, obs_size, _ts.metric, container_hostname)
+
+                extra_info = self.get_container_extra_info(machine_id,
+                                                           container_hostname,
+                                                           self.start_time,
+                                                           self.end_time,
+                                                           obs_size)
+                print('extra_info', extra_info)
+                root_causes = self.find_discruption_source(_ts, ts_list)
 
             score = divide(output, obs_size)
-
-            root_causes = self.find_discruption_source(_ts, ts_list)
-            ts_scores.append((_ts, score, root_causes))
+
+            ts_scores.append((_ts, score, extra_info, root_causes))
 
         return ts_scores
 
     def find_discruption_source(self, victim_ts: TimeSeries, all_ts: List[TimeSeries]) \
             -> List[RootCause]:
         root_causes = []
+        tmp_causes = []
         for ts in all_ts:
-            if ts is victim_ts:
+            # container_hostname = ts.labels.get('container_hostname', '')
+            # info = self.queryContainerInfo(container_hostname) if container_hostname else {}
+            # cpu_num = info.get('cpu', 0)
+            cpu_num = int(ts.labels.get('cpu_num', '0'))
+            if ts is victim_ts or cpu_num < 5:
                 continue
 
             agg_data = []
@@ -213,15 +249,54 @@
 
             sorted_metrics_correlation = abs(metrics_correlation.iloc[0]).sort_values(ascending=False)
             # print("sorted_metrics_correlation:", sorted_metrics_correlation)
+
+            causes = {
+                'score': round(sorted_metrics_correlation.values[-1], 3),
+                'cpu_num': cpu_num,
+                'metric': ts.metric,
+                'labels': ts.labels
+            }
+            causes['labels']['cpu_num'] = cpu_num
+
+            if causes['score'] > 0.5:
+                tmp_causes.append(causes)
+
+            # root_causes.append(RootCause(
+            #     metric=ts.metric,
+            #     labels=ts.labels,
+            #     score=round(sorted_metrics_correlation.values[-1], 3)))
 
-        root_causes.append(RootCause(
-            metric=ts.metric,
-            labels=ts.labels,
-            score=round(sorted_metrics_correlation.values[-1], 3)))
+        tmp_causes.sort(key=lambda x: (x['labels']['cpu_num'], x['score']), reverse=True)
 
-        root_causes.sort(key=lambda x: x.score, reverse=True)
+        root_causes = [RootCause(metric=causes['metric'], labels=causes['labels'], score=causes['score'])
+                       for causes in tmp_causes]
+
+        # print("root_causes:", root_causes)
 
         return root_causes[:3]
+
+    def get_container_extra_info(self, machine_id: str,
+                                 container_name: str,
+                                 start_time: datetime,
+                                 end_time: datetime,
+                                 obs_size: int) -> Dict:
+        extra_metrics = self.config.params.get('extra_metrics', '').split(',')
+        # print(extra_metrics)
+        result = {'container_name': container_name,
+                  'machine_id': machine_id}
+        for metric in extra_metrics:
+            ts_list = self.data_loader.get_metric(start_time, end_time, metric, machine_id=machine_id)
+            for _ts in ts_list:
+                if container_name == _ts.labels.get('container_name', ''):
+                    values = _ts.values
+                    trend = self.cal_trend(values, obs_size)
+                    result[metric] = trend
+                    result['appkey'] = _ts.labels.get('appkey', '')
+                    result['cpu_num'] = int(_ts.labels.get('cpu_num', '0'))
+                    # print("***", container_name, metric, _ts.values, trend)
+                    break
+        return result
+
 
     @staticmethod
     def _normalize_df(df):
@@ -276,3 +351,14 @@
         smooth_win = self.smooth_win
 
         return smooth_win
+
+    @staticmethod
+    def cal_trend(metric_values: list, obs_size: int) -> float:
+        pre = metric_values[:-obs_size]
+        check = metric_values[-obs_size:]
+        pre_mean = np.mean(pre)
+        check_mean = np.mean(check)
+        trend = (check_mean - pre_mean) / pre_mean if pre_mean > 0 else 0.0
+        # print('trend: ', round(trend, 3), pre_mean, check_mean, '|||', pre, check)
+
+        return round(trend, 3)
diff --git a/config/module/container_disruption.job.json b/config/module/container_disruption.job.json
index c96f06f..c627b33 100644
--- a/config/module/container_disruption.job.json
+++ b/config/module/container_disruption.job.json
@@ -1,6 +1,6 @@
 {
     "name": "ContainerDisruptionDetector",
-    "enable": false,
+    "enable": true,
     "job_type": "anomaly_detection",
     "keywords": [
         "app"
@@ -10,15 +10,15 @@
     "template": "simple",
     "kpis": [
         {
-            "metric": "gala_gopher_sli_container_cpu_rundelay",
+            "metric": "sli.cpu.schedlat_rundelay",
             "entity_name": "sli",
             "atrend": "rise",
             "enable": true,
             "params": {
                 "method": "max",
-                "look_back": 15,
-                "obs_size": 5,
-                "outlier_ratio_th": 0.6,
+                "look_back": 20,
+                "obs_size": 10,
+                "outlier_ratio_th": 0.5,
                 "smooth_params": {
                     "method": "conv_smooth",
                     "box_pts": 3
@@ -34,6 +34,7 @@
     "min_train_hours": 24,
     "min_predict_minutes": 5,
     "top_n": 20,
+    "extra_metrics": "cpu.busy,load.1min,octo_service.tp999,octo_service.tp99",
     "preprocessor": {
         "smooth_type": "rolling",
         "smooth_window": 18,
@@ -119,4 +120,4 @@
         "metric": "gala_gopher_container_fs_inodes_total"
     }
 ]
-}
\ No newline at end of file
+}
--
Gitee
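For reference, a minimal, self-contained sketch (Python, invented sample data) of the two scoring pieces the patch introduces: the cal_trend helper added at the end of the detector, and the output >= 3 vote gate in cal_spot_score. Plain division stands in for anteater's divide() helper.

import numpy as np

def cal_trend(metric_values: list, obs_size: int) -> float:
    # relative change of the last obs_size points vs. the preceding history,
    # mirroring the static helper added to ContainerDisruptionDetector
    pre = metric_values[:-obs_size]
    check = metric_values[-obs_size:]
    pre_mean = np.mean(pre)
    check_mean = np.mean(check)
    trend = (check_mean - pre_mean) / pre_mean if pre_mean > 0 else 0.0
    return round(trend, 3)

values = [100, 102, 98, 101, 99, 100, 180, 190, 185, 200]  # invented rundelay samples
obs_size = 4
print(cal_trend(values, obs_size))  # ~0.89: the check window nearly doubled

result = np.array([0, 1, 2, 1])  # invented per-point SPOT votes over the check window
output = int(np.sum(result))
if output >= 3:                  # same gate the patch adds before reporting
    score = output / obs_size    # stands in for divide(output, obs_size)
    print('anomaly, score =', score)  # anomaly, score = 1.0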
@@ -2,7 +2,7 @@
 
 Name: gala-anteater
 Version: 1.2.1
-Release: 7
+Release: 8
 Summary: A time-series anomaly detection platform for operating system.
 License: MulanPSL2
 URL: https://gitee.com/openeuler/gala-anteater
@@ -17,6 +17,7 @@ patch2: 0003-remove-unuse-code.patch
 patch3: 0004-fixbug-wrong-label-for-dbscan.patch
 patch4: 0005-add-detect-type-for-usad-model.patch
 patch5: 0006-add-local-file-test-case.patch
+patch6: 0007-discruption-case-refactoring.patch
 
 %description
 Abnormal detection module for A-Ops project
@@ -84,6 +85,9 @@ fi
 
 
 %changelog
+* Mon Apr 21 2025 maxin <maxin@xfusion.com> - 1.2.1-8
+- discruption case refactoring
+
 * Fri Apr 18 2025 maxin <maxin@xfusion.com> - 1.2.1-7
 - add local file test case
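Likewise, a sketch with invented candidates of how the reworked find_discruption_source ranking behaves: correlation scores at or below 0.5 are dropped, survivors are sorted by (cpu_num, score) descending so larger containers outrank ties, and only the top three would become RootCause entries.

candidates = [  # invented (score, cpu_num) pairs for suspect containers
    {'score': 0.92, 'cpu_num': 8,  'labels': {'container_name': 'a'}},
    {'score': 0.61, 'cpu_num': 16, 'labels': {'container_name': 'b'}},
    {'score': 0.40, 'cpu_num': 32, 'labels': {'container_name': 'c'}},
    {'score': 0.77, 'cpu_num': 8,  'labels': {'container_name': 'd'}},
]

tmp_causes = [c for c in candidates if c['score'] > 0.5]  # 'c' is filtered out
for c in tmp_causes:
    c['labels']['cpu_num'] = c['cpu_num']  # copied into labels, as in the patch
tmp_causes.sort(key=lambda x: (x['labels']['cpu_num'], x['score']), reverse=True)

print([c['labels']['container_name'] for c in tmp_causes[:3]])  # ['b', 'a', 'd']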
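Finally, a sketch of how the new extra_metrics knob in container_disruption.job.json is consumed: get_container_extra_info splits the comma-separated list, looks up each metric for the flagged container, and records its trend. The series values, obs_size, and container/machine names below are invented; the trend is the same pre-window vs. check-window ratio as cal_trend.

import numpy as np

extra_metrics = "cpu.busy,load.1min,octo_service.tp999,octo_service.tp99".split(',')

fake_series = {  # invented per-container samples; the real ones come from data_loader
    'cpu.busy': [20.0, 22.0, 21.0, 20.0, 70.0, 75.0],
    'load.1min': [1.0, 1.1, 0.9, 1.0, 3.8, 4.2],
}
obs_size = 2
info = {'container_name': 'demo-container', 'machine_id': 'node-1'}
for metric in extra_metrics:
    values = fake_series.get(metric)
    if not values:
        continue  # metric not reported for this container, skipped as in the real loop
    pre_mean = float(np.mean(values[:-obs_size]))
    check_mean = float(np.mean(values[-obs_size:]))
    info[metric] = round((check_mean - pre_mean) / pre_mean, 3) if pre_mean > 0 else 0.0
print(info)  # {'container_name': 'demo-container', ..., 'cpu.busy': 2.494, 'load.1min': 3.0}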