/** * @file BConnection_unix.c * @author Ambroz Bizjak <ambrop7@gmail.com> * * @section LICENSE * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of the author nor the * names of its contributors may be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include <string.h> #include <stddef.h> #include <unistd.h> #include <errno.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> #include <misc/nonblocking.h> #include <misc/strdup.h> #include <base/BLog.h> #include "BConnection.h" #include <generated/blog_channel_BConnection.h> #define MAX_UNIX_SOCKET_PATH 200 #define SEND_STATE_NOT_INITED 0 #define SEND_STATE_READY 1 #define SEND_STATE_BUSY 2 #define RECV_STATE_NOT_INITED 0 #define RECV_STATE_READY 1 #define RECV_STATE_BUSY 2 #define RECV_STATE_INITED_CLOSED 3 #define RECV_STATE_NOT_INITED_CLOSED 4 struct sys_addr { socklen_t len; union { struct sockaddr generic; struct sockaddr_in ipv4; struct sockaddr_in6 ipv6; } addr; }; struct unix_addr { socklen_t len; union { struct sockaddr_un addr; uint8_t bytes[offsetof(struct sockaddr_un, sun_path) + MAX_UNIX_SOCKET_PATH + 1]; } u; }; static int build_unix_address (struct unix_addr *out, const char *socket_path); static void addr_socket_to_sys (struct sys_addr *out, BAddr addr); static void addr_sys_to_socket (BAddr *out, struct sys_addr addr); static void listener_fd_handler (BListener *o, int events); static void listener_default_job_handler (BListener *o); static void connector_fd_handler (BConnector *o, int events); static void connector_job_handler (BConnector *o); static void connection_report_error (BConnection *o); static void connection_send (BConnection *o); static void connection_recv (BConnection *o); static void connection_fd_handler (BConnection *o, int events); static void connection_send_job_handler (BConnection *o); static void connection_recv_job_handler (BConnection *o); static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len); static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_len); static int build_unix_address (struct unix_addr *out, const char *socket_path) { ASSERT(socket_path); if (strlen(socket_path) > MAX_UNIX_SOCKET_PATH) { return 0; } out->len = offsetof(struct sockaddr_un, sun_path) + strlen(socket_path) + 1; out->u.addr.sun_family = AF_UNIX; strcpy(out->u.addr.sun_path, socket_path); return 1; } static void addr_socket_to_sys (struct sys_addr *out, BAddr addr) { switch (addr.type) { case BADDR_TYPE_IPV4: { out->len = sizeof(out->addr.ipv4); memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4)); out->addr.ipv4.sin_family = AF_INET; out->addr.ipv4.sin_port = addr.ipv4.port; out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip; } break; case BADDR_TYPE_IPV6: { out->len = sizeof(out->addr.ipv6); memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6)); out->addr.ipv6.sin6_family = AF_INET6; out->addr.ipv6.sin6_port = addr.ipv6.port; out->addr.ipv6.sin6_flowinfo = 0; memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16); out->addr.ipv6.sin6_scope_id = 0; } break; default: ASSERT(0); } } static void addr_sys_to_socket (BAddr *out, struct sys_addr addr) { switch (addr.addr.generic.sa_family) { case AF_INET: { ASSERT(addr.len == sizeof(struct sockaddr_in)) BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port); } break; case AF_INET6: { ASSERT(addr.len == sizeof(struct sockaddr_in6)) BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port); } break; default: { BAddr_InitNone(out); } break; } } static void listener_fd_handler (BListener *o, int events) { DebugObject_Access(&o->d_obj); // set default job BPending_Set(&o->default_job); // call handler o->handler(o->user); return; } static void listener_default_job_handler (BListener *o) { DebugObject_Access(&o->d_obj); BLog(BLOG_ERROR, "discarding connection"); // accept int newfd = accept(o->fd, NULL, NULL); if (newfd < 0) { BLog(BLOG_ERROR, "accept failed"); return; } // close new fd if (close(newfd) < 0) { BLog(BLOG_ERROR, "close failed"); } } static void connector_fd_handler (BConnector *o, int events) { DebugObject_Access(&o->d_obj); ASSERT(o->fd >= 0) ASSERT(!o->connected) ASSERT(o->have_bfd) // free BFileDescriptor BReactor_RemoveFileDescriptor(o->reactor, &o->bfd); // set have no BFileDescriptor o->have_bfd = 0; // read connection result int result; socklen_t result_len = sizeof(result); if (getsockopt(o->fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) { BLog(BLOG_ERROR, "getsockopt failed"); goto fail0; } ASSERT_FORCE(result_len == sizeof(result)) if (result != 0) { BLog(BLOG_ERROR, "connection failed"); goto fail0; } // set connected o->connected = 1; fail0: // call handler o->handler(o->user, !o->connected); return; } static void connector_job_handler (BConnector *o) { DebugObject_Access(&o->d_obj); ASSERT(o->fd >= 0) ASSERT(o->connected) ASSERT(!o->have_bfd) // call handler o->handler(o->user, 0); return; } static void connection_report_error (BConnection *o) { DebugError_AssertNoError(&o->d_err); ASSERT(o->handler) // report error DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR)); return; } static void connection_send (BConnection *o) { DebugError_AssertNoError(&o->d_err); ASSERT(o->send.state == SEND_STATE_BUSY) // limit if (!o->is_hupd) { if (!BReactorLimit_Increment(&o->send.limit)) { // wait for fd o->wait_events |= BREACTOR_WRITE; BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events); return; } } // send int bytes = write(o->fd, o->send.busy_data, o->send.busy_data_len); if (bytes < 0) { if (!o->is_hupd && (errno == EAGAIN || errno == EWOULDBLOCK)) { // wait for fd o->wait_events |= BREACTOR_WRITE; BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events); return; } BLog(BLOG_ERROR, "send failed"); connection_report_error(o); return; } ASSERT(bytes > 0) ASSERT(bytes <= o->send.busy_data_len) // set ready o->send.state = SEND_STATE_READY; // done StreamPassInterface_Done(&o->send.iface, bytes); } static void connection_recv (BConnection *o) { DebugError_AssertNoError(&o->d_err); ASSERT(o->recv.state == RECV_STATE_BUSY) // limit if (!o->is_hupd) { if (!BReactorLimit_Increment(&o->recv.limit)) { // wait for fd o->wait_events |= BREACTOR_READ; BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events); return; } } // recv int bytes = read(o->fd, o->recv.busy_data, o->recv.busy_data_avail); if (bytes < 0) { if (!o->is_hupd && (errno == EAGAIN || errno == EWOULDBLOCK)) { // wait for fd o->wait_events |= BREACTOR_READ; BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events); return; } BLog(BLOG_ERROR, "recv failed"); connection_report_error(o); return; } if (bytes == 0) { // set recv inited closed o->recv.state = RECV_STATE_INITED_CLOSED; // report recv closed o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED); return; } ASSERT(bytes > 0) ASSERT(bytes <= o->recv.busy_data_avail) // set not busy o->recv.state = RECV_STATE_READY; // done StreamRecvInterface_Done(&o->recv.iface, bytes); } static void connection_fd_handler (BConnection *o, int events) { DebugObject_Access(&o->d_obj); DebugError_AssertNoError(&o->d_err); ASSERT(!o->is_hupd) // clear handled events o->wait_events &= ~events; BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events); int have_send = 0; int have_recv = 0; // if we got a HUP event, stop monitoring the file descriptor if ((events & BREACTOR_HUP)) { BReactor_RemoveFileDescriptor(o->reactor, &o->bfd); o->is_hupd = 1; } if ((events & BREACTOR_WRITE) || ((events & (BREACTOR_ERROR|BREACTOR_HUP)) && o->send.state == SEND_STATE_BUSY)) { ASSERT(o->send.state == SEND_STATE_BUSY) have_send = 1; } if ((events & BREACTOR_READ) || ((events & (BREACTOR_ERROR|BREACTOR_HUP)) && o->recv.state == RECV_STATE_BUSY)) { ASSERT(o->recv.state == RECV_STATE_BUSY) have_recv = 1; } if (have_send) { if (have_recv) { BPending_Set(&o->recv.job); } connection_send(o); return; } if (have_recv) { connection_recv(o); return; } if (!o->is_hupd) { BLog(BLOG_ERROR, "fd error event"); connection_report_error(o); return; } } static void connection_send_job_handler (BConnection *o) { DebugObject_Access(&o->d_obj); DebugError_AssertNoError(&o->d_err); ASSERT(o->send.state == SEND_STATE_BUSY) connection_send(o); return; } static void connection_recv_job_handler (BConnection *o) { DebugObject_Access(&o->d_obj); DebugError_AssertNoError(&o->d_err); ASSERT(o->recv.state == RECV_STATE_BUSY) connection_recv(o); return; } static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len) { DebugObject_Access(&o->d_obj); DebugError_AssertNoError(&o->d_err); ASSERT(o->send.state == SEND_STATE_READY) ASSERT(data_len > 0) // remember data o->send.busy_data = data; o->send.busy_data_len = data_len; // set busy o->send.state = SEND_STATE_BUSY; connection_send(o); return; } static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_avail) { DebugObject_Access(&o->d_obj); DebugError_AssertNoError(&o->d_err); ASSERT(o->recv.state == RECV_STATE_READY) ASSERT(data_avail > 0) // remember data o->recv.busy_data = data; o->recv.busy_data_avail = data_avail; // set busy o->recv.state = RECV_STATE_BUSY; connection_recv(o); return; } int BConnection_AddressSupported (BAddr addr) { BAddr_Assert(&addr); return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6); } int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user, BListener_handler handler) { ASSERT(handler) BNetwork_Assert(); // init arguments o->reactor = reactor; o->user = user; o->handler = handler; // set no unix socket path o->unix_socket_path = NULL; // check address if (!BConnection_AddressSupported(addr)) { BLog(BLOG_ERROR, "address not supported"); goto fail0; } // convert address struct sys_addr sysaddr; addr_socket_to_sys(&sysaddr, addr); // init fd if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) { BLog(BLOG_ERROR, "socket failed"); goto fail0; } // set non-blocking if (!badvpn_set_nonblocking(o->fd)) { BLog(BLOG_ERROR, "badvpn_set_nonblocking failed"); goto fail1; } // set SO_REUSEADDR int optval = 1; if (setsockopt(o->fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) { BLog(BLOG_ERROR, "setsockopt(SO_REUSEADDR) failed"); } // bind if (bind(o->fd, &sysaddr.addr.generic, sysaddr.len) < 0) { BLog(BLOG_ERROR, "bind failed"); goto fail1; } // listen if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) { BLog(BLOG_ERROR, "listen failed"); goto fail1; } // init BFileDescriptor BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o); if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) { BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed"); goto fail1; } BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ); // init default job BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o); DebugObject_Init(&o->d_obj); return 1; fail1: if (close(o->fd) < 0) { BLog(BLOG_ERROR, "close failed"); } fail0: return 0; } int BListener_InitUnix (BListener *o, const char *socket_path, BReactor *reactor, void *user, BListener_handler handler) { ASSERT(socket_path) ASSERT(handler) BNetwork_Assert(); // init arguments o->reactor = reactor; o->user = user; o->handler = handler; // copy socket path o->unix_socket_path = b_strdup(socket_path); if (!o->unix_socket_path) { BLog(BLOG_ERROR, "b_strdup failed"); goto fail0; } // build address struct unix_addr addr; if (!build_unix_address(&addr, socket_path)) { BLog(BLOG_ERROR, "build_unix_address failed"); goto fail1; } // init fd if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { BLog(BLOG_ERROR, "socket failed"); goto fail1; } // set non-blocking if (!badvpn_set_nonblocking(o->fd)) { BLog(BLOG_ERROR, "badvpn_set_nonblocking failed"); goto fail2; } // unlink existing socket if (unlink(o->unix_socket_path) < 0 && errno != ENOENT) { BLog(BLOG_ERROR, "unlink existing socket failed"); goto fail2; } // bind if (bind(o->fd, (struct sockaddr *)&addr.u.addr, addr.len) < 0) { BLog(BLOG_ERROR, "bind failed"); goto fail2; } // listen if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) { BLog(BLOG_ERROR, "listen failed"); goto fail3; } // init BFileDescriptor BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o); if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) { BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed"); goto fail3; } BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ); // init default job BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o); DebugObject_Init(&o->d_obj); return 1; fail3: if (unlink(o->unix_socket_path) < 0) { BLog(BLOG_ERROR, "unlink socket failed"); } fail2: if (close(o->fd) < 0) { BLog(BLOG_ERROR, "close failed"); } fail1: free(o->unix_socket_path); fail0: return 0; } void BListener_Free (BListener *o) { DebugObject_Free(&o->d_obj); // free default job BPending_Free(&o->default_job); // free BFileDescriptor BReactor_RemoveFileDescriptor(o->reactor, &o->bfd); // free fd if (close(o->fd) < 0) { BLog(BLOG_ERROR, "close failed"); } // unlink unix socket if (o->unix_socket_path) { if (unlink(o->unix_socket_path) < 0) { BLog(BLOG_ERROR, "unlink socket failed"); } } // free unix socket path if (o->unix_socket_path) { free(o->unix_socket_path); } } int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user, BConnector_handler handler) { ASSERT(handler) BNetwork_Assert(); // init arguments o->reactor = reactor; o->user = user; o->handler = handler; // check address if (!BConnection_AddressSupported(addr)) { BLog(BLOG_ERROR, "address not supported"); goto fail0; } // convert address struct sys_addr sysaddr; addr_socket_to_sys(&sysaddr, addr); // init job BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o); // init fd if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) { BLog(BLOG_ERROR, "socket failed"); goto fail1; } // set fd non-blocking if (!badvpn_set_nonblocking(o->fd)) { BLog(BLOG_ERROR, "badvpn_set_nonblocking failed"); goto fail2; } // connect fd int res = connect(o->fd, &sysaddr.addr.generic, sysaddr.len); if (res < 0 && errno != EINPROGRESS) { BLog(BLOG_ERROR, "connect failed"); goto fail2; } // set not connected o->connected = 0; // set have no BFileDescriptor o->have_bfd = 0; if (res < 0) { // init BFileDescriptor BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o); if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) { BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed"); goto fail2; } BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE); // set have BFileDescriptor o->have_bfd = 1; } else { // set connected o->connected = 1; // set job BPending_Set(&o->job); } DebugObject_Init(&o->d_obj); return 1; fail2: if (close(o->fd) < 0) { BLog(BLOG_ERROR, "close failed"); } fail1: BPending_Free(&o->job); fail0: return 0; } int BConnector_InitUnix (BConnector *o, const char *socket_path, BReactor *reactor, void *user, BConnector_handler handler) { ASSERT(socket_path) ASSERT(handler) BNetwork_Assert(); // init arguments o->reactor = reactor; o->user = user; o->handler = handler; // build address struct unix_addr addr; if (!build_unix_address(&addr, socket_path)) { BLog(BLOG_ERROR, "build_unix_address failed"); goto fail0; } // init job BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o); // init fd if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { BLog(BLOG_ERROR, "socket failed"); goto fail1; } // set fd non-blocking if (!badvpn_set_nonblocking(o->fd)) { BLog(BLOG_ERROR, "badvpn_set_nonblocking failed"); goto fail2; } // connect fd int res = connect(o->fd, (struct sockaddr *)&addr.u.addr, addr.len); if (res < 0 && errno != EINPROGRESS) { BLog(BLOG_ERROR, "connect failed"); goto fail2; } // set not connected o->connected = 0; // set have no BFileDescriptor o->have_bfd = 0; if (res < 0) { // init BFileDescriptor BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o); if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) { BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed"); goto fail2; } BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE); // set have BFileDescriptor o->have_bfd = 1; } else { // set connected o->connected = 1; // set job BPending_Set(&o->job); } DebugObject_Init(&o->d_obj); return 1; fail2: if (close(o->fd) < 0) { BLog(BLOG_ERROR, "close failed"); } fail1: BPending_Free(&o->job); fail0: return 0; } void BConnector_Free (BConnector *o) { DebugObject_Free(&o->d_obj); // free BFileDescriptor if (o->have_bfd) { BReactor_RemoveFileDescriptor(o->reactor, &o->bfd); } // close fd if (o->fd != -1) { if (close(o->fd) < 0) { BLog(BLOG_ERROR, "close failed"); } } // free job BPending_Free(&o->job); } int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user, BConnection_handler handler) { switch (source.type) { case BCONNECTION_SOURCE_TYPE_LISTENER: { BListener *listener = source.u.listener.listener; DebugObject_Access(&listener->d_obj); ASSERT(BPending_IsSet(&listener->default_job)) } break; case BCONNECTION_SOURCE_TYPE_CONNECTOR: { BConnector *connector = source.u.connector.connector; DebugObject_Access(&connector->d_obj); ASSERT(connector->fd >= 0) ASSERT(connector->connected) ASSERT(!connector->have_bfd) ASSERT(!BPending_IsSet(&connector->job)) } break; case BCONNECTION_SOURCE_TYPE_PIPE: { ASSERT(source.u.pipe.pipefd >= 0) } break; default: ASSERT(0); } ASSERT(handler) BNetwork_Assert(); // init arguments o->reactor = reactor; o->user = user; o->handler = handler; switch (source.type) { case BCONNECTION_SOURCE_TYPE_LISTENER: { BListener *listener = source.u.listener.listener; // unset listener's default job BPending_Unset(&listener->default_job); // accept struct sys_addr sysaddr; sysaddr.len = sizeof(sysaddr.addr); if ((o->fd = accept(listener->fd, &sysaddr.addr.generic, &sysaddr.len)) < 0) { BLog(BLOG_ERROR, "accept failed"); goto fail0; } o->close_fd = 1; // set non-blocking if (!badvpn_set_nonblocking(o->fd)) { BLog(BLOG_ERROR, "badvpn_set_nonblocking failed"); goto fail1; } // return address if (source.u.listener.out_addr) { addr_sys_to_socket(source.u.listener.out_addr, sysaddr); } } break; case BCONNECTION_SOURCE_TYPE_CONNECTOR: { BConnector *connector = source.u.connector.connector; // grab fd from connector o->fd = connector->fd; connector->fd = -1; o->close_fd = 1; } break; case BCONNECTION_SOURCE_TYPE_PIPE: { // use user-provided fd o->fd = source.u.pipe.pipefd; o->close_fd = 0; // set non-blocking if (!badvpn_set_nonblocking(o->fd)) { BLog(BLOG_ERROR, "badvpn_set_nonblocking failed"); goto fail1; } } break; } // set not HUPd o->is_hupd = 0; // init BFileDescriptor BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connection_fd_handler, o); if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) { BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed"); goto fail1; } // set no wait events o->wait_events = 0; // init limits BReactorLimit_Init(&o->send.limit, o->reactor, BCONNECTION_SEND_LIMIT); BReactorLimit_Init(&o->recv.limit, o->reactor, BCONNECTION_RECV_LIMIT); // set send and recv not inited o->send.state = SEND_STATE_NOT_INITED; o->recv.state = RECV_STATE_NOT_INITED; DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor)); DebugObject_Init(&o->d_obj); return 1; fail1: if (o->close_fd) { if (close(o->fd) < 0) { BLog(BLOG_ERROR, "close failed"); } } fail0: return 0; } void BConnection_Free (BConnection *o) { DebugObject_Free(&o->d_obj); DebugError_Free(&o->d_err); ASSERT(o->send.state == SEND_STATE_NOT_INITED) ASSERT(o->recv.state == RECV_STATE_NOT_INITED || o->recv.state == RECV_STATE_NOT_INITED_CLOSED) // free limits BReactorLimit_Free(&o->recv.limit); BReactorLimit_Free(&o->send.limit); // free BFileDescriptor if (!o->is_hupd) { BReactor_RemoveFileDescriptor(o->reactor, &o->bfd); } // close fd if (o->close_fd) { if (close(o->fd) < 0) { BLog(BLOG_ERROR, "close failed"); } } } void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler) { DebugObject_Access(&o->d_obj); // set handlers o->user = user; o->handler = handler; } int BConnection_SetSendBuffer (BConnection *o, int buf_size) { DebugObject_Access(&o->d_obj); if (setsockopt(o->fd, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) { BLog(BLOG_ERROR, "setsockopt failed"); return 0; } return 1; } void BConnection_SendAsync_Init (BConnection *o) { DebugObject_Access(&o->d_obj); DebugError_AssertNoError(&o->d_err); ASSERT(o->send.state == SEND_STATE_NOT_INITED) // init interface StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_if_handler_send, o, BReactor_PendingGroup(o->reactor)); // init job BPending_Init(&o->send.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_send_job_handler, o); // set ready o->send.state = SEND_STATE_READY; } void BConnection_SendAsync_Free (BConnection *o) { DebugObject_Access(&o->d_obj); ASSERT(o->send.state == SEND_STATE_READY || o->send.state == SEND_STATE_BUSY) // update events if (!o->is_hupd) { o->wait_events &= ~BREACTOR_WRITE; BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events); } // free job BPending_Free(&o->send.job); // free interface StreamPassInterface_Free(&o->send.iface); // set not inited o->send.state = SEND_STATE_NOT_INITED; } StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o) { DebugObject_Access(&o->d_obj); ASSERT(o->send.state == SEND_STATE_READY || o->send.state == SEND_STATE_BUSY) return &o->send.iface; } void BConnection_RecvAsync_Init (BConnection *o) { DebugObject_Access(&o->d_obj); DebugError_AssertNoError(&o->d_err); ASSERT(o->recv.state == RECV_STATE_NOT_INITED) // init interface StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_if_handler_recv, o, BReactor_PendingGroup(o->reactor)); // init job BPending_Init(&o->recv.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_recv_job_handler, o); // set ready o->recv.state = RECV_STATE_READY; } void BConnection_RecvAsync_Free (BConnection *o) { DebugObject_Access(&o->d_obj); ASSERT(o->recv.state == RECV_STATE_READY || o->recv.state == RECV_STATE_BUSY || o->recv.state == RECV_STATE_INITED_CLOSED) // update events if (!o->is_hupd) { o->wait_events &= ~BREACTOR_READ; BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events); } // free job BPending_Free(&o->recv.job); // free interface StreamRecvInterface_Free(&o->recv.iface); // set not inited o->recv.state = RECV_STATE_NOT_INITED; } StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o) { DebugObject_Access(&o->d_obj); ASSERT(o->recv.state == RECV_STATE_READY || o->recv.state == RECV_STATE_BUSY || o->recv.state == RECV_STATE_INITED_CLOSED) return &o->recv.iface; }