#ifdef SOCKET_SERVER_FILE_MEMAPI # define STRINIFY_(S) #S # define STRINIFY(S) STRINIFY_(S) # include STRINIFY(SOCKET_SERVER_FILE_MEMAPI) # undef STRINIFY # undef STRINIFY_ #endif #include "socket_server.h" #include "socket_poll.h" #include #include #include #include #include #include #include #include #include #include #include #include #define MAX_INFO 128 // MAX_SOCKET will be 2^MAX_SOCKET_P #define MAX_SOCKET_P 16 #define MAX_EVENT 64 #define MIN_READ_BUFFER 64 #define SOCKET_TYPE_INVALID 0 #define SOCKET_TYPE_RESERVE 1 #define SOCKET_TYPE_PLISTEN 2 #define SOCKET_TYPE_LISTEN 3 #define SOCKET_TYPE_CONNECTING 4 #define SOCKET_TYPE_CONNECTED 5 #define SOCKET_TYPE_HALFCLOSE 6 #define SOCKET_TYPE_PACCEPT 7 #define SOCKET_TYPE_BIND 8 #define MAX_SOCKET (1<buffer = ss->soi.buffer(object); so->sz = ss->soi.size(object); so->free_func = ss->soi.free; return true; } else { so->buffer = object; so->sz = sz; so->free_func = FREE; return false; } } static inline void write_buffer_free(struct socket_server *ss, struct write_buffer *wb) { if (wb->userobject) { ss->soi.free(wb->buffer); } else { FREE(wb->buffer); } FREE(wb); } static void socket_keepalive(int fd) { int keepalive = 1; setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepalive , sizeof(keepalive)); } static int reserve_id(struct socket_server *ss) { int i; for (i=0;ialloc_id), 1); if (id < 0) { id = __sync_and_and_fetch(&(ss->alloc_id), 0x7fffffff); } struct socket *s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID) { if (__sync_bool_compare_and_swap(&s->type, SOCKET_TYPE_INVALID, SOCKET_TYPE_RESERVE)) { s->id = id; s->fd = -1; return id; } else { // retry --i; } } } return -1; } static inline void clear_wb_list(struct wb_list *list) { list->head = NULL; list->tail = NULL; } struct socket_server * socket_server_create() { int i; int fd[2]; poll_fd efd = sp_create(); if (sp_invalid(efd)) { fprintf(stderr, "socket-server: create event pool failed.\n"); return NULL; } if (pipe(fd)) { sp_release(efd); fprintf(stderr, "socket-server: create socket pair failed.\n"); return NULL; } if (sp_add(efd, fd[0], NULL)) { // add recvctrl_fd to event poll fprintf(stderr, "socket-server: can't add server fd to event pool.\n"); close(fd[0]); close(fd[1]); sp_release(efd); return NULL; } struct socket_server *ss = MALLOC(sizeof(*ss)); ss->event_fd = efd; ss->recvctrl_fd = fd[0]; ss->sendctrl_fd = fd[1]; ss->checkctrl = 1; for (i=0;islot[i]; s->type = SOCKET_TYPE_INVALID; clear_wb_list(&s->high); clear_wb_list(&s->low); } ss->alloc_id = 0; ss->event_n = 0; ss->event_index = 0; memset(&ss->soi, 0, sizeof(ss->soi)); FD_ZERO(&ss->rfds); assert(ss->recvctrl_fd < FD_SETSIZE); return ss; } static void free_wb_list(struct socket_server *ss, struct wb_list *list) { struct write_buffer *wb = list->head; while (wb) { struct write_buffer *tmp = wb; wb = wb->next; write_buffer_free(ss, tmp); } list->head = NULL; list->tail = NULL; } static void force_close(struct socket_server *ss, struct socket *s, struct socket_message *result) { result->id = s->id; result->ud = 0; result->data = NULL; result->opaque = s->opaque; if (s->type == SOCKET_TYPE_INVALID) { return; } assert(s->type != SOCKET_TYPE_RESERVE); free_wb_list(ss,&s->high); free_wb_list(ss,&s->low); if (s->type != SOCKET_TYPE_PACCEPT && s->type != SOCKET_TYPE_PLISTEN) { sp_del(ss->event_fd, s->fd); } if (s->type != SOCKET_TYPE_BIND) { close(s->fd); } s->type = SOCKET_TYPE_INVALID; } void socket_server_release(struct socket_server *ss) { int i; struct socket_message dummy; for (i=0;islot[i]; if (s->type != SOCKET_TYPE_RESERVE) { force_close(ss, s , &dummy); } } close(ss->sendctrl_fd); close(ss->recvctrl_fd); sp_release(ss->event_fd); FREE(ss); } static inline void check_wb_list(struct wb_list *s) { assert(s->head == NULL); assert(s->tail == NULL); } static struct socket * new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque, bool add) { struct socket * s = &ss->slot[HASH_ID(id)]; assert(s->type == SOCKET_TYPE_RESERVE); if (add) { if (sp_add(ss->event_fd, fd, s)) { s->type = SOCKET_TYPE_INVALID; return NULL; } } s->id = id; s->fd = fd; s->protocol = protocol; s->p.size = MIN_READ_BUFFER; s->opaque = opaque; s->wb_size = 0; check_wb_list(&s->high); check_wb_list(&s->low); return s; } // return -1 when connecting static int open_socket(struct socket_server *ss, struct request_open * request, struct socket_message *result) { int id = request->id; result->opaque = request->opaque; result->id = id; result->ud = 0; result->data = NULL; struct socket *ns; int status; struct addrinfo ai_hints; struct addrinfo *ai_list = NULL; struct addrinfo *ai_ptr = NULL; char port[16]; sprintf(port, "%d", request->port); memset(&ai_hints, 0, sizeof( ai_hints ) ); ai_hints.ai_family = AF_UNSPEC; ai_hints.ai_socktype = SOCK_STREAM; ai_hints.ai_protocol = IPPROTO_TCP; status = getaddrinfo( request->host, port, &ai_hints, &ai_list ); if ( status != 0 ) { goto _failed; } int sock= -1; for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next ) { sock = socket( ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol ); if ( sock < 0 ) { continue; } socket_keepalive(sock); sp_nonblocking(sock); status = connect( sock, ai_ptr->ai_addr, ai_ptr->ai_addrlen); if ( status != 0 && errno != EINPROGRESS) { close(sock); sock = -1; continue; } break; } if (sock < 0) { goto _failed; } ns = new_fd(ss, id, sock, PROTOCOL_TCP, request->opaque, true); if (ns == NULL) { close(sock); goto _failed; } if(status == 0) { ns->type = SOCKET_TYPE_CONNECTED; struct sockaddr * addr = ai_ptr->ai_addr; void * sin_addr = (ai_ptr->ai_family == AF_INET) ? (void*)&((struct sockaddr_in *)addr)->sin_addr : (void*)&((struct sockaddr_in6 *)addr)->sin6_addr; if (inet_ntop(ai_ptr->ai_family, sin_addr, ss->buffer, sizeof(ss->buffer))) { result->data = ss->buffer; } freeaddrinfo( ai_list ); return SOCKET_OPEN; } else { ns->type = SOCKET_TYPE_CONNECTING; sp_write(ss->event_fd, ns->fd, ns, true); } freeaddrinfo( ai_list ); return -1; _failed: freeaddrinfo( ai_list ); ss->slot[HASH_ID(id)].type = SOCKET_TYPE_INVALID; return SOCKET_ERROR; } static int send_list_tcp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_message *result) { while (list->head) { struct write_buffer * tmp = list->head; for (;;) { int sz = write(s->fd, tmp->ptr, tmp->sz); if (sz < 0) { switch(errno) { case EINTR: continue; case EAGAIN: return -1; } force_close(ss,s, result); return SOCKET_CLOSE; } s->wb_size -= sz; if (sz != tmp->sz) { tmp->ptr += sz; tmp->sz -= sz; return -1; } break; } list->head = tmp->next; write_buffer_free(ss,tmp); } list->tail = NULL; return -1; } static socklen_t udp_socket_address(struct socket *s, const uint8_t udp_address[UDP_ADDRESS_SIZE], union sockaddr_all *sa) { int type = (uint8_t)udp_address[0]; if (type != s->protocol) return 0; uint16_t port = 0; memcpy(&port, udp_address+1, sizeof(uint16_t)); switch (s->protocol) { case PROTOCOL_UDP: memset(&sa->v4, 0, sizeof(sa->v4)); sa->s.sa_family = AF_INET; sa->v4.sin_port = port; memcpy(&sa->v4.sin_addr, udp_address + 1 + sizeof(uint16_t), sizeof(sa->v4.sin_addr)); // ipv4 address is 32 bits return sizeof(sa->v4); case PROTOCOL_UDPv6: memset(&sa->v6, 0, sizeof(sa->v6)); sa->s.sa_family = AF_INET6; sa->v6.sin6_port = port; memcpy(&sa->v6.sin6_addr, udp_address + 1 + sizeof(uint16_t), sizeof(sa->v6.sin6_addr)); // ipv6 address is 128 bits return sizeof(sa->v6); } return 0; } static int send_list_udp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_message *result) { while (list->head) { struct write_buffer * tmp = list->head; union sockaddr_all sa; socklen_t sasz = udp_socket_address(s, tmp->udp_address, &sa); int err = sendto(s->fd, tmp->ptr, tmp->sz, 0, &sa.s, sasz); if (err < 0) { switch(errno) { case EINTR: case EAGAIN: return -1; } fprintf(stderr, "socket-server : udp (%d) sendto error %s.\n",s->id, strerror(errno)); return -1; /* // ignore udp sendto error result->opaque = s->opaque; result->id = s->id; result->ud = 0; result->data = NULL; return SOCKET_ERROR; */ } s->wb_size -= tmp->sz; list->head = tmp->next; write_buffer_free(ss,tmp); } list->tail = NULL; return -1; } static int send_list(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_message *result) { if (s->protocol == PROTOCOL_TCP) { return send_list_tcp(ss, s, list, result); } else { return send_list_udp(ss, s, list, result); } } static inline int list_uncomplete(struct wb_list *s) { struct write_buffer *wb = s->head; if (wb == NULL) return 0; return (void *)wb->ptr != wb->buffer; } static void raise_uncomplete(struct socket * s) { struct wb_list *low = &s->low; struct write_buffer *tmp = low->head; low->head = tmp->next; if (low->head == NULL) { low->tail = NULL; } // move head of low list (tmp) to the empty high list struct wb_list *high = &s->high; assert(high->head == NULL); tmp->next = NULL; high->head = high->tail = tmp; } /* Each socket has two write buffer list, high priority and low priority. 1. send high list as far as possible. 2. If high list is empty, try to send low list. 3. If low list head is uncomplete (send a part before), move the head of low list to empty high list (call raise_uncomplete) . 4. If two lists are both empty, turn off the event. (call check_close) */ static int send_buffer(struct socket_server *ss, struct socket *s, struct socket_message *result) { assert(!list_uncomplete(&s->low)); // step 1 if (send_list(ss,s,&s->high,result) == SOCKET_CLOSE) { return SOCKET_CLOSE; } if (s->high.head == NULL) { // step 2 if (s->low.head != NULL) { if (send_list(ss,s,&s->low,result) == SOCKET_CLOSE) { return SOCKET_CLOSE; } // step 3 if (list_uncomplete(&s->low)) { raise_uncomplete(s); } } else { // step 4 sp_write(ss->event_fd, s->fd, s, false); if (s->type == SOCKET_TYPE_HALFCLOSE) { force_close(ss, s, result); return SOCKET_CLOSE; } } } return -1; } static struct write_buffer * append_sendbuffer_(struct socket_server *ss, struct wb_list *s, struct request_send * request, int size, int n) { struct write_buffer * buf = MALLOC(size); struct send_object so; buf->userobject = send_object_init(ss, &so, request->buffer, request->sz); buf->ptr = (char*)so.buffer+n; buf->sz = so.sz - n; buf->buffer = request->buffer; buf->next = NULL; if (s->head == NULL) { s->head = s->tail = buf; } else { assert(s->tail != NULL); assert(s->tail->next == NULL); s->tail->next = buf; s->tail = buf; } return buf; } static inline void append_sendbuffer_udp(struct socket_server *ss, struct socket *s, int priority, struct request_send * request, const uint8_t udp_address[UDP_ADDRESS_SIZE]) { struct wb_list *wl = (priority == PRIORITY_HIGH) ? &s->high : &s->low; struct write_buffer *buf = append_sendbuffer_(ss, wl, request, SIZEOF_UDPBUFFER, 0); memcpy(buf->udp_address, udp_address, UDP_ADDRESS_SIZE); s->wb_size += buf->sz; } static inline void append_sendbuffer(struct socket_server *ss, struct socket *s, struct request_send * request, int n) { struct write_buffer *buf = append_sendbuffer_(ss, &s->high, request, SIZEOF_TCPBUFFER, n); s->wb_size += buf->sz; } static inline void append_sendbuffer_low(struct socket_server *ss,struct socket *s, struct request_send * request) { struct write_buffer *buf = append_sendbuffer_(ss, &s->low, request, SIZEOF_TCPBUFFER, 0); s->wb_size += buf->sz; } static inline int send_buffer_empty(struct socket *s) { return (s->high.head == NULL && s->low.head == NULL); } /* When send a package , we can assign the priority : PRIORITY_HIGH or PRIORITY_LOW If socket buffer is empty, write to fd directly. If write a part, append the rest part to high list. (Even priority is PRIORITY_LOW) Else append package to high (PRIORITY_HIGH) or low (PRIORITY_LOW) list. */ static int send_socket(struct socket_server *ss, struct request_send * request, struct socket_message *result, int priority, const uint8_t *udp_address) { int id = request->id; struct socket * s = &ss->slot[HASH_ID(id)]; struct send_object so; send_object_init(ss, &so, request->buffer, request->sz); if (s->type == SOCKET_TYPE_INVALID || s->id != id || s->type == SOCKET_TYPE_HALFCLOSE || s->type == SOCKET_TYPE_PACCEPT) { so.free_func(request->buffer); return -1; } assert(s->type != SOCKET_TYPE_PLISTEN && s->type != SOCKET_TYPE_LISTEN); if (send_buffer_empty(s) && s->type == SOCKET_TYPE_CONNECTED) { if (s->protocol == PROTOCOL_TCP) { int n = write(s->fd, so.buffer, so.sz); if (n<0) { switch(errno) { case EINTR: case EAGAIN: n = 0; break; default: fprintf(stderr, "socket-server: write to %d (fd=%d) error :%s.\n",id,s->fd,strerror(errno)); force_close(ss,s,result); return SOCKET_CLOSE; } } if (n == so.sz) { so.free_func(request->buffer); return -1; } append_sendbuffer(ss, s, request, n); // add to high priority list, even priority == PRIORITY_LOW } else { // udp if (udp_address == NULL) { udp_address = s->p.udp_address; } union sockaddr_all sa; socklen_t sasz = udp_socket_address(s, udp_address, &sa); int n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz); if (n != so.sz) { append_sendbuffer_udp(ss,s,priority,request,udp_address); } else { so.free_func(request->buffer); return -1; } } sp_write(ss->event_fd, s->fd, s, true); } else { if (s->protocol == PROTOCOL_TCP) { if (priority == PRIORITY_LOW) { append_sendbuffer_low(ss, s, request); } else { append_sendbuffer(ss, s, request, 0); } } else { if (udp_address == NULL) { udp_address = s->p.udp_address; } append_sendbuffer_udp(ss,s,priority,request,udp_address); } } return -1; } static int listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message *result) { int id = request->id; int listen_fd = request->fd; struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false); if (s == NULL) { goto _failed; } s->type = SOCKET_TYPE_PLISTEN; return -1; _failed: close(listen_fd); result->opaque = request->opaque; result->id = id; result->ud = 0; result->data = NULL; ss->slot[HASH_ID(id)].type = SOCKET_TYPE_INVALID; return SOCKET_ERROR; } static int close_socket(struct socket_server *ss, struct request_close *request, struct socket_message *result) { int id = request->id; struct socket * s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID || s->id != id) { result->id = id; result->opaque = request->opaque; result->ud = 0; result->data = NULL; return SOCKET_CLOSE; } if (!send_buffer_empty(s)) { int type = send_buffer(ss,s,result); if (type != -1) return type; } if (send_buffer_empty(s)) { force_close(ss,s,result); result->id = id; result->opaque = request->opaque; return SOCKET_CLOSE; } s->type = SOCKET_TYPE_HALFCLOSE; return -1; } static int bind_socket(struct socket_server *ss, struct request_bind *request, struct socket_message *result) { int id = request->id; result->id = id; result->opaque = request->opaque; result->ud = 0; struct socket *s = new_fd(ss, id, request->fd, PROTOCOL_TCP, request->opaque, true); if (s == NULL) { result->data = NULL; return SOCKET_ERROR; } sp_nonblocking(request->fd); s->type = SOCKET_TYPE_BIND; result->data = "binding"; return SOCKET_OPEN; } static int start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) { int id = request->id; result->id = id; result->opaque = request->opaque; result->ud = 0; result->data = NULL; struct socket *s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID || s->id !=id) { return SOCKET_ERROR; } if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) { if (sp_add(ss->event_fd, s->fd, s)) { s->type = SOCKET_TYPE_INVALID; return SOCKET_ERROR; } s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN; s->opaque = request->opaque; result->data = "start"; return SOCKET_OPEN; } else if (s->type == SOCKET_TYPE_CONNECTED) { s->opaque = request->opaque; result->data = "transfer"; return SOCKET_OPEN; } return -1; } static void setopt_socket(struct socket_server *ss, struct request_setopt *request) { int id = request->id; struct socket *s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID || s->id !=id) { return; } int v = request->value; setsockopt(s->fd, IPPROTO_TCP, request->what, &v, sizeof(v)); } static void block_readpipe(int pipefd, void *buffer, int sz) { for (;;) { int n = read(pipefd, buffer, sz); if (n<0) { if (errno == EINTR) continue; fprintf(stderr, "socket-server : read pipe error %s.\n",strerror(errno)); return; } // must atomic read from a pipe assert(n == sz); return; } } static int has_cmd(struct socket_server *ss) { struct timeval tv = {0,0}; int retval; FD_SET(ss->recvctrl_fd, &ss->rfds); retval = select(ss->recvctrl_fd+1, &ss->rfds, NULL, NULL, &tv); if (retval == 1) { return 1; } return 0; } static void add_udp_socket(struct socket_server *ss, struct request_udp *udp) { int id = udp->id; int protocol; if (udp->family == AF_INET6) { protocol = PROTOCOL_UDPv6; } else { protocol = PROTOCOL_UDP; } struct socket *ns = new_fd(ss, id, udp->fd, protocol, udp->opaque, true); if (ns == NULL) { close(udp->fd); ss->slot[HASH_ID(id)].type = SOCKET_TYPE_INVALID; return; } ns->type = SOCKET_TYPE_CONNECTED; memset(ns->p.udp_address, 0, sizeof(ns->p.udp_address)); } static int set_udp_address(struct socket_server *ss, struct request_setudp *request, struct socket_message *result) { int id = request->id; struct socket *s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID || s->id !=id) { return -1; } int type = request->address[0]; if (type != s->protocol) { // protocol mismatch result->opaque = s->opaque; result->id = s->id; result->ud = 0; result->data = NULL; return SOCKET_ERROR; } if (type == PROTOCOL_UDP) { memcpy(s->p.udp_address, request->address, 1+2+4); // 1 type, 2 port, 4 ipv4 } else { memcpy(s->p.udp_address, request->address, 1+2+16); // 1 type, 2 port, 16 ipv6 } return -1; } // return type static int ctrl_cmd(struct socket_server *ss, struct socket_message *result) { int fd = ss->recvctrl_fd; // the length of message is one byte, so 256+8 buffer size is enough. uint8_t buffer[256]; uint8_t header[2]; block_readpipe(fd, header, sizeof(header)); int type = header[0]; int len = header[1]; block_readpipe(fd, buffer, len); // ctrl command only exist in local fd, so don't worry about endian. switch (type) { case 'S': return start_socket(ss,(struct request_start *)buffer, result); case 'B': return bind_socket(ss,(struct request_bind *)buffer, result); case 'L': return listen_socket(ss,(struct request_listen *)buffer, result); case 'K': return close_socket(ss,(struct request_close *)buffer, result); case 'O': return open_socket(ss, (struct request_open *)buffer, result); case 'X': result->opaque = 0; result->id = 0; result->ud = 0; result->data = NULL; return SOCKET_EXIT; case 'D': return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_HIGH, NULL); case 'P': return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_LOW, NULL); case 'A': { struct request_send_udp * rsu = (struct request_send_udp *)buffer; return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address); } case 'C': return set_udp_address(ss, (struct request_setudp *)buffer, result); case 'T': setopt_socket(ss, (struct request_setopt *)buffer); return -1; case 'U': add_udp_socket(ss, (struct request_udp *)buffer); return -1; default: fprintf(stderr, "socket-server: Unknown ctrl %c.\n",type); return -1; }; return -1; } // return -1 (ignore) when error static int forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_message * result) { int sz = s->p.size; char * buffer = MALLOC(sz); int n = (int)read(s->fd, buffer, sz); if (n<0) { FREE(buffer); switch(errno) { case EINTR: break; case EAGAIN: fprintf(stderr, "socket-server: EAGAIN capture.\n"); break; default: // close when error force_close(ss, s, result); return SOCKET_ERROR; } return -1; } if (n==0) { FREE(buffer); force_close(ss, s, result); return SOCKET_CLOSE; } if (s->type == SOCKET_TYPE_HALFCLOSE) { // discard recv data FREE(buffer); return -1; } if (n == sz) { s->p.size *= 2; } else if (sz > MIN_READ_BUFFER && n*2 < sz) { s->p.size /= 2; } result->opaque = s->opaque; result->id = s->id; result->ud = n; result->data = buffer; return SOCKET_DATA; } static int gen_udp_address(int protocol, union sockaddr_all *sa, uint8_t * udp_address) { int addrsz = 1; udp_address[0] = (uint8_t)protocol; if (protocol == PROTOCOL_UDP) { memcpy(udp_address+addrsz, &sa->v4.sin_port, sizeof(sa->v4.sin_port)); addrsz += sizeof(sa->v4.sin_port); memcpy(udp_address+addrsz, &sa->v4.sin_addr, sizeof(sa->v4.sin_addr)); addrsz += sizeof(sa->v4.sin_addr); } else { memcpy(udp_address+addrsz, &sa->v6.sin6_port, sizeof(sa->v6.sin6_port)); addrsz += sizeof(sa->v6.sin6_port); memcpy(udp_address+addrsz, &sa->v6.sin6_addr, sizeof(sa->v6.sin6_addr)); addrsz += sizeof(sa->v6.sin6_addr); } return addrsz; } static int forward_message_udp(struct socket_server *ss, struct socket *s, struct socket_message * result) { union sockaddr_all sa; socklen_t slen = sizeof(sa); int n = recvfrom(s->fd, ss->udpbuffer,MAX_UDP_PACKAGE,0,&sa.s,&slen); if (n<0) { switch(errno) { case EINTR: case EAGAIN: break; default: // close when error force_close(ss, s, result); return SOCKET_ERROR; } return -1; } uint8_t * data; if (slen == sizeof(sa.v4)) { if (s->protocol != PROTOCOL_UDP) return -1; data = MALLOC(n + 1 + 2 + 4); gen_udp_address(PROTOCOL_UDP, &sa, data + n); } else { if (s->protocol != PROTOCOL_UDPv6) return -1; data = MALLOC(n + 1 + 2 + 16); gen_udp_address(PROTOCOL_UDPv6, &sa, data + n); } memcpy(data, ss->udpbuffer, n); result->opaque = s->opaque; result->id = s->id; result->ud = n; result->data = (char *)data; return SOCKET_UDP; } static int report_connect(struct socket_server *ss, struct socket *s, struct socket_message *result) { int error; socklen_t len = sizeof(error); int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len); if (code < 0 || error) { force_close(ss,s, result); return SOCKET_ERROR; } else { s->type = SOCKET_TYPE_CONNECTED; result->opaque = s->opaque; result->id = s->id; result->ud = 0; if (send_buffer_empty(s)) { sp_write(ss->event_fd, s->fd, s, false); } union sockaddr_all u; socklen_t slen = sizeof(u); if (getpeername(s->fd, &u.s, &slen) == 0) { void * sin_addr = (u.s.sa_family == AF_INET) ? (void*)&u.v4.sin_addr : (void *)&u.v6.sin6_addr; if (inet_ntop(u.s.sa_family, sin_addr, ss->buffer, sizeof(ss->buffer))) { result->data = ss->buffer; return SOCKET_OPEN; } } result->data = NULL; return SOCKET_OPEN; } } // return 0 when failed static int report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) { union sockaddr_all u; socklen_t len = sizeof(u); int client_fd = accept(s->fd, &u.s, &len); if (client_fd < 0) { return 0; } int id = reserve_id(ss); if (id < 0) { close(client_fd); return 0; } socket_keepalive(client_fd); sp_nonblocking(client_fd); struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false); if (ns == NULL) { close(client_fd); return 0; } ns->type = SOCKET_TYPE_PACCEPT; result->opaque = s->opaque; result->id = s->id; result->ud = id; result->data = NULL; void * sin_addr = (u.s.sa_family == AF_INET) ? (void*)&u.v4.sin_addr : (void *)&u.v6.sin6_addr; int sin_port = ntohs((u.s.sa_family == AF_INET) ? u.v4.sin_port : u.v6.sin6_port); char tmp[INET6_ADDRSTRLEN]; if (inet_ntop(u.s.sa_family, sin_addr, tmp, sizeof(tmp))) { snprintf(ss->buffer, sizeof(ss->buffer), "%s:%d", tmp, sin_port); result->data = ss->buffer; } return 1; } static inline void clear_closed_event(struct socket_server *ss, struct socket_message * result, int type) { if (type == SOCKET_CLOSE || type == SOCKET_ERROR) { int id = result->id; int i; for (i=ss->event_index; ievent_n; i++) { struct event *e = &ss->ev[i]; struct socket *s = e->s; if (s) { if (s->type == SOCKET_TYPE_INVALID && s->id == id) { e->s = NULL; } } } } } // return type int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) { for (;;) { if (ss->checkctrl) { if (has_cmd(ss)) { int type = ctrl_cmd(ss, result); if (type != -1) { clear_closed_event(ss, result, type); return type; } else continue; } else { ss->checkctrl = 0; } } if (ss->event_index == ss->event_n) { ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT); ss->checkctrl = 1; if (more) { *more = 0; } ss->event_index = 0; if (ss->event_n <= 0) { ss->event_n = 0; return -1; } } struct event *e = &ss->ev[ss->event_index++]; struct socket *s = e->s; if (s == NULL) { // dispatch pipe message at beginning continue; } switch (s->type) { case SOCKET_TYPE_CONNECTING: return report_connect(ss, s, result); case SOCKET_TYPE_LISTEN: if (report_accept(ss, s, result)) { return SOCKET_ACCEPT; } break; case SOCKET_TYPE_INVALID: fprintf(stderr, "socket-server: invalid socket\n"); break; default: if (e->read) { int type; if (s->protocol == PROTOCOL_TCP) { type = forward_message_tcp(ss, s, result); } else { type = forward_message_udp(ss, s, result); if (type == SOCKET_UDP) { // try read again --ss->event_index; return SOCKET_UDP; } } if (e->write) { // Try to dispatch write message next step if write flag set. e->read = false; --ss->event_index; } if (type == -1) break; clear_closed_event(ss, result, type); return type; } if (e->write) { int type = send_buffer(ss, s, result); if (type == -1) break; clear_closed_event(ss, result, type); return type; } break; } } } static void send_request(struct socket_server *ss, struct request_package *request, char type, int len) { request->header[6] = (uint8_t)type; request->header[7] = (uint8_t)len; for (;;) { int n = write(ss->sendctrl_fd, &request->header[6], len+2); if (n<0) { if (errno != EINTR) { fprintf(stderr, "socket-server : send ctrl command error %s.\n", strerror(errno)); } continue; } assert(n == len+2); return; } } static int open_request(struct socket_server *ss, struct request_package *req, uintptr_t opaque, const char *addr, int port) { int len = strlen(addr); if (len + sizeof(req->u.open) > 256) { fprintf(stderr, "socket-server : Invalid addr %s.\n",addr); return -1; } int id = reserve_id(ss); if (id < 0) return -1; req->u.open.opaque = opaque; req->u.open.id = id; req->u.open.port = port; memcpy(req->u.open.host, addr, len); req->u.open.host[len] = '\0'; return len; } int socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) { struct request_package request; int len = open_request(ss, &request, opaque, addr, port); if (len < 0) return -1; send_request(ss, &request, 'O', sizeof(request.u.open) + len); return request.u.open.id; } static void free_buffer(struct socket_server *ss, const void * buffer, int sz) { struct send_object so; send_object_init(ss, &so, (void *)buffer, sz); so.free_func((void *)buffer); } // return -1 when error int64_t socket_server_send(struct socket_server *ss, int id, const void * buffer, int sz) { struct socket * s = &ss->slot[HASH_ID(id)]; if (s->id != id || s->type == SOCKET_TYPE_INVALID) { free_buffer(ss, buffer, sz); return -1; } struct request_package request; request.u.send.id = id; request.u.send.sz = sz; request.u.send.buffer = (char *)buffer; send_request(ss, &request, 'D', sizeof(request.u.send)); return s->wb_size; } void socket_server_send_lowpriority(struct socket_server *ss, int id, const void * buffer, int sz) { struct socket * s = &ss->slot[HASH_ID(id)]; if (s->id != id || s->type == SOCKET_TYPE_INVALID) { free_buffer(ss, buffer, sz); return; } struct request_package request; request.u.send.id = id; request.u.send.sz = sz; request.u.send.buffer = (char *)buffer; send_request(ss, &request, 'P', sizeof(request.u.send)); } void socket_server_exit(struct socket_server *ss) { struct request_package request; send_request(ss, &request, 'X', 0); } void socket_server_close(struct socket_server *ss, uintptr_t opaque, int id) { struct request_package request; request.u.close.id = id; request.u.close.opaque = opaque; send_request(ss, &request, 'K', sizeof(request.u.close)); } // return -1 means failed // or return AF_INET or AF_INET6 static int do_bind(const char *host, int port, int protocol, int *family) { int fd; int status; int reuse = 1; struct addrinfo ai_hints; struct addrinfo *ai_list = NULL; char portstr[16]; if (host == NULL || host[0] == 0) { host = "0.0.0.0"; // INADDR_ANY } sprintf(portstr, "%d", port); memset( &ai_hints, 0, sizeof( ai_hints ) ); ai_hints.ai_family = AF_UNSPEC; if (protocol == IPPROTO_TCP) { ai_hints.ai_socktype = SOCK_STREAM; } else { assert(protocol == IPPROTO_UDP); ai_hints.ai_socktype = SOCK_DGRAM; } ai_hints.ai_protocol = protocol; status = getaddrinfo( host, portstr, &ai_hints, &ai_list ); if ( status != 0 ) { return -1; } *family = ai_list->ai_family; fd = socket(*family, ai_list->ai_socktype, 0); if (fd < 0) { goto _failed_fd; } if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(int))==-1) { goto _failed; } status = bind(fd, (struct sockaddr *)ai_list->ai_addr, ai_list->ai_addrlen); if (status != 0) goto _failed; freeaddrinfo( ai_list ); return fd; _failed: close(fd); _failed_fd: freeaddrinfo( ai_list ); return -1; } static int do_listen(const char * host, int port, int backlog) { int family = 0; int listen_fd = do_bind(host, port, IPPROTO_TCP, &family); if (listen_fd < 0) { return -1; } if (listen(listen_fd, backlog) == -1) { close(listen_fd); return -1; } return listen_fd; } int socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) { int fd = do_listen(addr, port, backlog); if (fd < 0) { return -1; } struct request_package request; int id = reserve_id(ss); if (id < 0) { close(fd); return id; } request.u.listen.opaque = opaque; request.u.listen.id = id; request.u.listen.fd = fd; send_request(ss, &request, 'L', sizeof(request.u.listen)); return id; } int socket_server_bind(struct socket_server *ss, uintptr_t opaque, int fd) { struct request_package request; int id = reserve_id(ss); if (id < 0) return -1; request.u.bind.opaque = opaque; request.u.bind.id = id; request.u.bind.fd = fd; send_request(ss, &request, 'B', sizeof(request.u.bind)); return id; } void socket_server_start(struct socket_server *ss, uintptr_t opaque, int id) { struct request_package request; request.u.start.id = id; request.u.start.opaque = opaque; send_request(ss, &request, 'S', sizeof(request.u.start)); } void socket_server_nodelay(struct socket_server *ss, int id) { struct request_package request; request.u.setopt.id = id; request.u.setopt.what = TCP_NODELAY; request.u.setopt.value = 1; send_request(ss, &request, 'T', sizeof(request.u.setopt)); } void socket_server_userobject(struct socket_server *ss, struct socket_object_interface *soi) { ss->soi = *soi; } // UDP int socket_server_udp(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) { int fd; int family; if (port != 0 || addr != NULL) { // bind fd = do_bind(addr, port, IPPROTO_UDP, &family); if (fd < 0) { return -1; } } else { family = AF_INET; fd = socket(family, SOCK_DGRAM, 0); if (fd < 0) { return -1; } } sp_nonblocking(fd); int id = reserve_id(ss); if (id < 0) { close(fd); return -1; } struct request_package request; request.u.udp.id = id; request.u.udp.fd = fd; request.u.udp.opaque = opaque; request.u.udp.family = family; send_request(ss, &request, 'U', sizeof(request.u.udp)); return id; } int64_t socket_server_udp_send(struct socket_server *ss, int id, const struct socket_udp_address *addr, const void *buffer, int sz) { struct socket * s = &ss->slot[HASH_ID(id)]; if (s->id != id || s->type == SOCKET_TYPE_INVALID) { free_buffer(ss, buffer, sz); return -1; } struct request_package request; request.u.send_udp.send.id = id; request.u.send_udp.send.sz = sz; request.u.send_udp.send.buffer = (char *)buffer; const uint8_t *udp_address = (const uint8_t *)addr; int addrsz; switch (udp_address[0]) { case PROTOCOL_UDP: addrsz = 1+2+4; // 1 type, 2 port, 4 ipv4 break; case PROTOCOL_UDPv6: addrsz = 1+2+16; // 1 type, 2 port, 16 ipv6 break; default: free_buffer(ss, buffer, sz); return -1; } memcpy(request.u.send_udp.address, udp_address, addrsz); send_request(ss, &request, 'A', sizeof(request.u.send_udp.send)+addrsz); return s->wb_size; } int socket_server_udp_connect(struct socket_server *ss, int id, const char * addr, int port) { int status; struct addrinfo ai_hints; struct addrinfo *ai_list = NULL; char portstr[16]; sprintf(portstr, "%d", port); memset( &ai_hints, 0, sizeof( ai_hints ) ); ai_hints.ai_family = AF_UNSPEC; ai_hints.ai_socktype = SOCK_DGRAM; ai_hints.ai_protocol = IPPROTO_UDP; status = getaddrinfo(addr, portstr, &ai_hints, &ai_list ); if ( status != 0 ) { return -1; } struct request_package request; request.u.set_udp.id = id; int protocol; if (ai_list->ai_family == AF_INET) { protocol = PROTOCOL_UDP; } else if (ai_list->ai_family == AF_INET6) { protocol = PROTOCOL_UDPv6; } else { freeaddrinfo( ai_list ); return -1; } int addrsz = gen_udp_address(protocol, (union sockaddr_all *)ai_list->ai_addr, request.u.set_udp.address); freeaddrinfo( ai_list ); send_request(ss, &request, 'C', sizeof(request.u.set_udp) - sizeof(request.u.set_udp.address) +addrsz); return 0; } const struct socket_udp_address * socket_server_udp_address(struct socket_server *ss, struct socket_message *msg, int *addrsz) { uint8_t * address = (uint8_t *)(msg->data + msg->ud); int type = address[0]; switch(type) { case PROTOCOL_UDP: *addrsz = 1+2+4; break; case PROTOCOL_UDPv6: *addrsz = 1+2+16; break; default: return NULL; } return (const struct socket_udp_address *)address; }