From fe19d0aadb694a61b6ca3552d79026b18e88d7e9 Mon Sep 17 00:00:00 2001
From: PshySimon
Date: Fri, 14 Feb 2025 11:57:58 +0800
Subject: [PATCH] add log for xalarmd and fix fd_to_socket modified during iteration

fd_to_socket is iterated by cleanup_closed_connections() and
transmit_alarm(), and modified by those functions as well as by
wait_for_connection(). When these run on different threads, an
insertion or deletion during an iteration can raise "RuntimeError:
dictionary changed size during iteration". Guard every iteration,
insertion and deletion of fd_to_socket with a module-level lock,
including the insertion in wait_for_connection(); locking only the
iterations would leave that race open.

Also log a one-line summary of every received and broadcast alarm
(alarm_stu2str()). Building the summary tolerates bytes or str message
bodies and out-of-range timestamps, so logging cannot abort the
handling of an alarm.
---
 src/services/xalarm/xalarm_api.py      | 53 +++++++++++++++++++++++++++++
 src/services/xalarm/xalarm_server.py   |  7 ++--
 src/services/xalarm/xalarm_transfer.py | 71 +++++++++++++++++++++++----------------
 3 files changed, 99 insertions(+), 32 deletions(-)

diff --git a/src/services/xalarm/xalarm_api.py b/src/services/xalarm/xalarm_api.py
index 863bd02..285608a 100644
--- a/src/services/xalarm/xalarm_api.py
+++ b/src/services/xalarm/xalarm_api.py
@@ -16,6 +16,7 @@ Create: 2023-11-02
 """
 import dataclasses
 import struct
+from datetime import datetime
 
 
 ALARM_TYPES = (0, 1, 2)
@@ -24,6 +25,17 @@ ALARM_SOCK_PATH = "/var/run/xalarm/report"
 MIN_ALARM_ID = 1001
 MAX_ALARM_ID = 1128
 MAX_MSG_LEN = 8192
+MICROSECONDS_PER_SECOND = 1000000
+ALARM_LEVEL_DICT = {
+    1: "MINOR_ALM",
+    2: "MAJOR_ALM",
+    3: "CRITICAL_ALM"
+}
+
+ALARM_TYPE_DICT = {
+    1: "ALARM_TYPE_OCCUR",
+    2: "ALARM_TYPE_RECOVER"
+}
 
 
 @dataclasses.dataclass
@@ -132,3 +144,44 @@ def alarm_stu2bin(alarm_info: Xalarm):
         alarm_info.timetamp.tv_sec,
         alarm_info.timetamp.tv_usec,
         alarm_msg.encode('utf-8'))
+
+
+def alarm_stu2str(alarm_info: Xalarm):
+    """Return a one-line summary of *alarm_info* for log messages.
+
+    Only the length of the message body is reported, not its content.
+    Unknown level/type codes, an undecodable message and an out-of-range
+    timestamp are rendered as placeholders instead of raising, so that
+    building the log line cannot abort the handling of the alarm.
+    """
+    if not alarm_info:
+        return ""
+
+    alarm_id = alarm_info.alarm_id
+    alarm_level = ALARM_LEVEL_DICT.get(alarm_info.alarm_level, "UNKNOWN")
+    alarm_type = ALARM_TYPE_DICT.get(alarm_info.alarm_type, "UNKNOWN")
+
+    # msg1 is expected to be a NUL-padded bytes field here, but
+    # alarm_stu2bin() calls .encode() on the message, so an Xalarm built
+    # locally may carry a str instead; accept both.
+    raw_msg = getattr(alarm_info, "msg1", None)
+    if isinstance(raw_msg, bytes):
+        try:
+            alarm_msg = raw_msg.rstrip(b'\x00').decode('utf-8')
+        except UnicodeDecodeError:
+            alarm_msg = ""
+    elif isinstance(raw_msg, str):
+        alarm_msg = raw_msg.rstrip('\x00')
+    else:
+        alarm_msg = ""
+
+    try:
+        # tv_usec holds microseconds; fold it into a fractional second count.
+        alarm_time = (alarm_info.timetamp.tv_sec
+                      + alarm_info.timetamp.tv_usec / MICROSECONDS_PER_SECOND)
+        time_stamp = datetime.fromtimestamp(alarm_time).strftime('%Y-%m-%d %H:%M:%S')
+    except (AttributeError, TypeError, OverflowError, OSError, ValueError):
+        time_stamp = "UNKNOWN_TIME"
+
+    return (f"alarm_id: {alarm_id}, alarm_level: {alarm_level}, alarm_type: {alarm_type}, "
+            f"alarm_time: {time_stamp}, alarm_msg_len: {len(alarm_msg)}")
diff --git a/src/services/xalarm/xalarm_server.py b/src/services/xalarm/xalarm_server.py
index c6da5d2..ade5eb1 100644
--- a/src/services/xalarm/xalarm_server.py
+++ b/src/services/xalarm/xalarm_server.py
@@ -20,7 +20,7 @@ import logging
 import select
 import threading
 
-from .xalarm_api import alarm_bin2stu
+from .xalarm_api import alarm_bin2stu, alarm_stu2str
 from .xalarm_transfer import (
     check_filter,
     transmit_alarm,
@@ -88,10 +88,11 @@ def server_loop(alarm_config):
                     len(data))
                 continue
             alarm_info = alarm_bin2stu(data)
-            logging.debug("server bin2stu msg")
+            alarm_str = alarm_stu2str(alarm_info)
+            logging.info("server received report msg, %s", alarm_str)
             if not check_filter(alarm_info, alarm_config):
                 continue
-            transmit_alarm(alarm_sock, epoll, fd_to_socket, data)
+            transmit_alarm(alarm_sock, epoll, fd_to_socket, data, alarm_str)
         except Exception as e:
             logging.error(f"Error server:{e}")
 
diff --git a/src/services/xalarm/xalarm_transfer.py b/src/services/xalarm/xalarm_transfer.py
index 0479359..2ed5da4 100644
--- a/src/services/xalarm/xalarm_transfer.py
+++ b/src/services/xalarm/xalarm_transfer.py
@@ -16,6 +16,7 @@ Create: 2023-11-02
 
 import socket
 import logging
+import threading
 from time import sleep
 
 MIN_ID_NUMBER = 1001
@@ -23,6 +24,7 @@ MAX_ID_NUMBER = 1128
 MAX_CONNECTION_NUM = 100
 TEST_CONNECT_BUFFER_SIZE = 32
 PEROID_SCANN_TIME = 60
+LOCK = threading.Lock()
 
 
 def check_filter(alarm_info, alarm_filter):
@@ -46,24 +48,25 @@ def cleanup_closed_connections(server_sock, epoll, fd_to_socket):
     :param fd_to_socket: dict instance, used to hold client connections and server connections
     """
     to_remove = []
-    for fileno, connection in fd_to_socket.items():
-        if connection is server_sock:
-            continue
-        try:
-            # test whether connection still alive, use MSG_DONTWAIT to avoid blocking thread
-            # use MSG_PEEK to avoid consuming buffer data
-            data = connection.recv(TEST_CONNECT_BUFFER_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK)
-            if not data:
+    with LOCK:
+        for fileno, connection in fd_to_socket.items():
+            if connection is server_sock:
+                continue
+            try:
+                # test whether connection still alive, use MSG_DONTWAIT to avoid blocking thread
+                # use MSG_PEEK to avoid consuming buffer data
+                data = connection.recv(TEST_CONNECT_BUFFER_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK)
+                if not data:
+                    to_remove.append(fileno)
+            except BlockingIOError:
+                pass
+            except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError):
                 to_remove.append(fileno)
-        except BlockingIOError:
-            pass
-        except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError):
-            to_remove.append(fileno)
-
-    for fileno in to_remove:
-        fd_to_socket[fileno].close()
-        del fd_to_socket[fileno]
-        logging.info(f"cleaned up connection {fileno} for client lost connection.")
+
+        for fileno in to_remove:
+            fd_to_socket[fileno].close()
+            del fd_to_socket[fileno]
+            logging.info(f"cleaned up connection {fileno} for client lost connection.")
 
 
 def peroid_task_to_cleanup_connections(server_sock, epoll, fd_to_socket, thread_should_stop):
@@ -96,6 +99,8 @@ def wait_for_connection(server_sock, epoll, fd_to_socket, thread_should_stop):
                         connection.close()
                         continue
-                    fd_to_socket[connection.fileno()] = connection
+                    with LOCK:
+                        fd_to_socket[connection.fileno()] = connection
+                    logging.info("connection %d registered event.", connection.fileno())
         except socket.error as e:
             logging.debug(f"socket error, reason is {e}")
             break
@@ -103,7 +108,7 @@ def wait_for_connection(server_sock, epoll, fd_to_socket, thread_should_stop):
             logging.debug(f"wait for connection failed {e}")
 
 
-def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data):
+def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data, alarm_str=""):
     """
     this function is to broadcast alarm data to client, if fail to send data, remove connections held by fd_to_socket
     :param server_sock: server socket instance of alarm
@@ -112,13 +117,21 @@ def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data):
     :param bin_data: binary instance, alarm info data in C-style struct format defined in xalarm_api.py
+    :param alarm_str: human-readable summary of the alarm, used only in log messages
     """
     to_remove = []
-    for fileno, connection in fd_to_socket.items():
-        if connection is not server_sock:
-            try:
-                connection.sendall(bin_data)
-            except (BrokenPipeError, ConnectionResetError):
-                to_remove.append(fileno)
-    for fileno in to_remove:
-        fd_to_socket[fileno].close()
-        del fd_to_socket[fileno]
-        logging.info(f"cleaned up connection {fileno} for client lost connection.")
+
+    with LOCK:
+        for fileno, connection in fd_to_socket.items():
+            if connection is not server_sock:
+                try:
+                    connection.sendall(bin_data)
+                    logging.info("Broadcast msg success, alarm msg is %s", alarm_str)
+                except (BrokenPipeError, ConnectionResetError):
+                    to_remove.append(fileno)
+                except Exception as e:
+                    logging.warning("Sending msg failed, fd is %d, alarm msg is %s, reason is: %s",
+                                    fileno, alarm_str, str(e))
+
+        for fileno in to_remove:
+            fd_to_socket[fileno].close()
+            del fd_to_socket[fileno]
+            logging.info(f"cleaned up connection {fileno} for client lost connection.")
-- 
2.27.0