From eedeee26cb113f550339d54238ca41eb012697e3 Mon Sep 17 00:00:00 2001 From: yinbin Date: Wed, 31 Jul 2024 20:31:57 +0800 Subject: [PATCH] refector fill udp sendring --- src/lstack/core/lstack_lwip.c | 109 ++++++++++++++++++------ src/lstack/core/lstack_protocol_stack.c | 11 +-- 2 files changed, 84 insertions(+), 36 deletions(-) diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index 271e94f..19ff22d 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -457,22 +457,67 @@ int sem_timedwait_nsecs(sem_t *sem) return sem_timedwait(sem, &ts); } -static ssize_t do_lwip_fill_sendring(struct lwip_sock *sock, const void *buf, size_t len, - const struct sockaddr *addr, socklen_t addrlen) +static ssize_t do_lwip_udp_fill_sendring(struct lwip_sock *sock, const void *buf, size_t len, + const struct sockaddr *addr, socklen_t addrlen) { - if (sock->errevent > 0) { - GAZELLE_RETURN(ENOTCONN); + if (len > GAZELLE_UDP_PKGLEN_MAX) { + LSTACK_LOG(ERR, LSTACK, "Message too long\n"); + GAZELLE_RETURN(EMSGSIZE); + } + + ssize_t send_len = 0; + uint32_t write_num = (len + MBUF_MAX_DATA_LEN - 1) / MBUF_MAX_DATA_LEN; + uint32_t write_avail = gazelle_ring_readable_count(sock->send_ring); + struct wakeup_poll *wakeup = sock->wakeup; + + if (write_num > rte_ring_get_capacity(sock->send_ring)) { + LSTACK_LOG(ERR, LSTACK, "sock send_ring size is not enough\n"); + GAZELLE_RETURN(ENOMEM); + } + + /* if udp send 0 packet, set write_num to at least 1 */ + if (write_num == 0) { + write_num = 1; + } + + while (!netconn_is_nonblocking(sock->conn) && (write_avail < write_num)) { + if (sock->errevent > 0) { + GAZELLE_RETURN(ENOTCONN); + } + write_avail = gazelle_ring_readable_count(sock->send_ring); + } + + if (write_avail < write_num) { + sem_timedwait_nsecs(&sock->snd_ring_sem); + GAZELLE_RETURN(ENOMEM); + } + + send_len = app_buff_write(sock, (char *)buf, len, write_num, addr, addrlen); + + if (wakeup && wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT) + && !NETCONN_IS_OUTIDLE(sock)) { + del_sock_event(sock, EPOLLOUT); + } + + if (wakeup) { + wakeup->stat.app_write_cnt += write_num; } - struct protocol_stack *stack = sock->stack; - if (!stack) { + return send_len; +} + +static ssize_t do_lwip_tcp_fill_sendring(struct lwip_sock *sock, const void *buf, size_t len, + const struct sockaddr *addr, socklen_t addrlen) +{ + /* refer to the lwip implementation. */ + if (len == 0) { return 0; } ssize_t send_len = 0; /* merge data into last pbuf */ - if (!NETCONN_IS_UDP(sock) && sock->remain_len) { + if (sock->remain_len) { sock->stack->stats.sock_tx_merge++; send_len = merge_data_lastpbuf(sock, (char *)buf, len); if (send_len >= len) { @@ -485,11 +530,6 @@ static ssize_t do_lwip_fill_sendring(struct lwip_sock *sock, const void *buf, si uint32_t write_avail = gazelle_ring_readable_count(sock->send_ring); struct wakeup_poll *wakeup = sock->wakeup; - /* if udp send 0 packet, set write_num to at least 1 */ - if (NETCONN_IS_UDP(sock) && write_num == 0) { - write_num = 1; - } - while (!netconn_is_nonblocking(sock->conn) && (write_avail < write_num)) { if (sock->errevent > 0) { GAZELLE_RETURN(ENOTCONN); @@ -500,9 +540,6 @@ static ssize_t do_lwip_fill_sendring(struct lwip_sock *sock, const void *buf, si /* send_ring is full, data attach last pbuf */ if (write_avail == 0) { sem_timedwait_nsecs(&sock->snd_ring_sem); - if (likely(sock->send_ring != NULL)) { - write_avail = gazelle_ring_readable_count(sock->send_ring); - } goto END; } @@ -523,10 +560,11 @@ static ssize_t do_lwip_fill_sendring(struct lwip_sock *sock, const void *buf, si } END: - if (send_len == 0 && !NETCONN_IS_UDP(sock)) { + if (send_len == 0) { errno = EAGAIN; return -1; } + return send_len; } @@ -817,28 +855,39 @@ static inline void thread_bind_stack(struct lwip_sock *sock) ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t flags, const struct sockaddr *addr, socklen_t addrlen) { + ssize_t send = 0; + if (buf == NULL) { GAZELLE_RETURN(EINVAL); } - struct lwip_sock *sock = lwip_get_socket(fd); - if (len == 0 && !NETCONN_IS_UDP(sock)) { - return 0; - } - - if (NETCONN_IS_UDP(sock) && (len > GAZELLE_UDP_PKGLEN_MAX)) { - LSTACK_LOG(ERR, LSTACK, "Message too long\n"); - GAZELLE_RETURN(EMSGSIZE); + if (addr && addr->sa_family != AF_INET && addr->sa_family != AF_INET6) { + GAZELLE_RETURN(EINVAL); } + + struct lwip_sock *sock = lwip_get_socket(fd); thread_bind_stack(sock); if (sock->same_node_tx_ring != NULL) { return gazelle_same_node_ring_send(sock, buf, len, flags); } - ssize_t send = do_lwip_fill_sendring(sock, buf, len, addr, addrlen); - if (send < 0 || (send == 0 && !NETCONN_IS_UDP(sock))) { - return send; + if (sock->errevent > 0 || sock->stack == NULL) { + GAZELLE_RETURN(ENOTCONN); + } + + if (NETCONN_IS_UDP(sock)) { + send = do_lwip_udp_fill_sendring(sock, buf, len, addr, addrlen); + /* send = 0: udp send a empty package */ + if (send < 0) { + return send; + } + } else { + send = do_lwip_tcp_fill_sendring(sock, buf, len, addr, addrlen); + // send = 0 : tcp peer close connection ? + if (send <= 0) { + return send; + } } notice_stack_send(sock, fd, send, flags); @@ -860,7 +909,11 @@ ssize_t do_lwip_sendmsg_to_stack(struct lwip_sock *sock, int32_t s, const struct continue; } - ret = do_lwip_fill_sendring(sock, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, NULL, 0); + if (NETCONN_IS_UDP(sock)) { + ret = do_lwip_udp_fill_sendring(sock, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, NULL, 0); + } else { + ret = do_lwip_tcp_fill_sendring(sock, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, NULL, 0); + } if (ret <= 0) { buflen = (buflen == 0) ? ret : buflen; break; diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index 2867711..f433d0d 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -916,7 +916,6 @@ void stack_udp_send(struct rpc_msg *msg) size_t len = msg->args[MSG_ARG_1].size; struct protocol_stack *stack = get_protocol_stack(); int replenish_again; - uint32_t call_num; struct lwip_sock *sock = lwip_get_socket(fd); if (sock == NULL) { @@ -930,16 +929,12 @@ void stack_udp_send(struct rpc_msg *msg) } replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0); - call_num = __sync_fetch_and_sub(&sock->call_num, 1); - if (replenish_again < 0) { - return; - } - - if ((call_num == 1) && (replenish_again > 0)) { + if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) { rpc_call_replenish(&stack->rpc_queue, sock); return; } - + + __sync_fetch_and_sub(&sock->call_num, 1); return; } -- 2.33.0