!46 新增pyxalarm和pySentryNotify库,xalarmd新增对多用户的支持

From: @pshysimon 
Reviewed-by: @hubin95 
Signed-off-by: @hubin95
This commit is contained in:
openeuler-ci-bot 2024-10-08 11:33:07 +00:00 committed by Gitee
commit 7813223246
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 712 additions and 2 deletions

View File

@ -0,0 +1,678 @@
From a18ea2e94fef78334a56dce1ea3f67ee649732f3 Mon Sep 17 00:00:00 2001
From: PshySimon <caixiaomeng2@huawei.com>
Date: Thu, 26 Sep 2024 16:12:25 +0800
Subject: [PATCH] add pyxalarm and pySentryNotify, add multi users support for
xalarmd and adapt libxalarm
---
src/libso/xalarm/register_xalarm.c | 41 ++----
src/libso/xalarm/register_xalarm.h | 10 +-
src/python/xalarm/register_xalarm.py | 192 +++++++++++++++++++++++++++
src/python/xalarm/sentry_notify.py | 71 ++++++++++
src/python/xalarm/xalarm_api.py | 18 ++-
src/python/xalarm/xalarm_server.py | 40 +++++-
src/python/xalarm/xalarm_transfer.py | 96 ++++++++++++--
7 files changed, 408 insertions(+), 60 deletions(-)
create mode 100644 src/python/xalarm/register_xalarm.py
create mode 100644 src/python/xalarm/sentry_notify.py
diff --git a/src/libso/xalarm/register_xalarm.c b/src/libso/xalarm/register_xalarm.c
index 152c078..21a419f 100644
--- a/src/libso/xalarm/register_xalarm.c
+++ b/src/libso/xalarm/register_xalarm.c
@@ -35,7 +35,7 @@
#define ALARM_SOCKET_PERMISSION 0700
#define TIME_UNIT_MILLISECONDS 1000
-#define MAX_PARAS_LEN 511
+#define MAX_PARAS_LEN 1023
#define MIN_ALARM_ID 1001
#define MAX_ALARM_ID (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1)
@@ -91,7 +91,7 @@ static int create_unix_socket(const char *path)
return -1;
}
- fd = socket(AF_UNIX, SOCK_DGRAM, 0);
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) {
printf("socket failed:%s\n", strerror(errno));
return -1;
@@ -103,14 +103,6 @@ static int create_unix_socket(const char *path)
goto release_socket;
}
- if (access(PATH_REG_ALARM, F_OK) == 0) {
- ret = unlink(PATH_REG_ALARM);
- if (ret != 0) {
- printf("unlink register socket file failed\n");
- goto release_socket;
- }
- }
-
if (access(DIR_XALARM, F_OK) == -1) {
if (mkdir(DIR_XALARM, ALARM_DIR_PERMISSION) == -1) {
printf("mkdir %s failed\n", DIR_XALARM);
@@ -120,32 +112,22 @@ static int create_unix_socket(const char *path)
if (memset(&alarm_addr, 0, sizeof(alarm_addr)) == NULL) {
printf("create_unix_socket: memset alarm_addr failed, ret: %d\n", ret);
- goto remove_dir;
+ goto release_socket;
}
alarm_addr.sun_family = AF_UNIX;
strncpy(alarm_addr.sun_path, path, sizeof(alarm_addr.sun_path) - 1);
- if (bind(fd, (struct sockaddr *)&alarm_addr, sizeof(alarm_addr.sun_family) + strlen(alarm_addr.sun_path)) < 0) {
- printf("bind socket failed:%s\n", strerror(errno));
- goto remove_dir;
+ if (connect(fd, (struct sockaddr*)&alarm_addr, sizeof(alarm_addr)) == -1) {
+ printf("create_unix_socket: connect alarm_addr failed, ret: %d\n", ret);
+ goto release_socket;
}
if (chmod(path, ALARM_SOCKET_PERMISSION) < 0) {
printf("chmod %s failed: %s\n", path, strerror(errno));
- goto unlink_sockfile;
+ goto release_socket;
}
return fd;
-unlink_sockfile:
- ret = unlink(PATH_REG_ALARM);
- if (ret != 0) {
- printf("unlink register socket file failed\n");
- }
-remove_dir:
- ret = rmdir(DIR_XALARM);
- if (ret != 0) {
- printf("rmdir %s failed: %s\n", path, strerror(errno));
- }
release_socket:
(void)close(fd);
@@ -271,8 +253,6 @@ int xalarm_Register(alarm_callback_func callback, struct alarm_subscription_info
void xalarm_UnRegister(int client_id)
{
- int ret;
-
if (!g_register_info.is_registered) {
printf("%s: alarm has not registered\n", __func__);
return;
@@ -292,10 +272,6 @@ void xalarm_UnRegister(int client_id)
if (g_register_info.register_fd != -1) {
(void)close(g_register_info.register_fd);
g_register_info.register_fd = -1;
- ret = unlink(PATH_REG_ALARM);
- if (ret != 0) {
- printf("%s: unlink register socket file failed\n", __func__);
- }
}
memset(g_register_info.alarm_enable_bitmap, 0, MAX_NUM_OF_ALARM_ID * sizeof(char));
@@ -357,7 +333,7 @@ int xalarm_Report(unsigned short usAlarmId, unsigned char ucAlarmLevel,
struct sockaddr_un alarm_addr;
if ((usAlarmId < MIN_ALARM_ID || usAlarmId > MAX_ALARM_ID) ||
- (ucAlarmLevel < ALARM_LEVEL_FATAL || ucAlarmLevel > ALARM_LEVEL_DEBUG) ||
+ (ucAlarmLevel < MINOR_ALM || ucAlarmLevel > CRITICAL_ALM) ||
(ucAlarmType < ALARM_TYPE_OCCUR || ucAlarmType > ALARM_TYPE_RECOVER)) {
fprintf(stderr, "%s: alarm info invalid\n", __func__);
return -1;
@@ -666,3 +642,4 @@ int report_result(const char *task_name, enum RESULT_LEVEL result_level, const c
return RETURE_CODE_SUCCESS;
}
+
diff --git a/src/libso/xalarm/register_xalarm.h b/src/libso/xalarm/register_xalarm.h
index 1f26c6a..fef9482 100644
--- a/src/libso/xalarm/register_xalarm.h
+++ b/src/libso/xalarm/register_xalarm.h
@@ -11,7 +11,7 @@
#include <sys/time.h>
#include <stdbool.h>
-#define ALARM_INFO_MAX_PARAS_LEN 512
+#define ALARM_INFO_MAX_PARAS_LEN 1024
#define MAX_STRERROR_SIZE 1024
#define MAX_ALARM_TYEPS 1024
#define MIN_ALARM_ID 1001
@@ -19,11 +19,9 @@
#define MEMORY_ALARM_ID 1001
-#define ALARM_LEVEL_FATAL 1
-#define ALARM_LEVEL_ERROR 2
-#define ALARM_LEVEL_WARNING 3
-#define ALARM_LEVEL_INFO 4
-#define ALARM_LEVEL_DEBUG 5
+#define MINOR_ALM 1
+#define MAJOR_ALM 2
+#define CRITICAL_ALM 3
#define ALARM_TYPE_OCCUR 1
#define ALARM_TYPE_RECOVER 2
diff --git a/src/python/xalarm/register_xalarm.py b/src/python/xalarm/register_xalarm.py
new file mode 100644
index 0000000..e58343d
--- /dev/null
+++ b/src/python/xalarm/register_xalarm.py
@@ -0,0 +1,192 @@
+import os
+import sys
+import socket
+import logging
+import threading
+import time
+import fcntl
+import inspect
+from struct import error as StructParseError
+
+from .xalarm_api import Xalarm, alarm_bin2stu
+
+
+ALARM_REPORT_LEN = 1048
+MAX_NUM_OF_ALARM_ID=128
+MIN_ALARM_ID = 1001
+MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1)
+DIR_XALARM = "/var/run/xalarm"
+PATH_REG_ALARM = "/var/run/xalarm/alarm"
+PATH_REPORT_ALARM = "/var/run/xalarm/report"
+ALARM_DIR_PERMISSION = 0o0750
+ALARM_REG_SOCK_PERMISSION = 0o0700
+ALARM_SOCKET_PERMISSION = 0o0700
+TIME_UNIT_MILLISECONDS = 1000
+ALARM_REGISTER_INFO = None
+
+
+class AlarmRegister:
+ def __init__(self, id_filter: list[bool], callback: callable):
+ self.id_filter = id_filter
+ self.callback = callback
+ self.socket = self.create_unix_socket()
+ self.is_registered = True
+ self.thread = threading.Thread(target=self.alarm_recv)
+ self.thread_should_stop = False
+
+ def check_params(self) -> bool:
+ if (len(self.id_filter) != MAX_NUM_OF_ALARM_ID):
+ sys.stderr.write("check_params: invalid param id_filter\n")
+ return False
+
+ sig = inspect.signature(self.callback)
+ if len(sig.parameters) != 1:
+ sys.stderr.write("check_params: invalid param callback\n")
+ return False
+
+ if self.socket is None:
+ sys.stderr.write("check_params: scoket create failed\n")
+ return False
+ return True
+
+ def set_id_filter(self, id_filter: list[bool]) -> bool:
+ if (len(id_filter) > MAX_NUM_OF_ALARM_ID):
+ sys.stderr.write("set_id_filter: invalid param id_filter\n")
+ return False
+ self.id_filter = id_filter
+
+ def id_is_registered(self, alarm_id) -> bool:
+ if alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID:
+ return False
+ return self.id_filter[alarm_id - MIN_ALARM_ID]
+
+ def put_alarm_info(self, alarm_info: Xalarm) -> None:
+ if not self.callback or not alarm_info:
+ return
+ if not self.id_is_registered(alarm_info.alarm_id):
+ return
+ self.callback(alarm_info)
+
+ def create_unix_socket(self) -> socket.socket:
+ try:
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.setblocking(False)
+
+ if not os.access(DIR_XALARM, os.F_OK):
+ os.makedirs(DIR_XALARM)
+ os.chmod(DIR_XALARM, ALARM_DIR_PERMISSION)
+
+ sock.connect(PATH_REG_ALARM)
+ return sock
+ except (IOError, OSError, FileNotFoundError) as e:
+ sock.close()
+ sys.stderr.write(f"create_unix_socket: create socket error:{e}\n")
+ return None
+
+ def alarm_recv(self):
+ while not self.thread_should_stop:
+ try:
+ data = self.socket.recv(ALARM_REPORT_LEN)
+ if not data:
+ sys.stderr.write("connection closed by xalarmd, maybe connections reach max num or service stopped.\n")
+ self.thread_should_stop = True
+ break
+ if len(data) != ALARM_REPORT_LEN:
+ sys.stderr.write(f"server receive report msg length wrong {len(data)}\n")
+ continue
+
+ alarm_info = alarm_bin2stu(data)
+ self.put_alarm_info(alarm_info)
+ except (BlockingIOError) as e:
+ time.sleep(0.1)
+ except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError):
+ sys.stderr.write("Connection closed by the server.\n")
+ self.thread_should_stop = True
+ except (ValueError, StructParseError, InterruptedError) as e:
+ sys.stderr.write(f"{e}\n")
+ except Exception as e:
+ sys.stderr.write(f"{e}\n")
+ self.thread_should_stop = True
+
+ def start_thread(self) -> None:
+ self.thread.daemon = True
+ self.thread.start()
+
+ def stop_thread(self) -> None:
+ self.thread_should_stop = True
+ self.thread.join()
+ self.socket.close()
+
+
+def xalarm_register(callback: callable, id_filter: list[bool]) -> int:
+ global ALARM_REGISTER_INFO
+
+ if ALARM_REGISTER_INFO is not None:
+ sys.stderr.write("xalarm_register: alarm has registered\n")
+ return -1
+
+ ALARM_REGISTER_INFO = AlarmRegister(id_filter, callback)
+ if not ALARM_REGISTER_INFO.check_params():
+ return -1
+
+ ALARM_REGISTER_INFO.start_thread()
+
+ return 0
+
+
+def xalarm_unregister(clientId: int) -> None:
+ global ALARM_REGISTER_INFO
+ if clientId < 0:
+ sys.stderr.write("xalarm_unregister: invalid client\n")
+ return
+
+ if ALARM_REGISTER_INFO is None:
+ sys.stderr.write("xalarm_unregister: alarm has not registered\n")
+ return
+
+ ALARM_REGISTER_INFO.stop_thread()
+ ALARM_REGISTER_INFO = None
+
+
+def xalarm_upgrade(clientId: int, id_filter: list[bool]) -> None:
+ global ALARM_REGISTER_INFO
+ if clientId < 0:
+ sys.stderr.write("xalarm_unregister: invalid client\n")
+ return
+ if ALARM_REGISTER_INFO is None:
+ sys.stderr.write("xalarm_unregister: alarm has not registered\n")
+ return
+ ALARM_REGISTER_INFO.id_filter = id_filter
+
+
+def xalarm_getid(alarm_info: Xalarm) -> int:
+ if not alarm_info:
+ return 0
+ return alarm_info.alarm_id
+
+
+def xalarm_getlevel(alarm_info: Xalarm) -> int:
+ if not alarm_info:
+ return 0
+ return alarm_info.alarm_level
+
+
+def xalarm_gettype(alarm_info: Xalarm) -> int:
+ if not alarm_info:
+ return 0
+ return alarm_info.alarm_type
+
+
+def xalarm_gettime(alarm_info: Xalarm) -> int:
+ if not alarm_info:
+ return 0
+ return alarm_info.timetamp.tv_sec * TIME_UNIT_MILLISECONDS + alarm_info.timetamp.tv_usec / TIME_UNIT_MILLISECONDS
+
+def xalarm_getdesc(alarm_info: Xalarm) -> str:
+ if not alarm_info:
+ return None
+ try:
+ desc_str = alarm_info.msg1.rstrip(b'\x00').decode('utf-8')
+ except UnicodeError:
+ desc_str = None
+ return desc_str
diff --git a/src/python/xalarm/sentry_notify.py b/src/python/xalarm/sentry_notify.py
new file mode 100644
index 0000000..a19e5b3
--- /dev/null
+++ b/src/python/xalarm/sentry_notify.py
@@ -0,0 +1,71 @@
+import os
+import sys
+import time
+import socket
+from struct import error as StructParseError
+
+from .xalarm_api import alarm_stu2bin, Xalarm
+
+MAX_NUM_OF_ALARM_ID = 128
+MIN_ALARM_ID = 1001
+MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1)
+
+MINOR_ALM = 1
+MAJOR_ALM = 2
+CRITICAL_ALM = 3
+
+ALARM_TYPE_OCCUR = 1
+ALARM_TYPE_RECOVER = 2
+
+MAX_PUC_PARAS_LEN = 1024
+
+DIR_XALARM = "/var/run/xalarm"
+PATH_REPORT_ALARM = "/var/run/xalarm/report"
+ALARM_DIR_PERMISSION = 0o750
+ALARM_SOCKET_PERMISSION = 0o700
+
+
+def check_params(alarm_id, alarm_level, alarm_type, puc_paras) -> bool:
+ if not os.path.exists(DIR_XALARM):
+ sys.stderr.write(f"check_params: {DIR_XALARM} not exist, failed")
+ return False
+
+ if not os.path.exists(PATH_REPORT_ALARM):
+ sys.stderr.write(f"check_params: {PATH_REPORT_ALARM} not exist, failed")
+ return False
+
+ if (alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID or
+ alarm_level < MINOR_ALM or alarm_level > CRITICAL_ALM or
+ alarm_type < ALARM_TYPE_OCCUR or alarm_type > ALARM_TYPE_RECOVER):
+ sys.stderr.write("check_params: alarm info invalid\n")
+ return False
+
+ if len(puc_paras) >= MAX_PUC_PARAS_LEN:
+ sys.stderr.write(f"check_params: alarm msg should be less than {MAX_PUC_PARAS_LEN}\n")
+ return False
+
+ return True
+
+def xalarm_report(alarm_id, alarm_level, alarm_type, puc_paras) -> bool:
+ if not check_params(alarm_id, alarm_level, alarm_type, puc_paras):
+ return False
+
+ try:
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+
+ current_time = time.time()
+ current_time_seconds = int(current_time)
+ current_microseconds = int((current_time - current_time_seconds) * 1_000_000)
+ alarm_info = Xalarm(alarm_id, alarm_type, alarm_level,
+ current_time_seconds, current_microseconds, puc_paras)
+
+ sock.sendto(alarm_stu2bin(alarm_info), PATH_REPORT_ALARM)
+ except (FileNotFoundError, StructParseError, socket.error, OSError, UnicodeError) as e:
+ sys.stderr.write(f"check_params: error occurs when sending msg.{e}\n")
+ return False
+ finally:
+ sock.close()
+
+ return True
+
+
diff --git a/src/python/xalarm/xalarm_api.py b/src/python/xalarm/xalarm_api.py
index 94d7638..99eabf5 100644
--- a/src/python/xalarm/xalarm_api.py
+++ b/src/python/xalarm/xalarm_api.py
@@ -23,6 +23,7 @@ ALARM_LEVELS = (1, 2, 3, 4, 5)
ALARM_SOCK_PATH = "/var/run/xalarm/report"
MIN_ALARM_ID = 1001
MAX_ALARM_ID = 1128
+MAX_MSG_LEN = 1024
@dataclasses.dataclass
@@ -97,15 +98,15 @@ class Xalarm:
def msg1(self, msg):
"""msg1 setter
"""
- if len(msg) > 512:
- raise ValueError("msg1 length must below 255")
+ if len(msg) > MAX_MSG_LEN:
+ raise ValueError(f"msg1 length must below {MAX_MSG_LEN}")
self._msg1 = msg
def alarm_bin2stu(bin_data):
"""alarm binary to struct
"""
- struct_data = struct.unpack("@HBBll512s", bin_data)
+ struct_data = struct.unpack(f"@HBBll{MAX_MSG_LEN}s", bin_data)
alarm_info = Xalarm(1001, 2, 1, 0, 0, "")
alarm_info.alarm_id = struct_data[0]
@@ -116,3 +117,14 @@ def alarm_bin2stu(bin_data):
alarm_info.msg1 = struct_data[5]
return alarm_info
+
+
+def alarm_stu2bin(alarm_info: Xalarm):
+ return struct.pack(
+ f'@HBBll{MAX_MSG_LEN}s',
+ alarm_info.alarm_id,
+ alarm_info.alarm_level,
+ alarm_info.alarm_type,
+ alarm_info.timetamp.tv_sec,
+ alarm_info.timetamp.tv_usec,
+ alarm_info.msg1.encode('utf-8'))
diff --git a/src/python/xalarm/xalarm_server.py b/src/python/xalarm/xalarm_server.py
index 84db273..fcaf393 100644
--- a/src/python/xalarm/xalarm_server.py
+++ b/src/python/xalarm/xalarm_server.py
@@ -17,16 +17,20 @@ Create: 2023-11-02
import socket
import os
import logging
+import select
+import threading
from struct import error as StructParseError
from .xalarm_api import alarm_bin2stu
-from .xalarm_transfer import check_filter, transmit_alarm
+from .xalarm_transfer import check_filter, transmit_alarm, wait_for_connection
ALARM_DIR = "/var/run/xalarm"
+USER_RECV_SOCK = "/var/run/xalarm/alarm"
SOCK_FILE = "/var/run/xalarm/report"
-ALARM_REPORT_LEN = 536
+ALARM_REPORT_LEN = 1048
ALARM_DIR_PERMISSION = 0o750
+ALARM_LISTEN_QUEUE_LEN = 5
def clear_sock_path():
@@ -37,6 +41,8 @@ def clear_sock_path():
os.chmod(ALARM_DIR, ALARM_DIR_PERMISSION)
if os.path.exists(SOCK_FILE):
os.unlink(SOCK_FILE)
+ if os.path.exists(USER_RECV_SOCK):
+ os.unlink(USER_RECV_SOCK)
def server_loop(alarm_config):
@@ -49,6 +55,21 @@ def server_loop(alarm_config):
sock.bind(SOCK_FILE)
os.chmod(SOCK_FILE, 0o600)
+ alarm_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ alarm_sock.bind(USER_RECV_SOCK)
+ os.chmod(USER_RECV_SOCK, 0o600)
+ alarm_sock.listen(ALARM_LISTEN_QUEUE_LEN)
+ alarm_sock.setblocking(False)
+
+ epoll = select.epoll()
+ epoll.register(alarm_sock.fileno(), select.EPOLLIN)
+ fd_to_socket = {alarm_sock.fileno(): alarm_sock,}
+ thread_should_stop = False
+
+ thread = threading.Thread(target=wait_for_connection, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop))
+ thread.daemon = True
+ thread.start()
+
while True:
try:
data, _ = sock.recvfrom(ALARM_REPORT_LEN)
@@ -58,14 +79,21 @@ def server_loop(alarm_config):
logging.debug("server receive report msg length wrong %d",
len(data))
continue
-
alarm_info = alarm_bin2stu(data)
logging.debug("server bin2stu msg")
if not check_filter(alarm_info, alarm_config):
continue
+ transmit_alarm(alarm_sock, epoll, fd_to_socket, data)
+ except Exception as e:
+ logging.error(f"Error server:{e}")
+
+ thread_should_stop = True
+ thread.join()
- transmit_alarm(data)
- except (ValueError, StructParseError):
- pass
+ epoll.unregister(alarm_sock.fileno())
+ epoll.close()
+ alarm_sock.close()
+ os.unlink(USER_RECV_SOCK)
sock.close()
+
diff --git a/src/python/xalarm/xalarm_transfer.py b/src/python/xalarm/xalarm_transfer.py
index b590b43..42137d8 100644
--- a/src/python/xalarm/xalarm_transfer.py
+++ b/src/python/xalarm/xalarm_transfer.py
@@ -16,10 +16,12 @@ Create: 2023-11-02
import socket
import logging
+import select
-USER_RECV_SOCK = "/var/run/xalarm/alarm"
MIN_ID_NUMBER = 1001
MAX_ID_NUMBER = 1128
+MAX_CONNECTION_NUM = 100
+TEST_CONNECT_BUFFER_SIZE = 32
def check_filter(alarm_info, alarm_filter):
@@ -35,16 +37,84 @@ def check_filter(alarm_info, alarm_filter):
return True
-def transmit_alarm(bin_data):
- """forward alarm message
+def cleanup_closed_connections(server_sock, epoll, fd_to_socket):
"""
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
- try:
- sock.sendto(bin_data, USER_RECV_SOCK)
- logging.debug("transfer alarm success")
- except ConnectionRefusedError:
- logging.debug("transfer sendto failed")
- except FileNotFoundError:
- logging.debug("transfer sendto failed")
- finally:
- sock.close()
+ clean invalid client socket connections saved in 'fd_to_socket'
+ :param server_sock: server socket instance of alarm
+ :param epoll: epoll instance, used to unregister invalid client connections
+ :param fd_to_socket: dict instance, used to hold client connections and server connections
+ """
+ to_remove = []
+ for fileno, connection in fd_to_socket.items():
+ if connection is server_sock:
+ continue
+ try:
+ # test whether connection still alive, use MSG_DONTWAIT to avoid blocking thread
+ # use MSG_PEEK to avoid consuming buffer data
+ data = connection.recv(TEST_CONNECT_BUFFER_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK)
+ if not data:
+ to_remove.append(fileno)
+ except BlockingIOError:
+ pass
+ except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError):
+ to_remove.append(fileno)
+
+ for fileno in to_remove:
+ epoll.unregister(fileno)
+ fd_to_socket[fileno].close()
+ del fd_to_socket[fileno]
+ logging.info(f"cleaned up connection {fileno} for client lost connection.")
+
+
+def wait_for_connection(server_sock, epoll, fd_to_socket, thread_should_stop):
+ """
+ thread function for catch and save client connection
+ :param server_sock: server socket instance of alarm
+ :param epoll: epoll instance, used to unregister invalid client connections
+ :param fd_to_socket: dict instance, used to hold client connections and server connections
+ :param thread_should_stop: bool instance
+ """
+ while not thread_should_stop:
+ try:
+ events = epoll.poll(1)
+
+ for fileno, event in events:
+ if fileno == server_sock.fileno():
+ connection, client_address = server_sock.accept()
+ # if reach max connection, cleanup closed connections
+ if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM:
+ cleanup_closed_connections(server_sock, epoll, fd_to_socket)
+ # if connections still reach max num, close this connection automatically
+ if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM:
+ logging.info(f"connection reach max num of {MAX_CONNECTION_NUM}, closed current connection!")
+ connection.close()
+ continue
+ epoll.register(connection.fileno(), select.EPOLLOUT)
+ fd_to_socket[connection.fileno()] = connection
+ except socket.error as e:
+ logging.debug(f"socket error, reason is {e}")
+ break
+ except (KeyError, OSError, ValueError) as e:
+ logging.debug(f"wait for connection failed {e}")
+
+
+def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data):
+ """
+ this function is to broadcast alarm data to client, if fail to send data, remove connections held by fd_to_socket
+ :param server_sock: server socket instance of alarm
+ :param epoll: epoll instance, used to unregister invalid client connections
+ :param fd_to_socket: dict instance, used to hold client connections and server connections
+ :param bin_data: binary instance, alarm info data in C-style struct format defined in xalarm_api.py
+ """
+ to_remove = []
+ for fileno, connection in fd_to_socket.items():
+ if connection is not server_sock:
+ try:
+ connection.sendall(bin_data)
+ except (BrokenPipeError, ConnectionResetError):
+ to_remove.append(fileno)
+ for fileno in to_remove:
+ epoll.unregister(fileno)
+ fd_to_socket[fileno].close()
+ del fd_to_socket[fileno]
+
--
2.27.0

View File

@ -4,7 +4,7 @@
Summary: System Inspection Framework
Name: sysSentry
Version: 1.0.2
Release: 19
Release: 20
License: Mulan PSL v2
Group: System Environment/Daemons
Source0: https://gitee.com/openeuler/sysSentry/releases/download/v%{version}/%{name}-%{version}.tar.gz
@ -31,6 +31,7 @@ Patch18: over-threshold-should-be-warn-level-log-in-cat-cli.patch
Patch19: fix-bug-step-2-about-collect-module-and-avg-block-io.patch
Patch20: add-log-level-and-change-log-format.patch
Patch21: fix-ai_block_io-some-issues.patch
Patch22: add-pyxalarm-and-pySentryNotify-add-multi-users-supp.patch
BuildRequires: cmake gcc-c++
BuildRequires: python3 python3-setuptools
@ -82,6 +83,20 @@ Requires: sysSentry = %{version}-%{release}
%description -n ai_block_io
This package provides Supports slow I/O detection based on AI
%package -n pyxalarm
Summary: Supports xalarm api in python immplementation
Requires: sysSentry = %{version}-%{release}
%description -n pyxalarm
This package provides Supports xalarm api for users
%package -n pysentry_notify
Summary: Supports xalarm report in python immplementation
Requires: sysSentry = %{version}-%{release}
%description -n pysentry_notify
This package provides Supports xalarm report for plugins
%prep
%autosetup -n %{name}-%{version} -p1
@ -146,6 +161,8 @@ install config/plugins/ai_block_io.ini %{buildroot}/etc/sysSentry/plugins/ai_blo
pushd src/python
python3 setup.py install -O1 --root=$RPM_BUILD_ROOT --record=SENTRY_FILES
cat SENTRY_FILES | grep -v register_xalarm.* | grep -v sentry_notify.* > SENTRY_FILES.tmp
mv SENTRY_FILES.tmp SENTRY_FILES
popd
%pre
@ -173,7 +190,7 @@ rm -rf %{buildroot}
%files -f src/python/SENTRY_FILES
%defattr(0550,root,root)
%attr(0550,root,root) %{python3_sitelib}/xalarm
%dir %attr(0550,root,root) %{python3_sitelib}/xalarm
%attr(0550,root,root) %{python3_sitelib}/syssentry
%attr(0550,root,root) %{python3_sitelib}/sentryCollector
%attr(0550,root,root) %{python3_sitelib}/sentryPlugins/avg_block_io
@ -228,6 +245,14 @@ rm -rf %{buildroot}
%attr(0550,root,root) %{_includedir}/xalarm
%attr(0550,root,root) %{_includedir}/xalarm/register_xalarm.h
%files -n pyxalarm
%attr(0550,root,root) %{python3_sitelib}/xalarm/register_xalarm.py
%attr(0550,root,root) %{python3_sitelib}/xalarm/__pycache__/register_xalarm*
%files -n pysentry_notify
%attr(0550,root,root) %{python3_sitelib}/xalarm/sentry_notify.py
%attr(0550,root,root) %{python3_sitelib}/xalarm/__pycache__/sentry_notify*
%files -n cpu_sentry
%attr(0500,root,root) %{_bindir}/cat-cli
%attr(0500,root,root) %{_bindir}/cpu_sentry
@ -249,6 +274,12 @@ rm -rf %{buildroot}
%attr(0550,root,root) %{python3_sitelib}/sentryPlugins/ai_block_io
%changelog
* Tue Oct 8 2024 caixiaomeng <caixiaomeng2@huawei.com> - 1.0.2-20
- Type:bugfix
- CVE:NA
- SUG:NA
- DESC:add pyxalarm and pySentryNotify, add multi users support for xalarmd
* Mon Sep 30 2024 heyouzhi <heyouzhi@huawei.com> - 1.0.2-19
- Type:bugfix
- CVE:NA
@ -364,3 +395,4 @@ rm -rf %{buildroot}
- CVE:NA
- SUG:NA
- DESC:Package init