From bd32dc01000126d593c188d47404cfdbe1df343e Mon Sep 17 00:00:00 2001
From: zhuofeng <zhuofeng2@huawei.com>
Date: Thu, 12 Sep 2024 11:29:01 +0800
Subject: [PATCH 1/2] add collect module to sysSentry

---
 config/collector.conf                        |   7 +
 service/sentryCollector.service              |  12 +
 service/sysSentry.service                    |   2 +-
 src/python/sentryCollector/__init__.py       |   0
 src/python/sentryCollector/__main__.py       |  17 ++
 src/python/sentryCollector/collect_config.py | 118 ++++++++
 src/python/sentryCollector/collect_io.py     | 239 ++++++++++++++++
 src/python/sentryCollector/collect_plugin.py | 276 ++++++++++++++++++
 src/python/sentryCollector/collect_server.py | 285 +++++++++++++++++++
 src/python/sentryCollector/collectd.py       |  99 +++++++
 src/python/setup.py                          |   4 +-
 11 files changed, 1057 insertions(+), 2 deletions(-)
 create mode 100644 config/collector.conf
 create mode 100644 service/sentryCollector.service
 create mode 100644 src/python/sentryCollector/__init__.py
 create mode 100644 src/python/sentryCollector/__main__.py
 create mode 100644 src/python/sentryCollector/collect_config.py
 create mode 100644 src/python/sentryCollector/collect_io.py
 create mode 100644 src/python/sentryCollector/collect_plugin.py
 create mode 100644 src/python/sentryCollector/collect_server.py
 create mode 100644 src/python/sentryCollector/collectd.py

diff --git a/config/collector.conf b/config/collector.conf
new file mode 100644
index 0000000..9baa086
--- /dev/null
+++ b/config/collector.conf
@@ -0,0 +1,7 @@
+[common]
+modules=io
+
+[io]
+period_time=1
+max_save=10
+disk=default
\ No newline at end of file
diff --git a/service/sentryCollector.service b/service/sentryCollector.service
new file mode 100644
index 0000000..4ee07d5
--- /dev/null
+++ b/service/sentryCollector.service
@@ -0,0 +1,12 @@
+[Unit]
+Description = Collection module added for sysSentry and kernel lock-free collection
+
+[Service]
+ExecStart=/usr/bin/python3 /usr/bin/sentryCollector
+ExecStop=/bin/kill $MAINPID
+KillMode=process
+Restart=on-failure
+RestartSec=10s
+
+[Install]
+WantedBy = multi-user.target
diff --git a/service/sysSentry.service b/service/sysSentry.service
index 4d85a6c..1d8338f 100644
--- a/service/sysSentry.service
+++ b/service/sysSentry.service
@@ -2,7 +2,7 @@
 Description=EulerOS System Inspection Frame
 
 [Service]
-ExecStart=/usr/bin/syssentry
+ExecStart=/usr/bin/python3 /usr/bin/syssentry
 ExecStop=/bin/kill $MAINPID
 KillMode=process
 Restart=on-failure
diff --git a/src/python/sentryCollector/__init__.py b/src/python/sentryCollector/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/python/sentryCollector/__main__.py b/src/python/sentryCollector/__main__.py
new file mode 100644
index 0000000..9c2ae50
--- /dev/null
+++ b/src/python/sentryCollector/__main__.py
@@ -0,0 +1,17 @@
+# coding: utf-8
+# Copyright (c) 2023 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+main
+"""
+from collectd import collectd
+
+collectd.main()
diff --git a/src/python/sentryCollector/collect_config.py b/src/python/sentryCollector/collect_config.py
new file mode 100644
index 0000000..b6cc75c
--- /dev/null
+++ b/src/python/sentryCollector/collect_config.py
@@ -0,0 +1,118 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+Read and save collector.conf value.
+"""
+import configparser
+import logging
+import os
+import re
+
+
+COLLECT_CONF_PATH = "/etc/sysSentry/collector.conf"
+
+CONF_COMMON = 'common'
+CONF_MODULES = 'modules'
+
+# io
+CONF_IO = 'io'
+CONF_IO_PERIOD_TIME = 'period_time'
+CONF_IO_MAX_SAVE = 'max_save'
+CONF_IO_DISK = 'disk'
+CONF_IO_PERIOD_TIME_DEFAULT = 1
+CONF_IO_MAX_SAVE_DEFAULT = 10
+CONF_IO_DISK_DEFAULT = "default"
+
+class CollectConfig:
+    def __init__(self, filename=COLLECT_CONF_PATH):
+
+        self.filename = filename
+        self.modules = []
+        self.module_count = 0
+        self.load_config()
+
+    def load_config(self):
+        if not os.path.exists(self.filename):
+            logging.error("%s does not exist", self.filename)
+            return
+
+        try:
+            self.config = configparser.ConfigParser()
+            self.config.read(self.filename)
+        except configparser.Error:
+            logging.error("collector configuration file read failed")
+            return
+
+        try:
+            common_config = self.config[CONF_COMMON]
+            modules_str = common_config[CONF_MODULES]
+            # remove space
+            modules_list = modules_str.replace(" ", "").split(',')
+        except KeyError as e:
+            logging.error("read config data failed, %s", e)
+            return
+
+        pattern = r'^[a-zA-Z0-9-_]+$'
+        for module_name in modules_list:
+            if not re.match(pattern, module_name):
+                logging.warning("module_name: %s is invalid", module_name)
+                continue
+            if not self.config.has_section(module_name):
+                logging.warning("module_name: %s config is incorrect", module_name)
+                continue
+            self.modules.append(module_name)
+
+    def load_module_config(self, module_name):
+        module_name = module_name.strip().lower()
+        if module_name in self.modules and self.config.has_section(module_name):
+            return {key.lower(): value for key, value in self.config[module_name].items()}
+        else:
+            raise ValueError(f"Module '{module_name}' not found in configuration")
+
+    def get_io_config(self):
+        result_io_config = {}
+        io_map_value = self.load_module_config(CONF_IO)
+        # period_time
+        period_time = io_map_value.get(CONF_IO_PERIOD_TIME)
+        if period_time and period_time.isdigit() and int(period_time) >= 1 and int(period_time) <= 300:
+            result_io_config[CONF_IO_PERIOD_TIME] = int(period_time)
+        else:
+            logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
+                            CONF_IO, CONF_IO_PERIOD_TIME, CONF_IO_PERIOD_TIME_DEFAULT)
+            result_io_config[CONF_IO_PERIOD_TIME] = CONF_IO_PERIOD_TIME_DEFAULT
+        # max_save
+        max_save = io_map_value.get(CONF_IO_MAX_SAVE)
+        if max_save and max_save.isdigit() and int(max_save) >= 1 and int(max_save) <= 300:
+            result_io_config[CONF_IO_MAX_SAVE] = int(max_save)
+        else:
+            logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
+                            CONF_IO, CONF_IO_MAX_SAVE, CONF_IO_MAX_SAVE_DEFAULT)
+            result_io_config[CONF_IO_MAX_SAVE] = CONF_IO_MAX_SAVE_DEFAULT
+        # disk
+        disk = io_map_value.get(CONF_IO_DISK)
+        if disk:
+            disk_str = disk.replace(" ", "")
+            pattern = r'^[a-zA-Z0-9-_,]+$'
+            if not re.match(pattern, disk_str):
+                logging.warning("module_name = %s section, field = %s is incorrect, use default %s",
+                                CONF_IO, CONF_IO_DISK, CONF_IO_DISK_DEFAULT)
+                disk_str = CONF_IO_DISK_DEFAULT
+            result_io_config[CONF_IO_DISK] = disk_str
+        else:
+            logging.warning("module_name = %s section, field = %s is incorrect, use default %s",
+                            CONF_IO, CONF_IO_DISK, CONF_IO_DISK_DEFAULT)
+            result_io_config[CONF_IO_DISK] = CONF_IO_DISK_DEFAULT
+        logging.info("config get_io_config: %s", result_io_config)
+        return result_io_config
+
+    def get_common_config(self):
+        return {key.lower(): value for key, value in self.config['common'].items()}
diff --git a/src/python/sentryCollector/collect_io.py b/src/python/sentryCollector/collect_io.py
new file mode 100644
index 0000000..b826dc4
--- /dev/null
+++ b/src/python/sentryCollector/collect_io.py
@@ -0,0 +1,239 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+collect module
+"""
+import os
+import time
+import logging
+import threading
+
+from .collect_config import CollectConfig
+
+Io_Category = ["read", "write", "flush", "discard"]
+IO_GLOBAL_DATA = {}
+IO_CONFIG_DATA = []
+
+class IoStatus():
+    TOTAL = 0
+    FINISH = 1
+    LATENCY = 2
+
+class CollectIo():
+
+    def __init__(self, module_config):
+
+        io_config = module_config.get_io_config()
+
+        self.period_time = io_config['period_time']
+        self.max_save = io_config['max_save']
+        disk_str = io_config['disk']
+
+        self.disk_map_stage = {}
+        self.window_value = {}
+
+        self.loop_all = False
+
+        if disk_str == "default":
+            self.loop_all = True
+        else:
+            self.disk_list = disk_str.strip().split(',')
+
+        self.stop_event = threading.Event()
+
+        IO_CONFIG_DATA.append(self.period_time)
+        IO_CONFIG_DATA.append(self.max_save)
+
+    def get_blk_io_hierarchy(self, disk_name, stage_list):
+        stats_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/stats'.format(disk_name)
+        try:
+            with open(stats_file, 'r') as file:
+                lines = file.read()
+        except FileNotFoundError:
+            logging.error("The file %s does not exist", stats_file)
+            return -1
+        except Exception as e:
+            logging.error("An error occurred: %s", e)
+            return -1
+
+        curr_value = lines.strip().split('\n')
+
+        for stage_val in curr_value:
+            stage = stage_val.split(' ')[0]
+            if (len(self.window_value[disk_name][stage])) >= 2:
+                self.window_value[disk_name][stage].pop(0)
+
+            curr_stage_value = stage_val.split(' ')[1:-1]
+            self.window_value[disk_name][stage].append(curr_stage_value)
+        return 0
+
+    def append_period_lat(self, disk_name, stage_list):
+        for stage in stage_list:
+            if len(self.window_value[disk_name][stage]) < 2:
+                return
+            curr_stage_value = self.window_value[disk_name][stage][-1]
+            last_stage_value = self.window_value[disk_name][stage][-2]
+
+            for index in range(len(Io_Category)):
+                # read=0, write=1, flush=2, discard=3
+                if (len(IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]])) >= self.max_save:
+                    IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].pop()
+
+                curr_lat = self.get_latency_value(curr_stage_value, last_stage_value, index)
+                curr_iops = self.get_iops(curr_stage_value, last_stage_value, index)
+                curr_io_length = self.get_io_length(curr_stage_value, last_stage_value, index)
+                curr_io_dump = self.get_io_dump(disk_name, stage, index)
+
+                IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].insert(0, [curr_lat, curr_io_dump, curr_io_length, curr_iops])
+
+    def get_iops(self, curr_stage_value, last_stage_value, category):
+        try:
+            finish = int(curr_stage_value[category * 3 + IoStatus.FINISH]) - int(last_stage_value[category * 3 + IoStatus.FINISH])
+        except ValueError as e:
+            logging.error("get_iops convert to int failed, %s", e)
+            return 0
+        value = finish / self.period_time
+        if value.is_integer():
+            return int(value)
+        else:
+            return round(value, 1)
+
+    def get_latency_value(self, curr_stage_value, last_stage_value, category):
+        try:
+            finish = int(curr_stage_value[category * 3 + IoStatus.FINISH]) - int(last_stage_value[category * 3 + IoStatus.FINISH])
+            lat_time = (int(curr_stage_value[category * 3 + IoStatus.LATENCY]) - int(last_stage_value[category * 3 + IoStatus.LATENCY]))
+        except ValueError as e:
+            logging.error("get_latency_value convert to int failed, %s", e)
+            return 0
+        if finish <= 0 or lat_time <= 0:
+            return 0
+        value = lat_time / finish / 1000 / 1000
+        if value.is_integer():
+            return int(value)
+        else:
+            return round(value, 1)
+
+    def get_io_length(self, curr_stage_value, last_stage_value, category):
+        try:
+            finish = int(curr_stage_value[category * 3 + IoStatus.FINISH]) - int(last_stage_value[category * 3 + IoStatus.FINISH])
+        except ValueError as e:
+            logging.error("get_io_length convert to int failed, %s", e)
+            return 0
+        value = finish / self.period_time / 1000 / 1000
+        if value.is_integer():
+            return int(value)
+        else:
+            return round(value, 1)
+
+    def get_io_dump(self, disk_name, stage, category):
+        io_dump_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/{}/io_dump'.format(disk_name, stage)
+        count = 0
+        try:
+            with open(io_dump_file, 'r') as file:
+                for line in file:
+                    count += line.count('.op=' + Io_Category[category])
+        except FileNotFoundError:
+            logging.error("The file %s does not exist.", io_dump_file)
+            return count
+        except Exception as e:
+            logging.error("An error occurred: %s", e)
+            return count
+        return count
+
+    def extract_first_column(self, file_path):
+        column_names = []
+        try:
+            with open(file_path, 'r') as file:
+                for line in file:
+                    parts = line.strip().split()
+                    if parts:
+                        column_names.append(parts[0])
+        except FileNotFoundError:
+            logging.error("The file %s does not exist.", file_path)
+        except Exception as e:
+            logging.error("An error occurred: %s", e)
+        return column_names
+
+    def task_loop(self):
+        if self.stop_event.is_set():
+            logging.info("collect io thread exit")
+            return
+
+        for disk_name, stage_list in self.disk_map_stage.items():
+            if self.get_blk_io_hierarchy(disk_name, stage_list) < 0:
+                continue
+            self.append_period_lat(disk_name, stage_list)
+
+        threading.Timer(self.period_time, self.task_loop).start()
+
+    def main_loop(self):
+        logging.info("collect io thread start")
+        base_path = '/sys/kernel/debug/block'
+        for disk_name in os.listdir(base_path):
+            if not self.loop_all and disk_name not in self.disk_list:
+                continue
+
+            disk_path = os.path.join(base_path, disk_name)
+            blk_io_hierarchy_path = os.path.join(disk_path, 'blk_io_hierarchy')
+
+            if not os.path.exists(blk_io_hierarchy_path):
+                logging.error("no blk_io_hierarchy directory found in %s, skipping.", disk_name)
+                continue
+
+            for file_name in os.listdir(blk_io_hierarchy_path):
+                file_path = os.path.join(blk_io_hierarchy_path, file_name)
+
+                if file_name == 'stats':
+                    stage_list = self.extract_first_column(file_path)
+                    self.disk_map_stage[disk_name] = stage_list
+                    self.window_value[disk_name] = {}
+                    IO_GLOBAL_DATA[disk_name] = {}
+
+        if len(self.disk_map_stage) == 0:
+            logging.warning("no disks meet the requirements. the thread exits")
+            return
+
+        for disk_name, stage_list in self.disk_map_stage.items():
+            for stage in stage_list:
+                self.window_value[disk_name][stage] = []
+                IO_GLOBAL_DATA[disk_name][stage] = {}
+                for category in Io_Category:
+                    IO_GLOBAL_DATA[disk_name][stage][category] = []
+
+        while True:
+            start_time = time.time()
+
+            if self.stop_event.is_set():
+                logging.info("collect io thread exit")
+                return
+
+            for disk_name, stage_list in self.disk_map_stage.items():
+                if self.get_blk_io_hierarchy(disk_name, stage_list) < 0:
+                    continue
+                self.append_period_lat(disk_name, stage_list)
+
+            elapsed_time = time.time() - start_time
+            sleep_time = self.period_time - elapsed_time
+            if sleep_time < 0:
+                continue
+            while sleep_time > 1:
+                if self.stop_event.is_set():
+                    logging.info("collect io thread exit")
+                    return
+                time.sleep(1)
+                sleep_time -= 1
+            time.sleep(sleep_time)
+
+    # set stop event, notify thread exit
+    def stop_thread(self):
+        logging.info("collect io thread is preparing to exit")
+        self.stop_event.set()
diff --git a/src/python/sentryCollector/collect_plugin.py b/src/python/sentryCollector/collect_plugin.py
new file mode 100644
index 0000000..49ce0a8
--- /dev/null
+++ b/src/python/sentryCollector/collect_plugin.py
@@ -0,0 +1,276 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+collect plugin
+"""
+import json
+import socket
+import logging
+import re
+
+COLLECT_SOCKET_PATH = "/var/run/sysSentry/collector.sock"
+
+# data length param
+CLT_MSG_HEAD_LEN = 9  # 3+2+4
+CLT_MSG_PRO_LEN = 2
+CLT_MSG_MAGIC_LEN = 3
+CLT_MSG_LEN_LEN = 4
+
+CLT_MAGIC = "CLT"
+RES_MAGIC = "RES"
+
+# disk limit
+LIMIT_DISK_CHAR_LEN = 32
+LIMIT_DISK_LIST_LEN = 10
+
+# stage limit
+LIMIT_STAGE_CHAR_LEN = 20
+LIMIT_STAGE_LIST_LEN = 15
+
+# iotype limit
+LIMIT_IOTYPE_CHAR_LEN = 7
+LIMIT_IOTYPE_LIST_LEN = 4
+
+# period limit
+LIMIT_PERIOD_MIN_LEN = 1
+LIMIT_PERIOD_MAX_LEN = 300
+
+# interface protocol
+class ClientProtocol():
+    IS_IOCOLLECT_VALID = 0
+    GET_IO_DATA = 1
+    PRO_END = 3
+
+class ResultMessage():
+    RESULT_SUCCEED = 0
+    RESULT_UNKNOWN = 1  # unknown error
+    RESULT_NOT_PARAM = 2  # the parameter does not exist or the type does not match.
+    RESULT_INVALID_LENGTH = 3  # invalid parameter length.
+    RESULT_EXCEED_LIMIT = 4  # the parameter length exceeds the limit.
+    RESULT_PARSE_FAILED = 5  # parse failed
+    RESULT_INVALID_CHAR = 6  # invalid char
+
+Result_Messages = {
+    ResultMessage.RESULT_SUCCEED: "Succeed",
+    ResultMessage.RESULT_UNKNOWN: "Unknown error",
+    ResultMessage.RESULT_NOT_PARAM: "The parameter does not exist or the type does not match",
+    ResultMessage.RESULT_INVALID_LENGTH: "Invalid parameter length",
+    ResultMessage.RESULT_EXCEED_LIMIT: "The parameter length exceeds the limit",
+    ResultMessage.RESULT_PARSE_FAILED: "Parse failed",
+    ResultMessage.RESULT_INVALID_CHAR: "Invalid char"
+}
+
+
+def client_send_and_recv(request_data, data_str_len, protocol):
+    """client socket send and recv message"""
+    try:
+        client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    except socket.error:
+        print("collect_plugin: client create socket error")
+        return None
+
+    try:
+        client_socket.connect(COLLECT_SOCKET_PATH)
+    except OSError:
+        client_socket.close()
+        print("collect_plugin: client connect error")
+        return None
+
+    req_data_len = len(request_data)
+    request_msg = CLT_MAGIC + str(protocol).zfill(CLT_MSG_PRO_LEN) + str(req_data_len).zfill(CLT_MSG_LEN_LEN) + request_data
+
+    try:
+        client_socket.send(request_msg.encode())
+        res_data = client_socket.recv(len(RES_MAGIC) + CLT_MSG_PRO_LEN + data_str_len)
+        res_data = res_data.decode()
+    except (OSError, UnicodeError):
+        client_socket.close()
+        print("collect_plugin: client communicate error")
+        return None
+
+    res_magic = res_data[:CLT_MSG_MAGIC_LEN]
+    if res_magic != "RES":
+        print("res msg format error")
+        return None
+
+    protocol_str = res_data[CLT_MSG_MAGIC_LEN:CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN]
+    try:
+        protocol_id = int(protocol_str)
+    except ValueError:
+        print("recv msg protocol id is invalid", protocol_str)
+        return None
+
+    if protocol_id >= ClientProtocol.PRO_END:
+        print("protocol id is invalid")
+        return None
+
+    try:
+        res_data_len = int(res_data[CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN:])
+        res_msg_data = client_socket.recv(res_data_len)
+        res_msg_data = res_msg_data.decode()
+        return res_msg_data
+    except (OSError, ValueError, UnicodeError):
+        print("collect_plugin: client recv res msg error")
+    finally:
+        client_socket.close()
+
+    return None
+
+def validate_parameters(param, len_limit, char_limit):
+    ret = ResultMessage.RESULT_SUCCEED
+    if not param:
+        print("param is invalid")
+        ret = ResultMessage.RESULT_NOT_PARAM
+        return [False, ret]
+
+    if not isinstance(param, list):
+        print(f"{param} is not list type.")
+        ret = ResultMessage.RESULT_NOT_PARAM
+        return [False, ret]
+
+    if len(param) <= 0:
+        print(f"{param} length is 0.")
+        ret = ResultMessage.RESULT_INVALID_LENGTH
+        return [False, ret]
+
+    if len(param) > len_limit:
+        print(f"{param} length more than {len_limit}")
+        ret = ResultMessage.RESULT_EXCEED_LIMIT
+        return [False, ret]
+
+    pattern = r'^[a-zA-Z0-9_-]+$'
+    for info in param:
+        if len(info) > char_limit:
+            print(f"{info} length more than {char_limit}")
+            ret = ResultMessage.RESULT_EXCEED_LIMIT
+            return [False, ret]
+        if not re.match(pattern, info):
+            print(f"{info} is invalid char")
+            ret = ResultMessage.RESULT_INVALID_CHAR
+            return [False, ret]
+
+    return [True, ret]
+
+def is_iocollect_valid(period, disk_list=None, stage=None):
+    result = inter_is_iocollect_valid(period, disk_list, stage)
+    error_code = result['ret']
+    if error_code != ResultMessage.RESULT_SUCCEED:
+        result['message'] = Result_Messages[error_code]
+    return result
+
+def inter_is_iocollect_valid(period, disk_list=None, stage=None):
+    result = {}
+    result['ret'] = ResultMessage.RESULT_UNKNOWN
+    result['message'] = ""
+
+    if not period or not isinstance(period, int):
+        result['ret'] = ResultMessage.RESULT_NOT_PARAM
+        return result
+    if period < LIMIT_PERIOD_MIN_LEN or period > LIMIT_PERIOD_MAX_LEN:
+        result['ret'] = ResultMessage.RESULT_INVALID_LENGTH
+        return result
+
+    if not disk_list:
+        disk_list = []
+    else:
+        res = validate_parameters(disk_list, LIMIT_DISK_LIST_LEN, LIMIT_DISK_CHAR_LEN)
+        if not res[0]:
+            result['ret'] = res[1]
+            return result
+
+    if not stage:
+        stage = []
+    else:
+        res = validate_parameters(stage, LIMIT_STAGE_LIST_LEN, LIMIT_STAGE_CHAR_LEN)
+        if not res[0]:
+            result['ret'] = res[1]
+            return result
+
+    req_msg_struct = {
+        'disk_list': json.dumps(disk_list),
+        'period': period,
+        'stage': json.dumps(stage)
+    }
+    request_message = json.dumps(req_msg_struct)
+    result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, ClientProtocol.IS_IOCOLLECT_VALID)
+    if not result_message:
+        print("collect_plugin: client_send_and_recv failed")
+        return result
+
+    try:
+        json.loads(result_message)
+    except json.JSONDecodeError:
+        print("is_iocollect_valid: json decode error")
+        result['ret'] = ResultMessage.RESULT_PARSE_FAILED
+        return result
+
+    result['ret'] = ResultMessage.RESULT_SUCCEED
+    result['message'] = result_message
+    return result
+
+def get_io_data(period, disk_list, stage, iotype):
+    result = inter_get_io_data(period, disk_list, stage, iotype)
+    error_code = result['ret']
+    if error_code != ResultMessage.RESULT_SUCCEED:
+        result['message'] = Result_Messages[error_code]
+    return result
+
+def inter_get_io_data(period, disk_list, stage, iotype):
+    result = {}
+    result['ret'] = ResultMessage.RESULT_UNKNOWN
+    result['message'] = ""
+
+    if not isinstance(period, int):
+        result['ret'] = ResultMessage.RESULT_NOT_PARAM
+        return result
+    if period < LIMIT_PERIOD_MIN_LEN or period > LIMIT_PERIOD_MAX_LEN:
+        result['ret'] = ResultMessage.RESULT_INVALID_LENGTH
+        return result
+
+    res = validate_parameters(disk_list, LIMIT_DISK_LIST_LEN, LIMIT_DISK_CHAR_LEN)
+    if not res[0]:
+        result['ret'] = res[1]
+        return result
+
+    res = validate_parameters(stage, LIMIT_STAGE_LIST_LEN, LIMIT_STAGE_CHAR_LEN)
+    if not res[0]:
+        result['ret'] = res[1]
+        return result
+
+    res = validate_parameters(iotype, LIMIT_IOTYPE_LIST_LEN, LIMIT_IOTYPE_CHAR_LEN)
+    if not res[0]:
+        result['ret'] = res[1]
+        return result
+
+    req_msg_struct = {
+        'disk_list': json.dumps(disk_list),
+        'period': period,
+        'stage': json.dumps(stage),
+        'iotype' : json.dumps(iotype)
+    }
+
+    request_message = json.dumps(req_msg_struct)
+    result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, ClientProtocol.GET_IO_DATA)
+    if not result_message:
+        print("collect_plugin: client_send_and_recv failed")
+        return result
+    try:
+        json.loads(result_message)
+    except json.JSONDecodeError:
+        print("get_io_data: json decode error")
+        result['ret'] = ResultMessage.RESULT_PARSE_FAILED
+        return result
+
+    result['ret'] = ResultMessage.RESULT_SUCCEED
+    result['message'] = result_message
+    return result
+
diff --git a/src/python/sentryCollector/collect_server.py b/src/python/sentryCollector/collect_server.py
new file mode 100644
index 0000000..fa49781
--- /dev/null
+++ b/src/python/sentryCollector/collect_server.py
@@ -0,0 +1,285 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+listen module
+"""
+import sys
+import signal
+import traceback
+import socket
+import os
+import json
+import logging
+import fcntl
+import select
+import threading
+import time
+
+from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA
+from .collect_config import CollectConfig
+
+SENTRY_RUN_DIR = "/var/run/sysSentry"
+COLLECT_SOCKET_PATH = "/var/run/sysSentry/collector.sock"
+
+# socket param
+CLT_LISTEN_QUEUE_LEN = 5
+SERVER_EPOLL_TIMEOUT = 0.3
+
+# data length param
+CLT_MSG_HEAD_LEN = 9  # 3+2+4
+CLT_MSG_PRO_LEN = 2
+CLT_MSG_MAGIC_LEN = 3
+CLT_MSG_LEN_LEN = 4
+
+# data flag param
+CLT_MAGIC = "CLT"
+RES_MAGIC = "RES"
+
+# interface protocol
+class ServerProtocol():
+    IS_IOCOLLECT_VALID = 0
+    GET_IO_DATA = 1
+    PRO_END = 3
+
+class CollectServer():
+
+    def __init__(self):
+
+        self.io_global_data = {}
+
+        self.stop_event = threading.Event()
+
+    def is_iocollect_valid(self, data_struct):
+
+        result_rev = {}
+        self.io_global_data = IO_GLOBAL_DATA
+
+        if len(IO_CONFIG_DATA) == 0:
+            logging.error("the collect thread is not started, the data is invalid.")
+            return json.dumps(result_rev)
+
+        period_time = IO_CONFIG_DATA[0]
+        max_save = IO_CONFIG_DATA[1]
+
+        disk_list = json.loads(data_struct['disk_list'])
+        period = int(data_struct['period'])
+        stage_list = json.loads(data_struct['stage'])
+
+        if (period < period_time) or (period > period_time * max_save) or (period % period_time):
+            logging.error("is_iocollect_valid: period time: %d is invalid", period)
+            return json.dumps(result_rev)
+
+        for disk_name, stage_info in self.io_global_data.items():
+            if len(disk_list) > 0 and disk_name not in disk_list:
+                continue
+            result_rev[disk_name] = []
+            if len(stage_list) == 0:
+                result_rev[disk_name] = list(stage_info.keys())
+                continue
+            for stage_name, stage_data in stage_info.items():
+                if stage_name in stage_list:
+                    result_rev[disk_name].append(stage_name)
+
+        return json.dumps(result_rev)
+
+    def get_io_data(self, data_struct):
+        result_rev = {}
+        self.io_global_data = IO_GLOBAL_DATA
+
+        if len(IO_CONFIG_DATA) == 0:
+            logging.error("the collect thread is not started, the data is invalid.")
+            return json.dumps(result_rev)
+        period_time = IO_CONFIG_DATA[0]
+        max_save = IO_CONFIG_DATA[1]
+
+        period = int(data_struct['period'])
+        disk_list = json.loads(data_struct['disk_list'])
+        stage_list = json.loads(data_struct['stage'])
+        iotype_list = json.loads(data_struct['iotype'])
+
+        if (period < period_time) or (period > period_time * max_save) or (period % period_time):
+            logging.error("get_io_data: period time: %d is invalid", period)
+            return json.dumps(result_rev)
+
+        collect_index = period // period_time - 1
+        logging.debug("period: %d, collect_index: %d", period, collect_index)
+
+        for disk_name, stage_info in self.io_global_data.items():
+            if disk_name not in disk_list:
+                continue
+            result_rev[disk_name] = {}
+            for stage_name, iotype_info in stage_info.items():
+                if len(stage_list) > 0 and stage_name not in stage_list:
+                    continue
+                result_rev[disk_name][stage_name] = {}
+                for iotype_name, iotype_info in iotype_info.items():
+                    if iotype_name not in iotype_list:
+                        continue
+                    if len(iotype_info) < collect_index:
+                        continue
+                    result_rev[disk_name][stage_name][iotype_name] = iotype_info[collect_index]
+
+        return json.dumps(result_rev)
+
+    def msg_data_process(self, msg_data, protocal_id):
+        """message data process"""
+        logging.debug("msg_data %s", msg_data)
+        protocol_name = msg_data[0]
+        try:
+            data_struct = json.loads(msg_data)
+        except json.JSONDecodeError:
+            logging.error("msg data process: json decode error")
+            return "Request message decode failed"
+
+        if protocal_id == ServerProtocol.IS_IOCOLLECT_VALID:
+            res_msg = self.is_iocollect_valid(data_struct)
+        elif protocal_id == ServerProtocol.GET_IO_DATA:
+            res_msg = self.get_io_data(data_struct)
+
+        return res_msg
+
+    def msg_head_process(self, msg_head):
+        """message head process"""
+        ctl_magic = msg_head[:CLT_MSG_MAGIC_LEN]
+        if ctl_magic != CLT_MAGIC:
+            logging.error("recv msg head magic invalid")
+            return None
+
+        protocol_str = msg_head[CLT_MSG_MAGIC_LEN:CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN]
+        try:
+            protocol_id = int(protocol_str)
+        except ValueError:
+            logging.error("recv msg protocol id is invalid")
+            return None
+
+        data_len_str = msg_head[CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN:CLT_MSG_HEAD_LEN]
+        try:
+            data_len = int(data_len_str)
+        except ValueError:
+            logging.error("recv msg data len is invalid %s", data_len_str)
+            return None
+
+        return [protocol_id, data_len]
+
+    def server_recv(self, server_socket: socket.socket):
+        """server receive"""
+        try:
+            client_socket, _ = server_socket.accept()
+            logging.debug("server_fd listen ok")
+        except socket.error:
+            logging.error("server accept failed, %s", socket.error)
+            return
+
+        try:
+            msg_head = client_socket.recv(CLT_MSG_HEAD_LEN)
+            logging.debug("recv msg head: %s", msg_head.decode())
+            head_info = self.msg_head_process(msg_head.decode())
+        except (OSError, UnicodeError):
+            client_socket.close()
+            logging.error("server recv HEAD failed")
+            return
+
+        protocol_id = head_info[0]
+        data_len = head_info[1]
+        logging.debug("msg protocol id: %d, data length: %d", protocol_id, data_len)
+        if protocol_id >= ServerProtocol.PRO_END:
+            client_socket.close()
+            logging.error("protocol id is invalid")
+            return
+
+        if data_len < 0:
+            client_socket.close()
+            logging.error("msg head parse failed")
+            return
+
+        try:
+            msg_data = client_socket.recv(data_len)
+            msg_data_decode = msg_data.decode()
+            logging.debug("msg data %s", msg_data_decode)
+        except (OSError, UnicodeError):
+            client_socket.close()
+            logging.error("server recv MSG failed")
+            return
+
+        res_data = self.msg_data_process(msg_data_decode, protocol_id)
+        logging.debug("res data %s", res_data)
+
+        # server send
+        res_head = RES_MAGIC
+        res_head += str(protocol_id).zfill(CLT_MSG_PRO_LEN)
+        res_data_len = str(len(res_data)).zfill(CLT_MSG_LEN_LEN)
+        res_head += res_data_len
+        logging.debug("res head %s", res_head)
+
+        res_msg = res_head + res_data
+        logging.debug("res msg %s", res_msg)
+
+        try:
+            client_socket.send(res_msg.encode())
+        except OSError:
+            logging.error("server send failed")
+        finally:
+            client_socket.close()
+        return
+
+    def server_fd_create(self):
+        """create server fd"""
+        if not os.path.exists(SENTRY_RUN_DIR):
+            logging.error("%s does not exist, failed", SENTRY_RUN_DIR)
+            return None
+
+        try:
+            server_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            server_fd.setblocking(False)
+            if os.path.exists(COLLECT_SOCKET_PATH):
+                os.remove(COLLECT_SOCKET_PATH)
+
+            server_fd.bind(COLLECT_SOCKET_PATH)
+            os.chmod(COLLECT_SOCKET_PATH, 0o600)
+            server_fd.listen(CLT_LISTEN_QUEUE_LEN)
+            logging.debug("%s bind and listen", COLLECT_SOCKET_PATH)
+        except socket.error:
+            logging.error("server fd create failed")
+            server_fd = None
+
+        return server_fd
+
+
+    def server_loop(self):
+        """main loop"""
+        logging.info("collect server thread start")
+        server_fd = self.server_fd_create()
+        if not server_fd:
+            return
+
+        epoll_fd = select.epoll()
+        epoll_fd.register(server_fd.fileno(), select.EPOLLIN)
+
+        logging.debug("start server_loop loop")
+        while True:
+            if self.stop_event.is_set():
+                logging.info("collect server thread exit")
+                server_fd = None
+                return
+            try:
+                events_list = epoll_fd.poll(SERVER_EPOLL_TIMEOUT)
+                for event_fd, _ in events_list:
+                    if event_fd == server_fd.fileno():
+                        self.server_recv(server_fd)
+                    else:
+                        continue
+            except socket.error:
+                pass
+
+    def stop_thread(self):
+        logging.info("collect server thread is preparing to exit")
+        self.stop_event.set()
diff --git a/src/python/sentryCollector/collectd.py b/src/python/sentryCollector/collectd.py
new file mode 100644
index 0000000..b77c642
--- /dev/null
+++ b/src/python/sentryCollector/collectd.py
@@ -0,0 +1,99 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+main loop for collect.
+"""
+import sys
+import signal
+import traceback
+import socket
+import os
+import json
+import logging
+import fcntl
+import select
+
+import threading
+
+from .collect_io import CollectIo
+from .collect_server import CollectServer
+from .collect_config import CollectConfig
+
+SENTRY_RUN_DIR = "/var/run/sysSentry"
+COLLECT_SOCKET_PATH = "/var/run/sysSentry/collector.sock"
+SENTRY_RUN_DIR_PERM = 0o750
+
+COLLECT_LOG_FILE = "/var/log/sysSentry/collector.log"
+Thread_List = []
+Module_Map_Class = {"io" : CollectIo}
+
+def remove_sock_file():
+    try:
+        os.unlink(COLLECT_SOCKET_PATH)
+    except FileNotFoundError:
+        pass
+
+def sig_handler(signum, _f):
+    if signum not in (signal.SIGINT, signal.SIGTERM):
+        return
+    for i in range(len(Thread_List)):
+        Thread_List[i][0].stop_thread()
+
+    remove_sock_file()
+    sys.exit(0)
+
+def main():
+    """main
+    """
+    if not os.path.exists(SENTRY_RUN_DIR):
+        os.mkdir(SENTRY_RUN_DIR)
+        os.chmod(SENTRY_RUN_DIR, mode=SENTRY_RUN_DIR_PERM)
+
+    logging.basicConfig(filename=COLLECT_LOG_FILE, level=logging.INFO)
+    os.chmod(COLLECT_LOG_FILE, 0o600)
+
+    try:
+        signal.signal(signal.SIGINT, sig_handler)
+        signal.signal(signal.SIGTERM, sig_handler)
+        signal.signal(signal.SIGHUP, sig_handler)
+
+        logging.info("finish main parse_args")
+
+        module_config = CollectConfig()
+        module_list = module_config.modules
+
+        # listen thread
+        cs = CollectServer()
+        listen_thread = threading.Thread(target=cs.server_loop)
+        listen_thread.start()
+        Thread_List.append([cs, listen_thread])
+
+        # collect thread
+        for info in module_list:
+            class_name = Module_Map_Class.get(info)
+            if not class_name:
+                logging.info("%s corresponding class does not exist", info)
+                continue
+            cn = class_name(module_config)
+            collect_thread = threading.Thread(target=cn.main_loop)
+            collect_thread.start()
+            Thread_List.append([cn, collect_thread])
+
+        for i in range(len(Thread_List)):
+            Thread_List[i][1].join()
+
+    except Exception:
+        logging.error('%s', traceback.format_exc())
+    finally:
+        pass
+
+    logging.info("All threads have finished. Main thread is exiting.")
\ No newline at end of file
diff --git a/src/python/setup.py b/src/python/setup.py
index f96a96e..c28c691 100644
--- a/src/python/setup.py
+++ b/src/python/setup.py
@@ -31,7 +31,9 @@ setup(
         'console_scripts': [
             'cpu_sentry=syssentry.cpu_sentry:main',
             'syssentry=syssentry.syssentry:main',
-            'xalarmd=xalarm.xalarm_daemon:alarm_process_create'
+            'xalarmd=xalarm.xalarm_daemon:alarm_process_create',
+            'sentryCollector=sentryCollector.collectd:main',
+            'avg_block_io=sentryPlugins.avg_block_io.avg_block_io:main'
         ]
     },
 )
-- 
2.33.0

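Editor's note (not part of the patch above): a minimal usage sketch of the client
helpers that collect_plugin.py exports, assuming the sentryCollector service is
installed and running. The disk name, stage name, period and iotype values below
are illustrative assumptions, not values taken from the patch.

# hypothetical consumer of sentryCollector.collect_plugin
import json
from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, ResultMessage

PERIOD = 1                     # assumed equal to the collector's period_time
DISKS = ["sda"]                # illustrative disk name
STAGES = ["throtl"]            # illustrative blk_io_hierarchy stage
IOTYPES = ["read", "write"]    # subset of Io_Category

# Ask the collector which of the requested disks/stages it is tracking.
valid = is_iocollect_valid(PERIOD, DISKS, STAGES)
if valid['ret'] != ResultMessage.RESULT_SUCCEED:
    print("query rejected:", valid['message'])
else:
    print("tracked:", json.loads(valid['message']))

# Fetch the latest window; each entry is [latency, io_dump, io_length, iops].
data = get_io_data(PERIOD, DISKS, STAGES, IOTYPES)
if data['ret'] == ResultMessage.RESULT_SUCCEED:
    print(json.loads(data['message']))
else:
    print("get_io_data failed:", data['message'])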