一、epoll模型的构建
由于网络服务高并发的需求,一般socket网络模型都采用epoll模型,有关epoll模型的原理在相关论坛中有许多讲述,在此不做重复讲解,主要讲一讲epoll模型的封装实现。
EPoller类的具体实现如下代码所示:
class EPoller{public: int Create(int maxfd); void JoinSocket(Socket* sock, unsigned int flag); void DelSocket(CEpollSocket* sock); int LoopForEvent(int timeout); int EpollWait(int timeout);protected: int epoll_fd; //epoll文件描述符 epoll_event* p_events; //用于保存epoll_wait返回的事件 int max_fd; //最大监听的文件描述符数量};// 创建epoll模型int EPoller::Create(int maxfd){ max_fd = maxfd ; epoll_fd = epoll_create(maxfd); p_events = new epoll_event[maxfd]; memset(p_events, 0, sizeof(epoll_event) * maxfd); return 0;}//将某个fd添加至监听的队列中,并利用flag设置监听的事件类型//其中Socket为基类int EPoller::JoinSocket(Socket* sock, unsigned int flag){ int fd = sock->GetSockHandle(); epoll_event event; event.data.ptr = sock; event.events = flag|EPOLLHUP|EPOLLERR; if (epoll_ctl(_epoll_fd, EPOLL_CTL_MOD , fd, &ev) < 0) { if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD , fd, &ev) < 0) { return -1; } } return 0;}//删除指定的fdint EPoller::DelSocket(Socket* sock){ int fd = sock->GetSockHandle();//利用Socket对象取得相应的文件句柄 if ( fd > 0 ) { epoll_event ev; ev.data.ptr = sock; ev.events = 0; if (epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, &ev) < 0) { return -1; } return 0; } return -1;}//等待监控的事件触发int EPoller::EpollWait(int timeout){ int fd; int nfds; Socket* sock; unsigned int flag; nfds = epoll_wait(epoll_fd, p_events, max_fd, timeout); if (nfds < 0) { return -1; } else if ( nfds == 0 ) { return 0 ; } for (int i=0; idata.ptr); if ( sock == NULL ) { continue; } flag = (p_events+i)->events; if (flag & EPOLLIN) { sock->Recv(); } else if (flag & EPOLLOUT) { sock->Send(); } else if (flag & EPOLLHUP) { sock->Close(); } else if (flag & EPOLLERR) { sock->Error(); } } return nfds ;}// 循环等待事件的触发int EPoller::LoopForEvent(int timeout){ while(true) { int result = EpollWait(timeout); if( result <= 0 ) { return result; } }}
其中的Socket类型作为其他网络通信服务类的基类,主要为其他的socket网络通信类提供一个统一的入口,其结构定义如下:
class Socket{ friend class EPoller;public: int GetSockHandle(); virtual int SendMsg(char* buffer, int length); protected: virtual int Recv(); virtual int Send(); virtual int Close(); virtual int Error(); EPoller* epoller; int fd; } int Socket::SendMsg(char* buff, int len) { int ret; int send_len = 0; int _len = len; errno = 0; while(_len > 0) { ret = send(fd, (void*)(buff+send_len), _len, 0); if (ret <= 0) { if (errno == EINTR || errno == EAGAIN) { usleep(10); continue; } break; } send_len += ret; _len -= ret; } return send_len; }
二、UDP网络通信的实现
//Udp创建socket的流程int CreateUdpSocket(){ // 1.创建socket sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); // 2.设置非阻塞 fcntl(sock_fd, F_GETFL, 0); fcntl(sock_fd, F_SETFL, val | O_NONBLOCK); // 3.绑定端口与ip struct sockaddr_in addr; string ip = "0.0.0.0"; addr.sin_family = AF_INET; addr.sin_port = htons(5678); ip2uint(ip.c_str(), &(addr.sin_addr)); bind(sock_fd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)); // 4.设置socket缓存大小 int recv_buf_size = 1024; int send_buf_size = 1024; setsockopt(sock_fd, SOL_SOCKET, SO_RCVBUF, (char *)&recv_buf_size, sizeof(recv_buf_size)); setsockopt(sock_fd, SOL_SOCKET, SO_SNDBUF, (char *)&send_buf_size, sizeof(send_buf_size)); return 0;}// 接收数据if ((len = recvfrom(sock, buffer, len, 0, (struct sockaddr *)&addr, &addrlen)) < 0){ if (errno == EAGAIN || errno == EINTR) { len = 0; return 0; } else { return -1; }}
三、TCP网络通信的实现
//Tcp创建bool CreateTcpServer(){ //建立TCP套接字 sock_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); struct sockaddr_in svr_addr= { 0}; svr_addr.sin_port = htons(port); svr_addr.sin_family = AF_INET; inet_pton(AF_INET, ip.c_str(), &(svr_addr.sin_addr)); // 绑定ip地址 if(bind(sock_fd, (struct sockaddr*)&svr_addr, sizeof(struct sockaddr_in)) < 0) // 设置socket属性 int recv_buf_size = 1024 * 10; int send_buf_size = 1024 * 10; if (setsockopt(_sock_fd, SOL_SOCKET, SO_RCVBUF, (char*)&recv_buf_size, sizeof(recv_buf_size)) < 0) if(setsockopt(_sock_fd, SOL_SOCKET, SO_SNDBUF, (char*)&send_buf_size, sizeof(send_buf_size)) < 0) int reuse=1; if(setsockopt(_sock_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&reuse, sizeof(reuse)) < 0) // 设置非阻塞 int val = fcntl(_sock_fd, F_GETFL, 0); if(fcntl(_sock_fd, F_SETFL, val | O_NONBLOCK) == -1) // 监听socket if(listen(sock_fd,1024) < 0) { close(sock_fd); sock_fd = -1; return false; } return true ;}// 一种hash值的生成算法int string2hash(const char * str, const int& len, const int& hash_size){ int hashvalue = 0; for (int i = len-1; i < len; ++i) { hashvalue = ((unsigned char)(*(str + i)) + hashvalue) % hash_size; } return hashvalue;}//接收数据流程int TCP_recv(){ char recv_buff[1024]; int total_recv_len = 0; int clientfd = accept(_sock_fd,(struct sockaddr *)NULL,(socklen_t*)NULL); if (clientfd <= 0) { return -1; } //设置发送缓冲区和接收缓冲区大小 int send_buff_size = 1024; int recv_buff_size = 1024; int set_result = 0; set_result = setsockopt(clientfd, SOL_SOCKET, SO_RCVBUF, (char*)&recv_buff_size, sizeof(recv_buff_size)); set_result = setsockopt(clientfd, SOL_SOCKET, SO_SNDBUF, (char*)&send_buff_size, sizeof(send_buff_size)); int free_len = sizeof(recv_buff) - total_recv_len ; if(free_len <= 0 || total_recv_len >= 1024) { //包太长了 return 0; } //接收数据 int recv_len = 0 ; do { recv_len = recv(sock_fd, &recv_buff[total_recv_len], free_len, 0); } while((recv_len < 0) && (errno == EINTR)); if(recv_len <= 0) { // 对端连接关闭 if( recv_len == 0 ) { return 0; } else if( errno == EAGAIN) { //暂时阻塞 return 0; } else { return 0; } } total_recv_len += recv_len; // 解析命令 int offset = ParseCmd(recv_buff, total_recv_len); if(0 == offset) { //未接收完 return 0; } // 完成解析后,将已处理后的数据清空,使留下的未处理数据移到缓存区的起始位置。 else if(0 < offset) { //解析成功,单条命令 total_recv_len = total_recv_len - offset; memmove(recv_buff, recv_buff + offset, total_recv_len); } return 0;}// 与服务端建立socket长连接的方法 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); int recv_buf_size = 1024; int send_buf_size = 1024; setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char *)&recv_buf_size, sizeof(recv_buf_size)); setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char *)&send_buf_size, sizeof(send_buf_size)); struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); int argc = fcntl(fd, F_GETFL, 0); if (fcntl(fd, F_SETFL, argc | O_NONBLOCK) == -1) { close(fd); return false; } int ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); if (0 > ret) { bool bConnected = false; struct pollfd conncet_client[1]; int _nfd = 1; memset(&_conncet_client[0], 0, sizeof(pollfd)); conncet_client[0].fd = fd; conncet_client[0].events = POLLOUT; int can_write = ::poll(_conncet_client, _nfd, (int)(1000)); if (can_write > 0 && (_conncet_client[0].revents & POLLOUT) > 0) { bConnected = true; } }