679 lines
23 KiB
Diff
679 lines
23 KiB
Diff
From a18ea2e94fef78334a56dce1ea3f67ee649732f3 Mon Sep 17 00:00:00 2001
|
|
From: PshySimon <caixiaomeng2@huawei.com>
|
|
Date: Thu, 26 Sep 2024 16:12:25 +0800
|
|
Subject: [PATCH] add pyxalarm and pySentryNotify, add multi users support for
|
|
xalarmd and adapt libxalarm
|
|
|
|
---
|
|
src/libso/xalarm/register_xalarm.c | 41 ++----
|
|
src/libso/xalarm/register_xalarm.h | 10 +-
|
|
src/python/xalarm/register_xalarm.py | 192 +++++++++++++++++++++++++++
|
|
src/python/xalarm/sentry_notify.py | 71 ++++++++++
|
|
src/python/xalarm/xalarm_api.py | 18 ++-
|
|
src/python/xalarm/xalarm_server.py | 40 +++++-
|
|
src/python/xalarm/xalarm_transfer.py | 96 ++++++++++++--
|
|
7 files changed, 408 insertions(+), 60 deletions(-)
|
|
create mode 100644 src/python/xalarm/register_xalarm.py
|
|
create mode 100644 src/python/xalarm/sentry_notify.py
|
|
|
|
diff --git a/src/libso/xalarm/register_xalarm.c b/src/libso/xalarm/register_xalarm.c
|
|
index 152c078..21a419f 100644
|
|
--- a/src/libso/xalarm/register_xalarm.c
|
|
+++ b/src/libso/xalarm/register_xalarm.c
|
|
@@ -35,7 +35,7 @@
|
|
#define ALARM_SOCKET_PERMISSION 0700
|
|
#define TIME_UNIT_MILLISECONDS 1000
|
|
|
|
-#define MAX_PARAS_LEN 511
|
|
+#define MAX_PARAS_LEN 1023
|
|
#define MIN_ALARM_ID 1001
|
|
#define MAX_ALARM_ID (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1)
|
|
|
|
@@ -91,7 +91,7 @@ static int create_unix_socket(const char *path)
|
|
return -1;
|
|
}
|
|
|
|
- fd = socket(AF_UNIX, SOCK_DGRAM, 0);
|
|
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
|
if (fd < 0) {
|
|
printf("socket failed:%s\n", strerror(errno));
|
|
return -1;
|
|
@@ -103,14 +103,6 @@ static int create_unix_socket(const char *path)
|
|
goto release_socket;
|
|
}
|
|
|
|
- if (access(PATH_REG_ALARM, F_OK) == 0) {
|
|
- ret = unlink(PATH_REG_ALARM);
|
|
- if (ret != 0) {
|
|
- printf("unlink register socket file failed\n");
|
|
- goto release_socket;
|
|
- }
|
|
- }
|
|
-
|
|
if (access(DIR_XALARM, F_OK) == -1) {
|
|
if (mkdir(DIR_XALARM, ALARM_DIR_PERMISSION) == -1) {
|
|
printf("mkdir %s failed\n", DIR_XALARM);
|
|
@@ -120,32 +112,22 @@ static int create_unix_socket(const char *path)
|
|
|
|
if (memset(&alarm_addr, 0, sizeof(alarm_addr)) == NULL) {
|
|
printf("create_unix_socket: memset alarm_addr failed, ret: %d\n", ret);
|
|
- goto remove_dir;
|
|
+ goto release_socket;
|
|
}
|
|
alarm_addr.sun_family = AF_UNIX;
|
|
strncpy(alarm_addr.sun_path, path, sizeof(alarm_addr.sun_path) - 1);
|
|
|
|
- if (bind(fd, (struct sockaddr *)&alarm_addr, sizeof(alarm_addr.sun_family) + strlen(alarm_addr.sun_path)) < 0) {
|
|
- printf("bind socket failed:%s\n", strerror(errno));
|
|
- goto remove_dir;
|
|
+ if (connect(fd, (struct sockaddr*)&alarm_addr, sizeof(alarm_addr)) == -1) {
|
|
+ printf("create_unix_socket: connect alarm_addr failed, ret: %d\n", ret);
|
|
+ goto release_socket;
|
|
}
|
|
if (chmod(path, ALARM_SOCKET_PERMISSION) < 0) {
|
|
printf("chmod %s failed: %s\n", path, strerror(errno));
|
|
- goto unlink_sockfile;
|
|
+ goto release_socket;
|
|
}
|
|
|
|
return fd;
|
|
|
|
-unlink_sockfile:
|
|
- ret = unlink(PATH_REG_ALARM);
|
|
- if (ret != 0) {
|
|
- printf("unlink register socket file failed\n");
|
|
- }
|
|
-remove_dir:
|
|
- ret = rmdir(DIR_XALARM);
|
|
- if (ret != 0) {
|
|
- printf("rmdir %s failed: %s\n", path, strerror(errno));
|
|
- }
|
|
release_socket:
|
|
(void)close(fd);
|
|
|
|
@@ -271,8 +253,6 @@ int xalarm_Register(alarm_callback_func callback, struct alarm_subscription_info
|
|
|
|
void xalarm_UnRegister(int client_id)
|
|
{
|
|
- int ret;
|
|
-
|
|
if (!g_register_info.is_registered) {
|
|
printf("%s: alarm has not registered\n", __func__);
|
|
return;
|
|
@@ -292,10 +272,6 @@ void xalarm_UnRegister(int client_id)
|
|
if (g_register_info.register_fd != -1) {
|
|
(void)close(g_register_info.register_fd);
|
|
g_register_info.register_fd = -1;
|
|
- ret = unlink(PATH_REG_ALARM);
|
|
- if (ret != 0) {
|
|
- printf("%s: unlink register socket file failed\n", __func__);
|
|
- }
|
|
}
|
|
|
|
memset(g_register_info.alarm_enable_bitmap, 0, MAX_NUM_OF_ALARM_ID * sizeof(char));
|
|
@@ -357,7 +333,7 @@ int xalarm_Report(unsigned short usAlarmId, unsigned char ucAlarmLevel,
|
|
struct sockaddr_un alarm_addr;
|
|
|
|
if ((usAlarmId < MIN_ALARM_ID || usAlarmId > MAX_ALARM_ID) ||
|
|
- (ucAlarmLevel < ALARM_LEVEL_FATAL || ucAlarmLevel > ALARM_LEVEL_DEBUG) ||
|
|
+ (ucAlarmLevel < MINOR_ALM || ucAlarmLevel > CRITICAL_ALM) ||
|
|
(ucAlarmType < ALARM_TYPE_OCCUR || ucAlarmType > ALARM_TYPE_RECOVER)) {
|
|
fprintf(stderr, "%s: alarm info invalid\n", __func__);
|
|
return -1;
|
|
@@ -666,3 +642,4 @@ int report_result(const char *task_name, enum RESULT_LEVEL result_level, const c
|
|
return RETURE_CODE_SUCCESS;
|
|
}
|
|
|
|
+
|
|
diff --git a/src/libso/xalarm/register_xalarm.h b/src/libso/xalarm/register_xalarm.h
|
|
index 1f26c6a..fef9482 100644
|
|
--- a/src/libso/xalarm/register_xalarm.h
|
|
+++ b/src/libso/xalarm/register_xalarm.h
|
|
@@ -11,7 +11,7 @@
|
|
#include <sys/time.h>
|
|
#include <stdbool.h>
|
|
|
|
-#define ALARM_INFO_MAX_PARAS_LEN 512
|
|
+#define ALARM_INFO_MAX_PARAS_LEN 1024
|
|
#define MAX_STRERROR_SIZE 1024
|
|
#define MAX_ALARM_TYEPS 1024
|
|
#define MIN_ALARM_ID 1001
|
|
@@ -19,11 +19,9 @@
|
|
|
|
#define MEMORY_ALARM_ID 1001
|
|
|
|
-#define ALARM_LEVEL_FATAL 1
|
|
-#define ALARM_LEVEL_ERROR 2
|
|
-#define ALARM_LEVEL_WARNING 3
|
|
-#define ALARM_LEVEL_INFO 4
|
|
-#define ALARM_LEVEL_DEBUG 5
|
|
+#define MINOR_ALM 1
|
|
+#define MAJOR_ALM 2
|
|
+#define CRITICAL_ALM 3
|
|
|
|
#define ALARM_TYPE_OCCUR 1
|
|
#define ALARM_TYPE_RECOVER 2
|
|
diff --git a/src/python/xalarm/register_xalarm.py b/src/python/xalarm/register_xalarm.py
|
|
new file mode 100644
|
|
index 0000000..e58343d
|
|
--- /dev/null
|
|
+++ b/src/python/xalarm/register_xalarm.py
|
|
@@ -0,0 +1,192 @@
|
|
+import os
|
|
+import sys
|
|
+import socket
|
|
+import logging
|
|
+import threading
|
|
+import time
|
|
+import fcntl
|
|
+import inspect
|
|
+from struct import error as StructParseError
|
|
+
|
|
+from .xalarm_api import Xalarm, alarm_bin2stu
|
|
+
|
|
+
|
|
+ALARM_REPORT_LEN = 1048
|
|
+MAX_NUM_OF_ALARM_ID=128
|
|
+MIN_ALARM_ID = 1001
|
|
+MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1)
|
|
+DIR_XALARM = "/var/run/xalarm"
|
|
+PATH_REG_ALARM = "/var/run/xalarm/alarm"
|
|
+PATH_REPORT_ALARM = "/var/run/xalarm/report"
|
|
+ALARM_DIR_PERMISSION = 0o0750
|
|
+ALARM_REG_SOCK_PERMISSION = 0o0700
|
|
+ALARM_SOCKET_PERMISSION = 0o0700
|
|
+TIME_UNIT_MILLISECONDS = 1000
|
|
+ALARM_REGISTER_INFO = None
|
|
+
|
|
+
|
|
+class AlarmRegister:
+    """Client-side subscription to the alarm stream served on PATH_REG_ALARM.
+
+    Construction connects a non-blocking UNIX stream socket and prepares (but
+    does not start) a receiver thread. The thread decodes fixed-size records
+    of ALARM_REPORT_LEN bytes with alarm_bin2stu() and passes those whose id
+    is enabled in ``id_filter`` to ``callback``.
+    """
+
+    def __init__(self, id_filter: list[bool], callback: callable):
+        """Store the filter and callback and connect to the alarm socket.
+
+        :param id_filter: one flag per alarm id, indexed by ``alarm_id - MIN_ALARM_ID``
+        :param callback: callable taking the decoded alarm as its only argument
+        """
+        self.id_filter = id_filter
+        self.callback = callback
+        self.socket = self.create_unix_socket()
+        self.is_registered = True
+        self.thread_should_stop = False
+        self.thread = threading.Thread(target=self.alarm_recv)
+
+    def check_params(self) -> bool:
+        """Return True when filter, callback and socket are all usable."""
+        id_filter = self.id_filter
+        if not hasattr(id_filter, "__len__") or len(id_filter) != MAX_NUM_OF_ALARM_ID:
+            sys.stderr.write("check_params: invalid param id_filter\n")
+            return False
+
+        # inspect.signature() raises TypeError for non-callables and ValueError
+        # when no signature can be provided; both mean the callback is unusable.
+        try:
+            param_count = len(inspect.signature(self.callback).parameters)
+        except (TypeError, ValueError):
+            param_count = -1
+        if param_count != 1:
+            sys.stderr.write("check_params: invalid param callback\n")
+            return False
+
+        if self.socket is None:
+            sys.stderr.write("check_params: scoket create failed\n")
+            return False
+        return True
+
+    def set_id_filter(self, id_filter: list[bool]) -> bool:
+        """Replace the id filter; return False (and keep the old one) if its size is wrong."""
+        # id_is_registered() indexes up to MAX_NUM_OF_ALARM_ID - 1, so a shorter
+        # list is as invalid as a longer one.
+        if not hasattr(id_filter, "__len__") or len(id_filter) != MAX_NUM_OF_ALARM_ID:
+            sys.stderr.write("set_id_filter: invalid param id_filter\n")
+            return False
+        self.id_filter = id_filter
+        return True
+
+    def id_is_registered(self, alarm_id) -> bool:
+        """Return the filter flag for alarm_id, or False when the id is out of range."""
+        if alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID:
+            return False
+        return self.id_filter[alarm_id - MIN_ALARM_ID]
+
+    def put_alarm_info(self, alarm_info: Xalarm) -> None:
+        """Invoke the callback with alarm_info if its id is enabled in the filter."""
+        if not self.callback or not alarm_info:
+            return
+        if not self.id_is_registered(alarm_info.alarm_id):
+            return
+        self.callback(alarm_info)
+
+    def create_unix_socket(self) -> socket.socket:
+        """Connect a non-blocking UNIX stream socket to PATH_REG_ALARM.
+
+        :return: the connected socket, or None when any step fails
+        """
+        sock = None
+        try:
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            sock.setblocking(False)
+
+            if not os.access(DIR_XALARM, os.F_OK):
+                os.makedirs(DIR_XALARM)
+                os.chmod(DIR_XALARM, ALARM_DIR_PERMISSION)
+
+            sock.connect(PATH_REG_ALARM)
+            return sock
+        except OSError as e:
+            # sock is still None when socket.socket() itself failed.
+            if sock is not None:
+                sock.close()
+            sys.stderr.write(f"create_unix_socket: create socket error:{e}\n")
+            return None
+
+    def alarm_recv(self):
+        """Receiver loop: read records from the socket until asked to stop.
+
+        A stream socket may deliver a record in several pieces, so bytes are
+        accumulated until exactly ALARM_REPORT_LEN are available before the
+        record is decoded and dispatched.
+        """
+        pending = b""
+        while not self.thread_should_stop:
+            try:
+                # Never read past the end of the current record.
+                chunk = self.socket.recv(ALARM_REPORT_LEN - len(pending))
+                if not chunk:
+                    sys.stderr.write("connection closed by xalarmd, maybe connections reach max num or service stopped.\n")
+                    self.thread_should_stop = True
+                    break
+                pending += chunk
+                if len(pending) < ALARM_REPORT_LEN:
+                    continue
+
+                # Reset the buffer before decoding so a bad record cannot
+                # shift the framing of the ones that follow.
+                data, pending = pending, b""
+                alarm_info = alarm_bin2stu(data)
+                self.put_alarm_info(alarm_info)
+            except BlockingIOError:
+                # Non-blocking socket with nothing to read yet.
+                time.sleep(0.1)
+            except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError):
+                sys.stderr.write("Connection closed by the server.\n")
+                self.thread_should_stop = True
+            except (ValueError, StructParseError, InterruptedError) as e:
+                sys.stderr.write(f"{e}\n")
+            except Exception as e:
+                sys.stderr.write(f"{e}\n")
+                self.thread_should_stop = True
+
+    def start_thread(self) -> None:
+        """Start the receiver as a daemon thread."""
+        self.thread.daemon = True
+        self.thread.start()
+
+    def stop_thread(self) -> None:
+        """Ask the receiver to stop, wait for it, and close the socket."""
+        self.thread_should_stop = True
+        # join() raises RuntimeError on a thread that was never started and
+        # on the current thread (e.g. when called from inside the callback).
+        if self.thread.is_alive() and self.thread is not threading.current_thread():
+            self.thread.join()
+        if self.socket is not None:
+            self.socket.close()
|
|
+
|
|
+
|
|
+def xalarm_register(callback: callable, id_filter: list[bool]) -> int:
+    """Register the process-wide alarm callback and start receiving alarms.
+
+    :param callback: callable taking the decoded alarm as its only argument
+    :param id_filter: one flag per alarm id selecting which alarms are delivered
+    :return: 0 on success, -1 when already registered or the parameters are invalid
+    """
+    global ALARM_REGISTER_INFO
+
+    if ALARM_REGISTER_INFO is not None:
+        sys.stderr.write("xalarm_register: alarm has registered\n")
+        return -1
+
+    # Build and validate locally: a failed attempt must not occupy the global
+    # slot, otherwise every later registration is refused as a duplicate.
+    candidate = AlarmRegister(id_filter, callback)
+    try:
+        valid = candidate.check_params()
+    except (TypeError, ValueError) as e:
+        sys.stderr.write(f"xalarm_register: invalid param: {e}\n")
+        valid = False
+
+    if not valid:
+        # Release the connection opened by the constructor, if any.
+        if candidate.socket is not None:
+            candidate.socket.close()
+        return -1
+
+    ALARM_REGISTER_INFO = candidate
+    candidate.start_thread()
+
+    return 0
|
|
+
|
|
+
|
|
+def xalarm_unregister(clientId: int) -> None:
+    """Stop the receiver thread and drop the process-wide registration.
+
+    :param clientId: client id as returned by registration; negative values are rejected
+    """
+    global ALARM_REGISTER_INFO
+    if clientId < 0:
+        sys.stderr.write("xalarm_unregister: invalid client\n")
+        return
+
+    if ALARM_REGISTER_INFO is None:
+        sys.stderr.write("xalarm_unregister: alarm has not registered\n")
+        return
+
+    try:
+        ALARM_REGISTER_INFO.stop_thread()
+    except (RuntimeError, AttributeError, OSError) as e:
+        # Teardown of a registration whose thread never started or whose
+        # socket was never created must not leave the module stuck.
+        sys.stderr.write(f"xalarm_unregister: stop failed: {e}\n")
+    finally:
+        # Always clear the slot so a new registration is possible.
+        ALARM_REGISTER_INFO = None
|
|
+
|
|
+
|
|
+def xalarm_upgrade(clientId: int, id_filter: list[bool]) -> None:
+    """Replace the id filter of the current registration.
+
+    :param clientId: client id as returned by registration; negative values are rejected
+    :param id_filter: new filter; must hold exactly MAX_NUM_OF_ALARM_ID flags
+    """
+    global ALARM_REGISTER_INFO
+    if clientId < 0:
+        sys.stderr.write("xalarm_upgrade: invalid client\n")
+        return
+    if ALARM_REGISTER_INFO is None:
+        sys.stderr.write("xalarm_upgrade: alarm has not registered\n")
+        return
+    # The filter is indexed by (alarm_id - MIN_ALARM_ID) for every valid id,
+    # so a list of any other length would break alarm dispatch.
+    if not hasattr(id_filter, "__len__") or len(id_filter) != MAX_NUM_OF_ALARM_ID:
+        sys.stderr.write("xalarm_upgrade: invalid param id_filter\n")
+        return
+    ALARM_REGISTER_INFO.id_filter = id_filter
|
|
+
|
|
+
|
|
+def xalarm_getid(alarm_info: Xalarm) -> int:
+    """Return the alarm id carried by alarm_info, or 0 when alarm_info is falsy."""
+    return alarm_info.alarm_id if alarm_info else 0
|
|
+
|
|
+
|
|
+def xalarm_getlevel(alarm_info: Xalarm) -> int:
+    """Return the alarm level carried by alarm_info, or 0 when alarm_info is falsy."""
+    return alarm_info.alarm_level if alarm_info else 0
|
|
+
|
|
+
|
|
+def xalarm_gettype(alarm_info: Xalarm) -> int:
+    """Return the alarm type carried by alarm_info, or 0 when alarm_info is falsy."""
+    return alarm_info.alarm_type if alarm_info else 0
|
|
+
|
|
+
|
|
+def xalarm_gettime(alarm_info: Xalarm) -> int:
+    """Return the alarm timestamp in whole milliseconds, or 0 when alarm_info is falsy."""
+    if not alarm_info:
+        return 0
+    stamp = alarm_info.timetamp
+    # Floor division keeps the result an int, as the return annotation promises;
+    # true division would turn the whole sum into a float.
+    return stamp.tv_sec * TIME_UNIT_MILLISECONDS + stamp.tv_usec // TIME_UNIT_MILLISECONDS
|
|
+
|
|
+def xalarm_getdesc(alarm_info: Xalarm) -> str:
+    """Return the alarm description as text.
+
+    :return: msg1 without trailing NUL padding; None when alarm_info is falsy
+             or the bytes are not valid UTF-8
+    """
+    if not alarm_info:
+        return None
+    raw = alarm_info.msg1
+    # msg1 is a str on alarms built from user input; bytes.rstrip(b'\x00')
+    # semantics do not apply to it and would raise TypeError.
+    if isinstance(raw, str):
+        return raw.rstrip('\x00')
+    try:
+        desc_str = raw.rstrip(b'\x00').decode('utf-8')
+    except UnicodeError:
+        desc_str = None
+    return desc_str
|
|
diff --git a/src/python/xalarm/sentry_notify.py b/src/python/xalarm/sentry_notify.py
|
|
new file mode 100644
|
|
index 0000000..a19e5b3
|
|
--- /dev/null
|
|
+++ b/src/python/xalarm/sentry_notify.py
|
|
@@ -0,0 +1,71 @@
|
|
+import os
|
|
+import sys
|
|
+import time
|
|
+import socket
|
|
+from struct import error as StructParseError
|
|
+
|
|
+from .xalarm_api import alarm_stu2bin, Xalarm
|
|
+
|
|
+MAX_NUM_OF_ALARM_ID = 128
|
|
+MIN_ALARM_ID = 1001
|
|
+MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1)
|
|
+
|
|
+MINOR_ALM = 1
|
|
+MAJOR_ALM = 2
|
|
+CRITICAL_ALM = 3
|
|
+
|
|
+ALARM_TYPE_OCCUR = 1
|
|
+ALARM_TYPE_RECOVER = 2
|
|
+
|
|
+MAX_PUC_PARAS_LEN = 1024
|
|
+
|
|
+DIR_XALARM = "/var/run/xalarm"
|
|
+PATH_REPORT_ALARM = "/var/run/xalarm/report"
|
|
+ALARM_DIR_PERMISSION = 0o750
|
|
+ALARM_SOCKET_PERMISSION = 0o700
|
|
+
|
|
+
|
|
+def check_params(alarm_id, alarm_level, alarm_type, puc_paras) -> bool:
+    """Validate the environment and the arguments of an alarm report.
+
+    :param alarm_id: must lie in [MIN_ALARM_ID, MAX_ALARM_ID]
+    :param alarm_level: must lie in [MINOR_ALM, CRITICAL_ALM]
+    :param alarm_type: must lie in [ALARM_TYPE_OCCUR, ALARM_TYPE_RECOVER]
+    :param puc_paras: alarm message; a str shorter than MAX_PUC_PARAS_LEN bytes in UTF-8
+    :return: True when everything is valid, False (with a message on stderr) otherwise
+    """
+    if not os.path.exists(DIR_XALARM):
+        sys.stderr.write(f"check_params: {DIR_XALARM} not exist, failed\n")
+        return False
+
+    if not os.path.exists(PATH_REPORT_ALARM):
+        sys.stderr.write(f"check_params: {PATH_REPORT_ALARM} not exist, failed\n")
+        return False
+
+    if (alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID or
+            alarm_level < MINOR_ALM or alarm_level > CRITICAL_ALM or
+            alarm_type < ALARM_TYPE_OCCUR or alarm_type > ALARM_TYPE_RECOVER):
+        sys.stderr.write("check_params: alarm info invalid\n")
+        return False
+
+    if not isinstance(puc_paras, str):
+        sys.stderr.write("check_params: alarm msg should be a string\n")
+        return False
+
+    # The message is sent UTF-8 encoded into a fixed-size field, so the limit
+    # applies to the encoded byte length, not to the number of characters.
+    try:
+        encoded_len = len(puc_paras.encode('utf-8'))
+    except UnicodeError as e:
+        sys.stderr.write(f"check_params: alarm msg can not be encoded: {e}\n")
+        return False
+    if encoded_len >= MAX_PUC_PARAS_LEN:
+        sys.stderr.write(f"check_params: alarm msg should be less than {MAX_PUC_PARAS_LEN}\n")
+        return False
+
+    return True
|
|
+
|
|
+def xalarm_report(alarm_id, alarm_level, alarm_type, puc_paras) -> bool:
+    """Send one alarm report datagram to PATH_REPORT_ALARM.
+
+    The alarm is stamped with the current time, packed with alarm_stu2bin()
+    and sent over a UNIX datagram socket.
+
+    :return: True when the datagram was sent, False on invalid parameters or
+             any packing/sending error (reported on stderr)
+    """
+    if not check_params(alarm_id, alarm_level, alarm_type, puc_paras):
+        return False
+
+    try:
+        current_time = time.time()
+        current_time_seconds = int(current_time)
+        current_microseconds = int((current_time - current_time_seconds) * 1_000_000)
+        alarm_info = Xalarm(alarm_id, alarm_type, alarm_level,
+                            current_time_seconds, current_microseconds, puc_paras)
+        payload = alarm_stu2bin(alarm_info)
+
+        # The context manager closes the socket on every path and never
+        # touches an unbound name when socket creation itself fails.
+        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
+            sock.sendto(payload, PATH_REPORT_ALARM)
+    except (StructParseError, OSError, ValueError) as e:
+        # OSError covers FileNotFoundError and socket.error; ValueError covers
+        # UnicodeError and the ValueError raised by the Xalarm msg1 setter.
+        sys.stderr.write(f"xalarm_report: error occurs when sending msg.{e}\n")
+        return False
+
+    return True
|
|
+
|
|
+
|
|
diff --git a/src/python/xalarm/xalarm_api.py b/src/python/xalarm/xalarm_api.py
|
|
index 94d7638..99eabf5 100644
|
|
--- a/src/python/xalarm/xalarm_api.py
|
|
+++ b/src/python/xalarm/xalarm_api.py
|
|
@@ -23,6 +23,7 @@ ALARM_LEVELS = (1, 2, 3, 4, 5)
|
|
ALARM_SOCK_PATH = "/var/run/xalarm/report"
|
|
MIN_ALARM_ID = 1001
|
|
MAX_ALARM_ID = 1128
|
|
+MAX_MSG_LEN = 1024
|
|
|
|
|
|
@dataclasses.dataclass
|
|
@@ -97,15 +98,15 @@ class Xalarm:
|
|
def msg1(self, msg):
|
|
"""msg1 setter
|
|
"""
|
|
- if len(msg) > 512:
|
|
- raise ValueError("msg1 length must below 255")
|
|
+ if len(msg) > MAX_MSG_LEN:
|
|
+ raise ValueError(f"msg1 length must below {MAX_MSG_LEN}")
|
|
self._msg1 = msg
|
|
|
|
|
|
def alarm_bin2stu(bin_data):
|
|
"""alarm binary to struct
|
|
"""
|
|
- struct_data = struct.unpack("@HBBll512s", bin_data)
|
|
+ struct_data = struct.unpack(f"@HBBll{MAX_MSG_LEN}s", bin_data)
|
|
|
|
alarm_info = Xalarm(1001, 2, 1, 0, 0, "")
|
|
alarm_info.alarm_id = struct_data[0]
|
|
@@ -116,3 +117,14 @@ def alarm_bin2stu(bin_data):
|
|
alarm_info.msg1 = struct_data[5]
|
|
|
|
return alarm_info
|
|
+
|
|
+
|
|
+def alarm_stu2bin(alarm_info: Xalarm):
+    """alarm struct to binary
+
+    Packs id, level, type, timestamp and message with the same layout that
+    alarm_bin2stu unpacks. The message may be str (encoded as UTF-8 here) or
+    already-encoded bytes.
+    """
+    msg = alarm_info.msg1
+    # alarm_bin2stu stores the unpacked message as bytes, which have no
+    # encode(); only text needs encoding.
+    if isinstance(msg, str):
+        msg = msg.encode('utf-8')
+    return struct.pack(
+        f'@HBBll{MAX_MSG_LEN}s',
+        alarm_info.alarm_id,
+        alarm_info.alarm_level,
+        alarm_info.alarm_type,
+        alarm_info.timetamp.tv_sec,
+        alarm_info.timetamp.tv_usec,
+        msg)
|
|
diff --git a/src/python/xalarm/xalarm_server.py b/src/python/xalarm/xalarm_server.py
|
|
index 84db273..fcaf393 100644
|
|
--- a/src/python/xalarm/xalarm_server.py
|
|
+++ b/src/python/xalarm/xalarm_server.py
|
|
@@ -17,16 +17,20 @@ Create: 2023-11-02
|
|
import socket
|
|
import os
|
|
import logging
|
|
+import select
|
|
+import threading
|
|
from struct import error as StructParseError
|
|
|
|
from .xalarm_api import alarm_bin2stu
|
|
-from .xalarm_transfer import check_filter, transmit_alarm
|
|
+from .xalarm_transfer import check_filter, transmit_alarm, wait_for_connection
|
|
|
|
|
|
ALARM_DIR = "/var/run/xalarm"
|
|
+USER_RECV_SOCK = "/var/run/xalarm/alarm"
|
|
SOCK_FILE = "/var/run/xalarm/report"
|
|
-ALARM_REPORT_LEN = 536
|
|
+ALARM_REPORT_LEN = 1048
|
|
ALARM_DIR_PERMISSION = 0o750
|
|
+ALARM_LISTEN_QUEUE_LEN = 5
|
|
|
|
|
|
def clear_sock_path():
|
|
@@ -37,6 +41,8 @@ def clear_sock_path():
|
|
os.chmod(ALARM_DIR, ALARM_DIR_PERMISSION)
|
|
if os.path.exists(SOCK_FILE):
|
|
os.unlink(SOCK_FILE)
|
|
+ if os.path.exists(USER_RECV_SOCK):
|
|
+ os.unlink(USER_RECV_SOCK)
|
|
|
|
|
|
def server_loop(alarm_config):
|
|
@@ -49,6 +55,21 @@ def server_loop(alarm_config):
|
|
sock.bind(SOCK_FILE)
|
|
os.chmod(SOCK_FILE, 0o600)
|
|
|
|
+ alarm_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
+ alarm_sock.bind(USER_RECV_SOCK)
|
|
+ os.chmod(USER_RECV_SOCK, 0o600)
|
|
+ alarm_sock.listen(ALARM_LISTEN_QUEUE_LEN)
|
|
+ alarm_sock.setblocking(False)
|
|
+
|
|
+ epoll = select.epoll()
|
|
+ epoll.register(alarm_sock.fileno(), select.EPOLLIN)
|
|
+ fd_to_socket = {alarm_sock.fileno(): alarm_sock,}
|
|
+ thread_should_stop = False
|
|
+
|
|
+ thread = threading.Thread(target=wait_for_connection, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop))
|
|
+ thread.daemon = True
|
|
+ thread.start()
|
|
+
|
|
while True:
|
|
try:
|
|
data, _ = sock.recvfrom(ALARM_REPORT_LEN)
|
|
@@ -58,14 +79,21 @@ def server_loop(alarm_config):
|
|
logging.debug("server receive report msg length wrong %d",
|
|
len(data))
|
|
continue
|
|
-
|
|
alarm_info = alarm_bin2stu(data)
|
|
logging.debug("server bin2stu msg")
|
|
if not check_filter(alarm_info, alarm_config):
|
|
continue
|
|
+ transmit_alarm(alarm_sock, epoll, fd_to_socket, data)
|
|
+ except Exception as e:
|
|
+ logging.error(f"Error server:{e}")
|
|
+
|
|
+ thread_should_stop = True
|
|
+ thread.join()
|
|
|
|
- transmit_alarm(data)
|
|
- except (ValueError, StructParseError):
|
|
- pass
|
|
+ epoll.unregister(alarm_sock.fileno())
|
|
+ epoll.close()
|
|
+ alarm_sock.close()
|
|
+ os.unlink(USER_RECV_SOCK)
|
|
|
|
sock.close()
|
|
+
|
|
diff --git a/src/python/xalarm/xalarm_transfer.py b/src/python/xalarm/xalarm_transfer.py
|
|
index b590b43..42137d8 100644
|
|
--- a/src/python/xalarm/xalarm_transfer.py
|
|
+++ b/src/python/xalarm/xalarm_transfer.py
|
|
@@ -16,10 +16,12 @@ Create: 2023-11-02
|
|
|
|
import socket
|
|
import logging
|
|
+import select
|
|
|
|
-USER_RECV_SOCK = "/var/run/xalarm/alarm"
|
|
MIN_ID_NUMBER = 1001
|
|
MAX_ID_NUMBER = 1128
|
|
+MAX_CONNECTION_NUM = 100
|
|
+TEST_CONNECT_BUFFER_SIZE = 32
|
|
|
|
|
|
def check_filter(alarm_info, alarm_filter):
|
|
@@ -35,16 +37,84 @@ def check_filter(alarm_info, alarm_filter):
|
|
return True
|
|
|
|
|
|
-def transmit_alarm(bin_data):
|
|
- """forward alarm message
|
|
+def cleanup_closed_connections(server_sock, epoll, fd_to_socket):
     """
-    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
-    try:
-        sock.sendto(bin_data, USER_RECV_SOCK)
-        logging.debug("transfer alarm success")
-    except ConnectionRefusedError:
-        logging.debug("transfer sendto failed")
-    except FileNotFoundError:
-        logging.debug("transfer sendto failed")
-    finally:
-        sock.close()
+    remove client connections in 'fd_to_socket' whose peer has gone away
+    :param server_sock: server socket instance of alarm, never removed
+    :param epoll: epoll instance, used to unregister invalid client connections
+    :param fd_to_socket: dict instance, used to hold client connections and server connections
+    """
+    to_remove = []
+    # iterate over a snapshot: the dict is shared with another thread that
+    # may add or delete entries while this loop runs
+    for fileno, connection in list(fd_to_socket.items()):
+        if connection is server_sock:
+            continue
+        try:
+            # test whether connection still alive, use MSG_DONTWAIT to avoid blocking thread
+            # use MSG_PEEK to avoid consuming buffer data
+            data = connection.recv(TEST_CONNECT_BUFFER_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK)
+            if not data:
+                to_remove.append(fileno)
+        except (BlockingIOError, InterruptedError):
+            # nothing to read right now: the peer is still connected
+            pass
+        except OSError:
+            # reset, aborted, broken pipe, bad descriptor...: treat as dead
+            to_remove.append(fileno)
+
+    for fileno in to_remove:
+        connection = fd_to_socket.pop(fileno, None)
+        if connection is None:
+            # already removed by the other thread
+            continue
+        try:
+            epoll.unregister(fileno)
+        except (OSError, ValueError) as e:
+            logging.debug(f"unregister connection {fileno} failed {e}")
+        connection.close()
+        logging.info(f"cleaned up connection {fileno} for client lost connection.")
|
|
+
|
|
+
|
|
+def _stop_requested(flag):
+    """
+    evaluate a stop flag given as Event-like object, zero-argument callable or plain bool
+    """
+    is_set = getattr(flag, "is_set", None)
+    if callable(is_set):
+        return bool(is_set())
+    if callable(flag):
+        return bool(flag())
+    return bool(flag)
+
+
+def wait_for_connection(server_sock, epoll, fd_to_socket, thread_should_stop):
+    """
+    thread function to accept client connections and save them in 'fd_to_socket'
+    :param server_sock: server socket instance of alarm
+    :param epoll: epoll instance, used to wait for and register client connections
+    :param fd_to_socket: dict instance, used to hold client connections and server connections
+    :param thread_should_stop: stop request; a plain bool is read once per loop but can
+        never change after the call, so pass a threading.Event or a zero-argument
+        callable when the thread must be stoppable
+    """
+    while not _stop_requested(thread_should_stop):
+        try:
+            events = epoll.poll(1)
+
+            for fileno, event in events:
+                if fileno != server_sock.fileno():
+                    # client events need no action here; dead clients are dropped by
+                    # transmit_alarm and cleanup_closed_connections
+                    continue
+                connection, client_address = server_sock.accept()
+                # if reach max connection, cleanup closed connections
+                if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM:
+                    cleanup_closed_connections(server_sock, epoll, fd_to_socket)
+                # if connections still reach max num, close this connection automatically
+                if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM:
+                    logging.info(f"connection reach max num of {MAX_CONNECTION_NUM}, closed current connection!")
+                    connection.close()
+                    continue
+                # a level-triggered EPOLLOUT is ready almost always, which makes poll()
+                # return immediately forever; watch edge-triggered peer hangup instead
+                epoll.register(connection.fileno(), select.EPOLLRDHUP | select.EPOLLET)
+                fd_to_socket[connection.fileno()] = connection
+        except (BlockingIOError, InterruptedError) as e:
+            # transient: the pending connection vanished or the call was interrupted
+            logging.debug(f"wait for connection failed {e}")
+        except socket.error as e:
+            logging.debug(f"socket error, reason is {e}")
+            break
+        except (KeyError, ValueError) as e:
+            logging.debug(f"wait for connection failed {e}")
|
|
+
|
|
+
|
|
+def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data):
+    """
+    broadcast alarm data to every client; a client whose send fails is removed from fd_to_socket
+    :param server_sock: server socket instance of alarm, skipped when broadcasting
+    :param epoll: epoll instance, used to unregister invalid client connections
+    :param fd_to_socket: dict instance, used to hold client connections and server connections
+    :param bin_data: binary instance, alarm info data in C-style struct format defined in xalarm_api.py
+    """
+    to_remove = []
+    # iterate over a snapshot: the accepting thread inserts into this dict concurrently
+    for fileno, connection in list(fd_to_socket.items()):
+        if connection is server_sock:
+            continue
+        try:
+            connection.sendall(bin_data)
+        except OSError:
+            # includes BrokenPipeError and ConnectionResetError; any failed send may
+            # have written a partial record, so the client is dropped and the
+            # remaining clients are still served
+            to_remove.append(fileno)
+    for fileno in to_remove:
+        connection = fd_to_socket.pop(fileno, None)
+        if connection is None:
+            # already removed by the other thread
+            continue
+        try:
+            epoll.unregister(fileno)
+        except (OSError, ValueError):
+            # descriptor no longer registered or epoll already closed
+            pass
+        connection.close()
|
|
+
|
|
--
|
|
2.27.0
|
|
|