From a18ea2e94fef78334a56dce1ea3f67ee649732f3 Mon Sep 17 00:00:00 2001 From: PshySimon Date: Thu, 26 Sep 2024 16:12:25 +0800 Subject: [PATCH] add pyxalarm and pySentryNotify, add multi users support for xalarmd and adapt libxalarm --- src/libso/xalarm/register_xalarm.c | 41 ++---- src/libso/xalarm/register_xalarm.h | 10 +- src/python/xalarm/register_xalarm.py | 192 +++++++++++++++++++++++++++ src/python/xalarm/sentry_notify.py | 71 ++++++++++ src/python/xalarm/xalarm_api.py | 18 ++- src/python/xalarm/xalarm_server.py | 40 +++++- src/python/xalarm/xalarm_transfer.py | 96 ++++++++++++-- 7 files changed, 408 insertions(+), 60 deletions(-) create mode 100644 src/python/xalarm/register_xalarm.py create mode 100644 src/python/xalarm/sentry_notify.py diff --git a/src/libso/xalarm/register_xalarm.c b/src/libso/xalarm/register_xalarm.c index 152c078..21a419f 100644 --- a/src/libso/xalarm/register_xalarm.c +++ b/src/libso/xalarm/register_xalarm.c @@ -35,7 +35,7 @@ #define ALARM_SOCKET_PERMISSION 0700 #define TIME_UNIT_MILLISECONDS 1000 -#define MAX_PARAS_LEN 511 +#define MAX_PARAS_LEN 1023 #define MIN_ALARM_ID 1001 #define MAX_ALARM_ID (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1) @@ -91,7 +91,7 @@ static int create_unix_socket(const char *path) return -1; } - fd = socket(AF_UNIX, SOCK_DGRAM, 0); + fd = socket(AF_UNIX, SOCK_STREAM, 0); if (fd < 0) { printf("socket failed:%s\n", strerror(errno)); return -1; @@ -103,14 +103,6 @@ static int create_unix_socket(const char *path) goto release_socket; } - if (access(PATH_REG_ALARM, F_OK) == 0) { - ret = unlink(PATH_REG_ALARM); - if (ret != 0) { - printf("unlink register socket file failed\n"); - goto release_socket; - } - } - if (access(DIR_XALARM, F_OK) == -1) { if (mkdir(DIR_XALARM, ALARM_DIR_PERMISSION) == -1) { printf("mkdir %s failed\n", DIR_XALARM); @@ -120,32 +112,22 @@ static int create_unix_socket(const char *path) if (memset(&alarm_addr, 0, sizeof(alarm_addr)) == NULL) { printf("create_unix_socket: memset alarm_addr failed, ret: %d\n", ret); - goto remove_dir; + goto release_socket; } alarm_addr.sun_family = AF_UNIX; strncpy(alarm_addr.sun_path, path, sizeof(alarm_addr.sun_path) - 1); - if (bind(fd, (struct sockaddr *)&alarm_addr, sizeof(alarm_addr.sun_family) + strlen(alarm_addr.sun_path)) < 0) { - printf("bind socket failed:%s\n", strerror(errno)); - goto remove_dir; + if (connect(fd, (struct sockaddr*)&alarm_addr, sizeof(alarm_addr)) == -1) { + printf("create_unix_socket: connect alarm_addr failed, ret: %d\n", ret); + goto release_socket; } if (chmod(path, ALARM_SOCKET_PERMISSION) < 0) { printf("chmod %s failed: %s\n", path, strerror(errno)); - goto unlink_sockfile; + goto release_socket; } return fd; -unlink_sockfile: - ret = unlink(PATH_REG_ALARM); - if (ret != 0) { - printf("unlink register socket file failed\n"); - } -remove_dir: - ret = rmdir(DIR_XALARM); - if (ret != 0) { - printf("rmdir %s failed: %s\n", path, strerror(errno)); - } release_socket: (void)close(fd); @@ -271,8 +253,6 @@ int xalarm_Register(alarm_callback_func callback, struct alarm_subscription_info void xalarm_UnRegister(int client_id) { - int ret; - if (!g_register_info.is_registered) { printf("%s: alarm has not registered\n", __func__); return; @@ -292,10 +272,6 @@ void xalarm_UnRegister(int client_id) if (g_register_info.register_fd != -1) { (void)close(g_register_info.register_fd); g_register_info.register_fd = -1; - ret = unlink(PATH_REG_ALARM); - if (ret != 0) { - printf("%s: unlink register socket file failed\n", __func__); - } } memset(g_register_info.alarm_enable_bitmap, 0, MAX_NUM_OF_ALARM_ID * sizeof(char)); @@ -357,7 +333,7 @@ int xalarm_Report(unsigned short usAlarmId, unsigned char ucAlarmLevel, struct sockaddr_un alarm_addr; if ((usAlarmId < MIN_ALARM_ID || usAlarmId > MAX_ALARM_ID) || - (ucAlarmLevel < ALARM_LEVEL_FATAL || ucAlarmLevel > ALARM_LEVEL_DEBUG) || + (ucAlarmLevel < MINOR_ALM || ucAlarmLevel > CRITICAL_ALM) || (ucAlarmType < ALARM_TYPE_OCCUR || ucAlarmType > ALARM_TYPE_RECOVER)) { fprintf(stderr, "%s: alarm info invalid\n", __func__); return -1; @@ -666,3 +642,4 @@ int report_result(const char *task_name, enum RESULT_LEVEL result_level, const c return RETURE_CODE_SUCCESS; } + diff --git a/src/libso/xalarm/register_xalarm.h b/src/libso/xalarm/register_xalarm.h index 1f26c6a..fef9482 100644 --- a/src/libso/xalarm/register_xalarm.h +++ b/src/libso/xalarm/register_xalarm.h @@ -11,7 +11,7 @@ #include #include -#define ALARM_INFO_MAX_PARAS_LEN 512 +#define ALARM_INFO_MAX_PARAS_LEN 1024 #define MAX_STRERROR_SIZE 1024 #define MAX_ALARM_TYEPS 1024 #define MIN_ALARM_ID 1001 @@ -19,11 +19,9 @@ #define MEMORY_ALARM_ID 1001 -#define ALARM_LEVEL_FATAL 1 -#define ALARM_LEVEL_ERROR 2 -#define ALARM_LEVEL_WARNING 3 -#define ALARM_LEVEL_INFO 4 -#define ALARM_LEVEL_DEBUG 5 +#define MINOR_ALM 1 +#define MAJOR_ALM 2 +#define CRITICAL_ALM 3 #define ALARM_TYPE_OCCUR 1 #define ALARM_TYPE_RECOVER 2 diff --git a/src/python/xalarm/register_xalarm.py b/src/python/xalarm/register_xalarm.py new file mode 100644 index 0000000..e58343d --- /dev/null +++ b/src/python/xalarm/register_xalarm.py @@ -0,0 +1,192 @@ +import os +import sys +import socket +import logging +import threading +import time +import fcntl +import inspect +from struct import error as StructParseError + +from .xalarm_api import Xalarm, alarm_bin2stu + + +ALARM_REPORT_LEN = 1048 +MAX_NUM_OF_ALARM_ID=128 +MIN_ALARM_ID = 1001 +MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1) +DIR_XALARM = "/var/run/xalarm" +PATH_REG_ALARM = "/var/run/xalarm/alarm" +PATH_REPORT_ALARM = "/var/run/xalarm/report" +ALARM_DIR_PERMISSION = 0o0750 +ALARM_REG_SOCK_PERMISSION = 0o0700 +ALARM_SOCKET_PERMISSION = 0o0700 +TIME_UNIT_MILLISECONDS = 1000 +ALARM_REGISTER_INFO = None + + +class AlarmRegister: + def __init__(self, id_filter: list[bool], callback: callable): + self.id_filter = id_filter + self.callback = callback + self.socket = self.create_unix_socket() + self.is_registered = True + self.thread = threading.Thread(target=self.alarm_recv) + self.thread_should_stop = False + + def check_params(self) -> bool: + if (len(self.id_filter) != MAX_NUM_OF_ALARM_ID): + sys.stderr.write("check_params: invalid param id_filter\n") + return False + + sig = inspect.signature(self.callback) + if len(sig.parameters) != 1: + sys.stderr.write("check_params: invalid param callback\n") + return False + + if self.socket is None: + sys.stderr.write("check_params: scoket create failed\n") + return False + return True + + def set_id_filter(self, id_filter: list[bool]) -> bool: + if (len(id_filter) > MAX_NUM_OF_ALARM_ID): + sys.stderr.write("set_id_filter: invalid param id_filter\n") + return False + self.id_filter = id_filter + + def id_is_registered(self, alarm_id) -> bool: + if alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID: + return False + return self.id_filter[alarm_id - MIN_ALARM_ID] + + def put_alarm_info(self, alarm_info: Xalarm) -> None: + if not self.callback or not alarm_info: + return + if not self.id_is_registered(alarm_info.alarm_id): + return + self.callback(alarm_info) + + def create_unix_socket(self) -> socket.socket: + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.setblocking(False) + + if not os.access(DIR_XALARM, os.F_OK): + os.makedirs(DIR_XALARM) + os.chmod(DIR_XALARM, ALARM_DIR_PERMISSION) + + sock.connect(PATH_REG_ALARM) + return sock + except (IOError, OSError, FileNotFoundError) as e: + sock.close() + sys.stderr.write(f"create_unix_socket: create socket error:{e}\n") + return None + + def alarm_recv(self): + while not self.thread_should_stop: + try: + data = self.socket.recv(ALARM_REPORT_LEN) + if not data: + sys.stderr.write("connection closed by xalarmd, maybe connections reach max num or service stopped.\n") + self.thread_should_stop = True + break + if len(data) != ALARM_REPORT_LEN: + sys.stderr.write(f"server receive report msg length wrong {len(data)}\n") + continue + + alarm_info = alarm_bin2stu(data) + self.put_alarm_info(alarm_info) + except (BlockingIOError) as e: + time.sleep(0.1) + except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError): + sys.stderr.write("Connection closed by the server.\n") + self.thread_should_stop = True + except (ValueError, StructParseError, InterruptedError) as e: + sys.stderr.write(f"{e}\n") + except Exception as e: + sys.stderr.write(f"{e}\n") + self.thread_should_stop = True + + def start_thread(self) -> None: + self.thread.daemon = True + self.thread.start() + + def stop_thread(self) -> None: + self.thread_should_stop = True + self.thread.join() + self.socket.close() + + +def xalarm_register(callback: callable, id_filter: list[bool]) -> int: + global ALARM_REGISTER_INFO + + if ALARM_REGISTER_INFO is not None: + sys.stderr.write("xalarm_register: alarm has registered\n") + return -1 + + ALARM_REGISTER_INFO = AlarmRegister(id_filter, callback) + if not ALARM_REGISTER_INFO.check_params(): + return -1 + + ALARM_REGISTER_INFO.start_thread() + + return 0 + + +def xalarm_unregister(clientId: int) -> None: + global ALARM_REGISTER_INFO + if clientId < 0: + sys.stderr.write("xalarm_unregister: invalid client\n") + return + + if ALARM_REGISTER_INFO is None: + sys.stderr.write("xalarm_unregister: alarm has not registered\n") + return + + ALARM_REGISTER_INFO.stop_thread() + ALARM_REGISTER_INFO = None + + +def xalarm_upgrade(clientId: int, id_filter: list[bool]) -> None: + global ALARM_REGISTER_INFO + if clientId < 0: + sys.stderr.write("xalarm_unregister: invalid client\n") + return + if ALARM_REGISTER_INFO is None: + sys.stderr.write("xalarm_unregister: alarm has not registered\n") + return + ALARM_REGISTER_INFO.id_filter = id_filter + + +def xalarm_getid(alarm_info: Xalarm) -> int: + if not alarm_info: + return 0 + return alarm_info.alarm_id + + +def xalarm_getlevel(alarm_info: Xalarm) -> int: + if not alarm_info: + return 0 + return alarm_info.alarm_level + + +def xalarm_gettype(alarm_info: Xalarm) -> int: + if not alarm_info: + return 0 + return alarm_info.alarm_type + + +def xalarm_gettime(alarm_info: Xalarm) -> int: + if not alarm_info: + return 0 + return alarm_info.timetamp.tv_sec * TIME_UNIT_MILLISECONDS + alarm_info.timetamp.tv_usec / TIME_UNIT_MILLISECONDS + +def xalarm_getdesc(alarm_info: Xalarm) -> str: + if not alarm_info: + return None + try: + desc_str = alarm_info.msg1.rstrip(b'\x00').decode('utf-8') + except UnicodeError: + desc_str = None + return desc_str diff --git a/src/python/xalarm/sentry_notify.py b/src/python/xalarm/sentry_notify.py new file mode 100644 index 0000000..a19e5b3 --- /dev/null +++ b/src/python/xalarm/sentry_notify.py @@ -0,0 +1,71 @@ +import os +import sys +import time +import socket +from struct import error as StructParseError + +from .xalarm_api import alarm_stu2bin, Xalarm + +MAX_NUM_OF_ALARM_ID = 128 +MIN_ALARM_ID = 1001 +MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1) + +MINOR_ALM = 1 +MAJOR_ALM = 2 +CRITICAL_ALM = 3 + +ALARM_TYPE_OCCUR = 1 +ALARM_TYPE_RECOVER = 2 + +MAX_PUC_PARAS_LEN = 1024 + +DIR_XALARM = "/var/run/xalarm" +PATH_REPORT_ALARM = "/var/run/xalarm/report" +ALARM_DIR_PERMISSION = 0o750 +ALARM_SOCKET_PERMISSION = 0o700 + + +def check_params(alarm_id, alarm_level, alarm_type, puc_paras) -> bool: + if not os.path.exists(DIR_XALARM): + sys.stderr.write(f"check_params: {DIR_XALARM} not exist, failed") + return False + + if not os.path.exists(PATH_REPORT_ALARM): + sys.stderr.write(f"check_params: {PATH_REPORT_ALARM} not exist, failed") + return False + + if (alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID or + alarm_level < MINOR_ALM or alarm_level > CRITICAL_ALM or + alarm_type < ALARM_TYPE_OCCUR or alarm_type > ALARM_TYPE_RECOVER): + sys.stderr.write("check_params: alarm info invalid\n") + return False + + if len(puc_paras) >= MAX_PUC_PARAS_LEN: + sys.stderr.write(f"check_params: alarm msg should be less than {MAX_PUC_PARAS_LEN}\n") + return False + + return True + +def xalarm_report(alarm_id, alarm_level, alarm_type, puc_paras) -> bool: + if not check_params(alarm_id, alarm_level, alarm_type, puc_paras): + return False + + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + + current_time = time.time() + current_time_seconds = int(current_time) + current_microseconds = int((current_time - current_time_seconds) * 1_000_000) + alarm_info = Xalarm(alarm_id, alarm_type, alarm_level, + current_time_seconds, current_microseconds, puc_paras) + + sock.sendto(alarm_stu2bin(alarm_info), PATH_REPORT_ALARM) + except (FileNotFoundError, StructParseError, socket.error, OSError, UnicodeError) as e: + sys.stderr.write(f"check_params: error occurs when sending msg.{e}\n") + return False + finally: + sock.close() + + return True + + diff --git a/src/python/xalarm/xalarm_api.py b/src/python/xalarm/xalarm_api.py index 94d7638..99eabf5 100644 --- a/src/python/xalarm/xalarm_api.py +++ b/src/python/xalarm/xalarm_api.py @@ -23,6 +23,7 @@ ALARM_LEVELS = (1, 2, 3, 4, 5) ALARM_SOCK_PATH = "/var/run/xalarm/report" MIN_ALARM_ID = 1001 MAX_ALARM_ID = 1128 +MAX_MSG_LEN = 1024 @dataclasses.dataclass @@ -97,15 +98,15 @@ class Xalarm: def msg1(self, msg): """msg1 setter """ - if len(msg) > 512: - raise ValueError("msg1 length must below 255") + if len(msg) > MAX_MSG_LEN: + raise ValueError(f"msg1 length must below {MAX_MSG_LEN}") self._msg1 = msg def alarm_bin2stu(bin_data): """alarm binary to struct """ - struct_data = struct.unpack("@HBBll512s", bin_data) + struct_data = struct.unpack(f"@HBBll{MAX_MSG_LEN}s", bin_data) alarm_info = Xalarm(1001, 2, 1, 0, 0, "") alarm_info.alarm_id = struct_data[0] @@ -116,3 +117,14 @@ def alarm_bin2stu(bin_data): alarm_info.msg1 = struct_data[5] return alarm_info + + +def alarm_stu2bin(alarm_info: Xalarm): + return struct.pack( + f'@HBBll{MAX_MSG_LEN}s', + alarm_info.alarm_id, + alarm_info.alarm_level, + alarm_info.alarm_type, + alarm_info.timetamp.tv_sec, + alarm_info.timetamp.tv_usec, + alarm_info.msg1.encode('utf-8')) diff --git a/src/python/xalarm/xalarm_server.py b/src/python/xalarm/xalarm_server.py index 84db273..fcaf393 100644 --- a/src/python/xalarm/xalarm_server.py +++ b/src/python/xalarm/xalarm_server.py @@ -17,16 +17,20 @@ Create: 2023-11-02 import socket import os import logging +import select +import threading from struct import error as StructParseError from .xalarm_api import alarm_bin2stu -from .xalarm_transfer import check_filter, transmit_alarm +from .xalarm_transfer import check_filter, transmit_alarm, wait_for_connection ALARM_DIR = "/var/run/xalarm" +USER_RECV_SOCK = "/var/run/xalarm/alarm" SOCK_FILE = "/var/run/xalarm/report" -ALARM_REPORT_LEN = 536 +ALARM_REPORT_LEN = 1048 ALARM_DIR_PERMISSION = 0o750 +ALARM_LISTEN_QUEUE_LEN = 5 def clear_sock_path(): @@ -37,6 +41,8 @@ def clear_sock_path(): os.chmod(ALARM_DIR, ALARM_DIR_PERMISSION) if os.path.exists(SOCK_FILE): os.unlink(SOCK_FILE) + if os.path.exists(USER_RECV_SOCK): + os.unlink(USER_RECV_SOCK) def server_loop(alarm_config): @@ -49,6 +55,21 @@ def server_loop(alarm_config): sock.bind(SOCK_FILE) os.chmod(SOCK_FILE, 0o600) + alarm_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + alarm_sock.bind(USER_RECV_SOCK) + os.chmod(USER_RECV_SOCK, 0o600) + alarm_sock.listen(ALARM_LISTEN_QUEUE_LEN) + alarm_sock.setblocking(False) + + epoll = select.epoll() + epoll.register(alarm_sock.fileno(), select.EPOLLIN) + fd_to_socket = {alarm_sock.fileno(): alarm_sock,} + thread_should_stop = False + + thread = threading.Thread(target=wait_for_connection, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop)) + thread.daemon = True + thread.start() + while True: try: data, _ = sock.recvfrom(ALARM_REPORT_LEN) @@ -58,14 +79,21 @@ def server_loop(alarm_config): logging.debug("server receive report msg length wrong %d", len(data)) continue - alarm_info = alarm_bin2stu(data) logging.debug("server bin2stu msg") if not check_filter(alarm_info, alarm_config): continue + transmit_alarm(alarm_sock, epoll, fd_to_socket, data) + except Exception as e: + logging.error(f"Error server:{e}") + + thread_should_stop = True + thread.join() - transmit_alarm(data) - except (ValueError, StructParseError): - pass + epoll.unregister(alarm_sock.fileno()) + epoll.close() + alarm_sock.close() + os.unlink(USER_RECV_SOCK) sock.close() + diff --git a/src/python/xalarm/xalarm_transfer.py b/src/python/xalarm/xalarm_transfer.py index b590b43..42137d8 100644 --- a/src/python/xalarm/xalarm_transfer.py +++ b/src/python/xalarm/xalarm_transfer.py @@ -16,10 +16,12 @@ Create: 2023-11-02 import socket import logging +import select -USER_RECV_SOCK = "/var/run/xalarm/alarm" MIN_ID_NUMBER = 1001 MAX_ID_NUMBER = 1128 +MAX_CONNECTION_NUM = 100 +TEST_CONNECT_BUFFER_SIZE = 32 def check_filter(alarm_info, alarm_filter): @@ -35,16 +37,84 @@ def check_filter(alarm_info, alarm_filter): return True -def transmit_alarm(bin_data): - """forward alarm message +def cleanup_closed_connections(server_sock, epoll, fd_to_socket): """ - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - try: - sock.sendto(bin_data, USER_RECV_SOCK) - logging.debug("transfer alarm success") - except ConnectionRefusedError: - logging.debug("transfer sendto failed") - except FileNotFoundError: - logging.debug("transfer sendto failed") - finally: - sock.close() + clean invalid client socket connections saved in 'fd_to_socket' + :param server_sock: server socket instance of alarm + :param epoll: epoll instance, used to unregister invalid client connections + :param fd_to_socket: dict instance, used to hold client connections and server connections + """ + to_remove = [] + for fileno, connection in fd_to_socket.items(): + if connection is server_sock: + continue + try: + # test whether connection still alive, use MSG_DONTWAIT to avoid blocking thread + # use MSG_PEEK to avoid consuming buffer data + data = connection.recv(TEST_CONNECT_BUFFER_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK) + if not data: + to_remove.append(fileno) + except BlockingIOError: + pass + except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError): + to_remove.append(fileno) + + for fileno in to_remove: + epoll.unregister(fileno) + fd_to_socket[fileno].close() + del fd_to_socket[fileno] + logging.info(f"cleaned up connection {fileno} for client lost connection.") + + +def wait_for_connection(server_sock, epoll, fd_to_socket, thread_should_stop): + """ + thread function for catch and save client connection + :param server_sock: server socket instance of alarm + :param epoll: epoll instance, used to unregister invalid client connections + :param fd_to_socket: dict instance, used to hold client connections and server connections + :param thread_should_stop: bool instance + """ + while not thread_should_stop: + try: + events = epoll.poll(1) + + for fileno, event in events: + if fileno == server_sock.fileno(): + connection, client_address = server_sock.accept() + # if reach max connection, cleanup closed connections + if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM: + cleanup_closed_connections(server_sock, epoll, fd_to_socket) + # if connections still reach max num, close this connection automatically + if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM: + logging.info(f"connection reach max num of {MAX_CONNECTION_NUM}, closed current connection!") + connection.close() + continue + epoll.register(connection.fileno(), select.EPOLLOUT) + fd_to_socket[connection.fileno()] = connection + except socket.error as e: + logging.debug(f"socket error, reason is {e}") + break + except (KeyError, OSError, ValueError) as e: + logging.debug(f"wait for connection failed {e}") + + +def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data): + """ + this function is to broadcast alarm data to client, if fail to send data, remove connections held by fd_to_socket + :param server_sock: server socket instance of alarm + :param epoll: epoll instance, used to unregister invalid client connections + :param fd_to_socket: dict instance, used to hold client connections and server connections + :param bin_data: binary instance, alarm info data in C-style struct format defined in xalarm_api.py + """ + to_remove = [] + for fileno, connection in fd_to_socket.items(): + if connection is not server_sock: + try: + connection.sendall(bin_data) + except (BrokenPipeError, ConnectionResetError): + to_remove.append(fileno) + for fileno in to_remove: + epoll.unregister(fileno) + fd_to_socket[fileno].close() + del fd_to_socket[fileno] + -- 2.27.0