手机网站搜索框代码,网址收录,本地搭建的wordpress怎么外网访问,购物网站主页模版文章目录1.多路I/O转接服务器2.select3.select代码4.poll5.epoll5.1 基础API5.3 epoll代码5.4 边沿触发和水平触发5.4.1 水平出发LT5.4.2 边缘触发5.4.3 服务器的边缘触发和水平触发5.4 边缘触发但是能一次读完6.epoll反应堆模型6.1 反应堆模型6.2 epoll反应堆代码7.心跳包8.线…
文章目录1.多路I/O转接服务器2.select3.select代码4.poll5.epoll5.1 基础API5.3 epoll代码5.4 边沿触发和水平触发5.4.1 水平出发LT5.4.2 边缘触发5.4.3 服务器的边缘触发和水平触发5.4 边缘触发但是能一次读完6.epoll反应堆模型6.1 反应堆模型6.2 epoll反应堆代码7.心跳包8.线程池8.1 线程池代码8.2 请问怎么实现线程池1.多路I/O转接服务器
多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是不再由应用程序自己监视客户端连接取而代之由内核替应用程序监视文件。
2.select
#include sys/select.h
#include sys/time.h
#include sys/types.h
#include unistd.h
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);nfds: 监控的文件描述符集里最大文件描述符加1因为此参数会告诉内核检测前多少个文件描述符的状态readfds 监控有读数据到达文件描述符集合传入传出参数writefds 监控写数据到达文件描述符集合传入传出参数exceptfds 监控异常发生达文件描述符集合,如带外数据到达异常传入传出参数timeout 定时阻塞监控时间3种情况1.NULL永远等下去2.设置timeval等待固定时间3.设置timeval里时间均为0检查描述字后立即返回轮询struct timeval {long tv_sec; /* seconds */long tv_usec; /* microseconds */};void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd清0int FD_ISSET(int fd, fd_set *set); //测试文件描述符集合里fd是否置1void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0返回值成功所监听的所有监听集合中满足条件的总数失败返回-13.select代码
服务器
/* server.c */
#include stdio.h
#include stdlib.h
#include string.h
#include netinet/in.h
#include arpa/inet.h
#include wrap.h#define MAXLINE 80
#define SERV_PORT 6666int main(int argc, char *argv[])
{int i, maxi, maxfd, listenfd, connfd, sockfd;int nready, client[FD_SETSIZE]; /* FD_SETSIZE 默认为 1024 */ssize_t n;fd_set rset, allset;char buf[MAXLINE];char str[INET_ADDRSTRLEN]; /* #define INET_ADDRSTRLEN 16 */socklen_t cliaddr_len;struct sockaddr_in cliaddr, servaddr;listenfd Socket(AF_INET, SOCK_STREAM, 0);bzero(servaddr, sizeof(servaddr));servaddr.sin_family AF_INET;servaddr.sin_addr.s_addr htonl(INADDR_ANY);servaddr.sin_port htons(SERV_PORT);Bind(listenfd, (struct sockaddr *)servaddr, sizeof(servaddr));Listen(listenfd, 20); /* 默认最大128 */maxfd listenfd; /* 初始化 */maxi -1; /* client[]的下标 */for (i 0; i FD_SETSIZE; i)client[i] -1; /* 用-1初始化client[] */FD_ZERO(allset);FD_SET(listenfd, allset); /* 构造select监控文件描述符集 */for ( ; ; ) {rset allset; /* 每次循环时都从新设置select监控信号集 */nready select(maxfd1, rset, NULL, NULL, NULL);if (nready 0)perr_exit(select error);if (FD_ISSET(listenfd, rset)) { /* new client connection */cliaddr_len sizeof(cliaddr);connfd Accept(listenfd, (struct sockaddr *)cliaddr, cliaddr_len);printf(received from %s at PORT %d\n,inet_ntop(AF_INET, cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));for (i 0; i FD_SETSIZE; i) {if (client[i] 0) {client[i] connfd; /* 保存accept返回的文件描述符到client[]里 */break;}}/* 达到select能监控的文件个数上限 1024 */if (i FD_SETSIZE) {fputs(too many clients\n, stderr);exit(1);}FD_SET(connfd, allset); /* 添加一个新的文件描述符到监控信号集里 */if (connfd maxfd)maxfd connfd; /* select第一个参数需要 */if (i maxi)maxi i; /* 更新client[]最大下标值 */if (--nready 0)continue; /* 如果没有更多的就绪文件描述符继续回到上面select阻塞监听,负责处理未处理完的就绪文件描述符 */}for (i 0; i maxi; i) { /* 检测哪个clients 有数据就绪 */if ( (sockfd client[i]) 0)continue;if (FD_ISSET(sockfd, rset)) {if ( (n Read(sockfd, buf, MAXLINE)) 0) {Close(sockfd); /* 当client关闭链接时服务器端也关闭对应链接 */FD_CLR(sockfd, allset); /* 解除select监控此文件描述符 */client[i] -1;} else {int j;for (j 0; j n; j)buf[j] toupper(buf[j]);Write(sockfd, buf, n);}if (--nready 0)break;}}}close(listenfd);return 0;
}
客户端
/* client.c */
#include stdio.h
#include string.h
#include unistd.h
#include netinet/in.h
#include wrap.h#define MAXLINE 80
#define SERV_PORT 6666int main(int argc, char *argv[])
{struct sockaddr_in servaddr;char buf[MAXLINE];int sockfd, n;sockfd Socket(AF_INET, SOCK_STREAM, 0);bzero(servaddr, sizeof(servaddr));servaddr.sin_family AF_INET;inet_pton(AF_INET, 127.0.0.1, servaddr.sin_addr);servaddr.sin_port htons(SERV_PORT);Connect(sockfd, (struct sockaddr *)servaddr, sizeof(servaddr));while (fgets(buf, MAXLINE, stdin) ! NULL) {Write(sockfd, buf, strlen(buf));n Read(sockfd, buf, MAXLINE);if (n 0)printf(the other side has been closed.\n);elseWrite(STDOUT_FILENO, buf, n);}Close(sockfd);return 0;
}
4.poll
#include poll.h
int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int fd; /* 文件描述符 */short events; /* 监控的事件 */short revents; /* 监控事件中满足条件返回的事件 */};POLLIN 普通或带外优先数据可读,即POLLRDNORM | POLLRDBANDPOLLRDNORM 数据可读POLLRDBAND 优先级带数据可读POLLPRI 高优先级可读数据POLLOUT 普通或带外数据可写POLLWRNORM 数据可写POLLWRBAND 优先级带数据可写POLLERR 发生错误POLLHUP 发生挂起POLLNVAL 描述字不是一个打开的文件nfds 监控数组中有多少文件描述符需要被监控timeout 毫秒级等待-1阻塞等#define INFTIM -1 Linux中没有定义此宏0立即返回不阻塞进程0等待指定毫秒数如当前系统时间精度不够毫秒向上取值如果不再监控某个文件描述符时可以把pollfd中fd设置为-1poll不再监控此pollfd
下次返回时把revents设置为0。#includestdio.h
#includestdlib.h
#includestring.h
#includenetinet/in.h
#includearpa/inet.h
#includepoll.h
#includeerrno.h
#includectype.h
#includeunistd.h#define MAXLINE 80
#define SERV_PORT 8000
#define OPEN_MAX 1024int main()
{int i,j,maxi,listenfd,connfd,sockfd;int nready;ssize_t n;char buf[MAXLINE],str[INET_ADDRSTRLEN];socklen_t clilen;struct pollfd client[OPEN_MAX];struct sockaddr_in cliaddr,servaddr;listenfdsocket(AF_INET,SOCK_STREAM,0);int opt1;setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,opt,sizeof(opt));bzero(servaddr,sizeof(servaddr));servaddr.sin_familyAF_INET;servaddr.sin_addr.s_addrhtonl(INADDR_ANY);servaddr.sin_porthtons(SERV_PORT);bind(listenfd,(struct sockaddr*)servaddr,sizeof(servaddr));listen(listenfd,120);client[0].fdlistenfd;client[0].eventsPOLLIN;for(int i1;iOPEN_MAX;i){client[i].fd-1;}maxi0;for( ; ;){nreadypoll(client,maxi1,-1);/*这个if语句监听listenfd是否有读事件也就是是否有客户端连接请求有的话accept连接并将客户端的文件描述符添加到监听队列中进行监听*/if(client[0].revents POLLIN){clilensizeof(cliaddr);connfdaccept(listenfd,(struct sockaddr*)cliaddr,clilen);printf(received from %s at PORT %d\n,inet_ntop(AF_INET,cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));for(i1;iOPEN_MAX;i){if(client[i].fd0){client[i].fdconnfd;break;}}if(iOPEN_MAX){perror(too many clients);}client[i].eventsPOLLIN;if(imaxi){maxii;}if(--nready0){continue;}}for(int i1;imaxi;i){if((sockfdclient[i].fd)0){continue;}if(client[i].revents POLLIN){if((nread(sockfd,buf,MAXLINE))0){if(errnoECONNRESET){close(sockfd);client[i].fd-1;}else{perror(read error);}}else if(n0){printf(client[%d] closed connection\n,i);close(sockfd);client[i].fd-1;}else{for(j0;jn;j){buf[j]toupper(buf[j]);}printf(read 执行\n);write(sockfd,buf,n);}if(--nready0){break;}}}}return 0;
}5.epoll
epoll是Linux下多路复用IO接口select/poll的增强版本它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合另一点原因就是获取事件的时候它无须遍历整个被侦听的描述符集只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。 目前epell是linux大规模并发网络程序中的热门首选模型。 epoll除了提供select/poll那种IO事件的电平触发Level Triggered外还提供了边沿触发Edge Triggered这就使得用户空间程序有可能缓存IO状态减少epoll_wait/epoll_pwait的调用提高应用程序效率。 可以使用cat命令查看一个进程可以打开的socket描述符上限。
cat /proc/sys/fs/file-max如有需要可以通过修改配置文件的方式修改该上限值
sudo vi /etc/security/limits.conf在文件尾部写入以下配置,soft软限制hard硬限制。如下图所示。* soft nofile 65536* hard nofile 1000005.1 基础API
创建一个epoll句柄参数size用来告诉内核监听的文件描述符的个数跟内存大小有关。
#include sys/epoll.hint epoll_create(int size) size监听数目控制某个epoll监控的文件描述符上的事件注册、修改、删除。
#include sys/epoll.hint epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)epfd 为epoll_creat的句柄op 表示动作用3个宏来表示EPOLL_CTL_ADD (注册新的fd到epfd)EPOLL_CTL_MOD (修改已经注册的fd的监听事件)EPOLL_CTL_DEL (从epfd删除一个fd)event 告诉内核需要监听的事件struct epoll_event {__uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */};typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t;EPOLLIN 表示对应的文件描述符可以读包括对端SOCKET正常关闭EPOLLOUT 表示对应的文件描述符可以写EPOLLPRI 表示对应的文件描述符有紧急的数据可读这里应该表示有带外数据到来EPOLLERR 表示对应的文件描述符发生错误EPOLLHUP 表示对应的文件描述符被挂断EPOLLET 将EPOLL设为边缘触发(Edge Triggered)模式这是相对于水平触发(Level Triggered)而言的EPOLLONESHOT只监听一次事件当监听完这次事件之后如果还需要继续监听这个socket的话需要再次把这个socket加入到EPOLL队列里
等待所监控文件描述符上有事件的产生类似于select()调用
#include sys/epoll.hint epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)events 用来存内核得到事件的集合maxevents 告之内核这个events有多大这个maxevents的值不能大于创建epoll_create()时的sizetimeout 是超时时间-1 阻塞0 立即返回非阻塞0 指定毫秒返回值 成功返回有多少文件描述符就绪时间到时返回0出错返回-1
5.3 epoll代码
#includestdio.h
#includeunistd.h
#includestdlib.h
#includestring.h
#includearpa/inet.h
#includesys/epoll.h
#includeerrno.h
#includectype.h#define MAXLINE 8192
#define SERV_PORT 8000
#define OPEN_MAX 5000int main()
{int i,listenfd,connfd,sockfd;int n,num0;ssize_t nready,efd,res;char buf[MAXLINE],str[INET_ADDRSTRLEN];socklen_t clilen;struct sockaddr_in cliaddr,servaddr;struct epoll_event tep,ep[OPEN_MAX]; //tep:epoll_ctl参数;ep[]:epoll_wait参数listenfdsocket(AF_INET,SOCK_STREAM,0);int opt1;setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,opt,sizeof(opt));bzero(servaddr,sizeof(servaddr));servaddr.sin_familyAF_INET;servaddr.sin_addr.s_addrhtonl(INADDR_ANY);servaddr.sin_porthtons(SERV_PORT);bind(listenfd,(struct sockaddr*)servaddr,sizeof(servaddr));listen(listenfd,20);efdepoll_create(OPEN_MAX); //创建epoll模型efd指向红黑树根节点if(efd-1){perror(epoll_create error);}tep.eventsEPOLLIN;tep.data.fdlistenfd; //指定lfd的监听事件为读resepoll_ctl(efd,EPOLL_CTL_ADD,listenfd,tep); //将lfd及对应的结构体添加到红黑树上efd可以找到该树if(res-1){perror(epoll_ctl error);}while(1){/*对红黑树上文件描述符监听ep为struct epoll_event类型数组OPEN_MAX为数组容量-1永久阻塞*/nreadyepoll_wait(efd,ep,OPEN_MAX,-1);if(nready-1){perror(epoll_wait error);}for(int i0;inready;i){if(!(ep[i].eventsEPOLLIN))continue;if(ep[i].data.fdlistenfd){clilensizeof(cliaddr);connfdaccept(listenfd,(struct sockaddr*)cliaddr,clilen);//listenfd可读说明有客户端连接printf(received from %s at PORT %d\n,inet_ntop(AF_INET,cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));printf(cfd %d---client %d\n,connfd,num);tep.eventsEPOLLIN;tep.data.fdconnfd;resepoll_ctl(efd,EPOLL_CTL_ADD,connfd,tep); //将客户端文件描述符添加到红黑树上if(res-1){perror(epoll_ctl error);}}else{sockfdep[i].data.fd;nread(sockfd,buf,MAXLINE);if(n0){ //读到0说明客户端关闭连接resepoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL); //将该文件描述符从红黑树摘除if(res-1){perror(epoll_ctl error);}close(sockfd);printf(client[%d] closed connection\n,sockfd);}else if(n0){ //出错perror(read n0 error:);resepoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL);close(sockfd);}else{for(int i0;in;i){buf[i]toupper(buf[i]);}write(sockfd,buf,n);}}}}close(listenfd);close(efd);return 0;
}5.4 边沿触发和水平触发
5.4.1 水平出发LT
#includestdio.h
#includeunistd.h
#includestdlib.h
#includesys/epoll.h
#includeerrno.h#define MAXLINE 10int main()
{int efd,i;int pfd[2];pid_t pid;char buf[MAXLINE],cha;pipe(pfd);pidfork();if(pid0){close(pfd[0]);while(1){for(i0;iMAXLINE/2;i){buf[i]ch;}buf[i-1]\n;ch;for(;iMAXLINE;i){buf[i]ch;}buf[i-1]\n;ch;write(pfd[1],buf,sizeof(buf));sleep(5);}close(pfd[1]);}else if(pid0){struct epoll_event event;struct epoll_event resevent[10];int res,len;close(pfd[1]);efdepoll_create(10);//event.eventsEPOLLIN|EPOLLET; //边缘触发event.eventsEPOLLIN; //水平触发默认水平触发event.data.fdpfd[0];epoll_ctl(efd,EPOLL_CTL_ADD,pfd[0],event);while(1){resepoll_wait(efd,resevent,10,-1);printf(res %d\n,res);if(resevent[0].data.fdpfd[0]){lenread(pfd[0],buf,MAXLINE/2);write(STDOUT_FILENO,buf,len);}}close(pfd[0]);}}zhaoxrzhaoxr-ThinkPad-E450:~/select$ ./epoll_trigger
res 1
aaaa
res 1
bbbb
res 1
cccc
res 1
dddd
res 1
eeee
res 1
ffff
^C5.4.2 边缘触发
#includestdio.h
#includeunistd.h
#includestdlib.h
#includesys/epoll.h
#includeerrno.h#define MAXLINE 10int main()
{int efd,i;int pfd[2];pid_t pid;char buf[MAXLINE],cha;pipe(pfd);pidfork();if(pid0){close(pfd[0]);while(1){for(i0;iMAXLINE/2;i){buf[i]ch;}buf[i-1]\n;ch;for(;iMAXLINE;i){buf[i]ch;}buf[i-1]\n;ch;write(pfd[1],buf,sizeof(buf));sleep(5);}close(pfd[1]);}else if(pid0){struct epoll_event event;struct epoll_event resevent[10];int res,len;close(pfd[1]);efdepoll_create(10);event.eventsEPOLLIN|EPOLLET; //ET 边缘触发//event.eventsEPOLLIN; //LT 水平触发(默认)event.data.fdpfd[0];epoll_ctl(efd,EPOLL_CTL_ADD,pfd[0],event);while(1){resepoll_wait(efd,resevent,10,-1);printf(res %d\n,res);if(resevent[0].data.fdpfd[0]){lenread(pfd[0],buf,MAXLINE/2);write(STDOUT_FILENO,buf,len);}}close(pfd[0]);}}5.4.3 服务器的边缘触发和水平触发
边缘触发非阻塞模式:
#include stdio.h
#include string.h
#include netinet/in.h
#include arpa/inet.h
#include signal.h
#include sys/wait.h
#include sys/types.h
#include sys/epoll.h
#include unistd.h#define MAXLINE 10
#define SERV_PORT 8080int main(void)
{struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int i, efd;listenfd socket(AF_INET, SOCK_STREAM, 0);bzero(servaddr, sizeof(servaddr));servaddr.sin_family AF_INET;servaddr.sin_addr.s_addr htonl(INADDR_ANY);servaddr.sin_port htons(SERV_PORT);bind(listenfd, (struct sockaddr *)servaddr, sizeof(servaddr));listen(listenfd, 20);struct epoll_event event;struct epoll_event resevent[10];int res, len;efd epoll_create(10);event.events EPOLLIN | EPOLLET; /* ET 边沿触发 默认是水平触发 *///event.eventsEPOLLIN;printf(Accepting connections ...\n);cliaddr_len sizeof(cliaddr);connfd accept(listenfd, (struct sockaddr *)cliaddr, cliaddr_len);printf(received from %s at PORT %d\n,inet_ntop(AF_INET, cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));event.data.fd connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, event);while (1) {res epoll_wait(efd, resevent, 10, -1);printf(res %d\n, res);if (resevent[0].data.fd connfd) {len read(connfd, buf, MAXLINE/2);write(STDOUT_FILENO, buf, len);}}return 0;
}水平触发
#include stdio.h
#include string.h
#include netinet/in.h
#include arpa/inet.h
#include signal.h
#include sys/wait.h
#include sys/types.h
#include sys/epoll.h
#include unistd.h#define MAXLINE 10
#define SERV_PORT 8080int main(void)
{struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int i, efd;listenfd socket(AF_INET, SOCK_STREAM, 0);bzero(servaddr, sizeof(servaddr));servaddr.sin_family AF_INET;servaddr.sin_addr.s_addr htonl(INADDR_ANY);servaddr.sin_port htons(SERV_PORT);bind(listenfd, (struct sockaddr *)servaddr, sizeof(servaddr));listen(listenfd, 20);struct epoll_event event;struct epoll_event resevent[10];int res, len;efd epoll_create(10);//event.events EPOLLIN | EPOLLET; /* ET 边沿触发 默认是水平触发 */event.eventsEPOLLIN;printf(Accepting connections ...\n);cliaddr_len sizeof(cliaddr);connfd accept(listenfd, (struct sockaddr *)cliaddr, cliaddr_len);printf(received from %s at PORT %d\n,inet_ntop(AF_INET, cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));event.data.fd connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, event);while (1) {res epoll_wait(efd, resevent, 10, -1);printf(res %d\n, res);if (resevent[0].data.fd connfd) {len read(connfd, buf, MAXLINE/2);write(STDOUT_FILENO, buf, len);}}return 0;
}客户端程序
#include stdio.h
#include string.h
#include unistd.h
#include netinet/in.h
#includearpa/inet.h
#define MAXLINE 10
#define SERV_PORT 8080int main(int argc, char *argv[])
{struct sockaddr_in servaddr;char buf[MAXLINE];int sockfd, i;char ch a;sockfd socket(AF_INET, SOCK_STREAM, 0);bzero(servaddr, sizeof(servaddr));servaddr.sin_family AF_INET;inet_pton(AF_INET, 127.0.0.1, servaddr.sin_addr);servaddr.sin_port htons(SERV_PORT);connect(sockfd, (struct sockaddr *)servaddr, sizeof(servaddr));while (1) {for (i 0; i MAXLINE/2; i)buf[i] ch;buf[i-1] \n;ch;for (; i MAXLINE; i)buf[i] ch;buf[i-1] \n;ch;write(sockfd, buf, sizeof(buf));sleep(5);}close(sockfd);return 0;
}5.4 边缘触发但是能一次读完
边缘触发但是能一次读完不通过使用epoll_wait的触发可以一次读完所有的数据这其中的原理是将文件描述符设置为非阻塞通过while ((len read(connfd, buf, MAXLINE/2)) 0) write(STDOUT_FILENO, buf, len); 可以一次将connfd中的所有数据读完所有数据。
ET的非阻塞模式比LT模式效率要高因为ET减少了epoll_wait()的使用。
结论epoll 的 ET模式 高效模式但是只支持 非阻塞模式。 --- 忙轮询。struct epoll_event event;event.events EPOLLIN | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_ADD, cfd event); int flg fcntl(cfd, F_GETFL); flg | O_NONBLOCK;fcntl(cfd, F_SETFL, flg);优点高效。突破1024文件描述符。缺点不能跨平台。 Linux。6.epoll反应堆模型
6.1 反应堆模型
epoll 反应堆模型epoll ET模式 非阻塞、轮询 void *ptr。原来 socket、bind、listen -- epoll_create 创建监听 红黑树 -- 返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while1---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小-大 -- write回去。反应堆不但要监听 cfd 的读事件、还要监听cfd的写事件。socket、bind、listen -- epoll_create 创建监听 红黑树 -- 返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while1---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小-大 -- cfd从监听红黑树上摘下 -- EPOLLOUT -- 回调函数 -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听写事件-- 等待 epoll_wait 返回 -- 说明 cfd 可写 -- write回去 -- cfd从监听红黑树上摘下 -- EPOLLIN -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听读事件 -- epoll_wait 监听
反应堆的理解加入IO转接之后有了事件server才去处理这里反应堆也是这样由于网络环境复杂服务器处理数据之后可能并不能直接写回去比如遇到网络繁忙或者对方缓冲区已经满了这种情况就不能直接写回给客户端。反应堆就是在处理数据之后监听写事件能写会客户端了才去做写回操作。写回之后再改为监听读事件。如此循环。
6.2 epoll反应堆代码
/*
*epoll基于非阻塞I/O事件驱动
*/
#include stdio.h
#include sys/socket.h
#include sys/epoll.h
#include arpa/inet.h
#include fcntl.h
#include unistd.h
#include errno.h
#include string.h
#include stdlib.h
#include time.h #define MAX_EVENTS 1024 //监听上限数
#define BUFLEN 4096
#define SERV_PORT 8080 void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg); /* 描述就绪文件描述符相关信息 */ struct myevent_s { int fd; //要监听的文件描述符 int events; //对应的监听事件 void *arg; //泛型参数 void (*call_back)(int fd, int events, void *arg); //回调函数 int status; //是否在监听:1-在红黑树上(监听), 0-不在(不监听) char buf[BUFLEN]; int len; long last_active; //记录每次加入红黑树 g_efd 的时间值
}; int g_efd; //全局变量, 保存epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS1]; //自定义结构体类型数组. 1--listen fd /*将结构体 myevent_s 成员变量 初始化*/ void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
{ ev-fd fd; ev-call_back call_back; ev-events 0; ev-arg arg; ev-status 0; memset(ev-buf, 0, sizeof(ev-buf)); ev-len 0; ev-last_active time(NULL); //调用eventset函数的时间 return;
} /* 向 epoll监听的红黑树 添加一个 文件描述符 */ //eventadd(efd, EPOLLIN, g_events[MAX_EVENTS]);
void eventadd(int efd, int events, struct myevent_s *ev)
{ struct epoll_event epv {0, {0}}; int op; epv.data.ptr ev; epv.events ev-events events; //EPOLLIN 或 EPOLLOUT if (ev-status 0) { //已经在红黑树 g_efd 里 op EPOLL_CTL_ADD; //将其加入红黑树 g_efd, 并将status置1 ev-status 1; } if (epoll_ctl(efd, op, ev-fd, epv) 0) //实际添加/修改 printf(event add failed [fd%d], events[%d]\n, ev-fd, events); else printf(event add OK [fd%d], op%d, events[%0X]\n, ev-fd, op, events); return ;
} /* 从epoll 监听的 红黑树中删除一个 文件描述符*/ void eventdel(int efd, struct myevent_s *ev)
{ struct epoll_event epv {0, {0}}; if (ev-status ! 1) //不在红黑树上 return ; //epv.data.ptr ev; epv.data.ptr NULL; ev-status 0; //修改状态 epoll_ctl(efd, EPOLL_CTL_DEL, ev-fd, epv); //从红黑树 efd 上将 ev-fd 摘除 return ;
} /* 当有文件描述符就绪, epoll返回, 调用该函数 与客户端建立链接 */ void acceptconn(int lfd, int events, void *arg)
{ struct sockaddr_in cin; socklen_t len sizeof(cin); int cfd, i; if ((cfd accept(lfd, (struct sockaddr *)cin, len)) -1) { if (errno ! EAGAIN errno ! EINTR) { /* 暂时不做出错处理 */ } printf(%s: accept, %s\n, __func__, strerror(errno)); return ; } do { for (i 0; i MAX_EVENTS; i) //从全局数组g_events中找一个空闲元素 if (g_events[i].status 0) //类似于select中找值为-1的元素 break; //跳出 for if (i MAX_EVENTS) { printf(%s: max connect limit[%d]\n, __func__, MAX_EVENTS); break; //跳出do while(0) 不执行后续代码 } int flag 0; if ((flag fcntl(cfd, F_SETFL, O_NONBLOCK)) 0) { //将cfd也设置为非阻塞 printf(%s: fcntl nonblocking failed, %s\n, __func__, strerror(errno)); break; } /* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */ eventset(g_events[i], cfd, recvdata, g_events[i]); eventadd(g_efd, EPOLLIN, g_events[i]); //将cfd添加到红黑树g_efd中,监听读事件 } while(0); printf(new connect [%s:%d][time:%ld], pos[%d]\n, inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i); return ;
} void recvdata(int fd, int events, void *arg) { struct myevent_s *ev (struct myevent_s *)arg; int len; len recv(fd, ev-buf, sizeof(ev-buf), 0); //读文件描述符, 数据存入myevent_s成员buf中 eventdel(g_efd, ev); //将该节点从红黑树上摘除 if (len 0) { ev-len len; ev-buf[len] \0; //手动添加字符串结束标记 printf(C[%d]:%s\n, fd, ev-buf); eventset(ev, fd, senddata, ev); //设置该 fd 对应的回调函数为 senddata eventadd(g_efd, EPOLLOUT, ev); //将fd加入红黑树g_efd中,监听其写事件 } else if (len 0) { close(ev-fd); /* ev-g_events 地址相减得到偏移元素位置 */ printf([fd%d] pos[%ld], closed\n, fd, ev-g_events); } else { close(ev-fd); printf(recv[fd%d] error[%d]:%s\n, fd, errno, strerror(errno)); } return; } void senddata(int fd, int events, void *arg) { struct myevent_s *ev (struct myevent_s *)arg; int len; len send(fd, ev-buf, ev-len, 0); //直接将数据 回写给客户端。未作处理 eventdel(g_efd, ev); //从红黑树g_efd中移除 if (len 0) { printf(send[fd%d], [%d]%s\n, fd, len, ev-buf); eventset(ev, fd, recvdata, ev); //将该fd的 回调函数改为 recvdata eventadd(g_efd, EPOLLIN, ev); //从新添加到红黑树上 设为监听读事件 } else { close(ev-fd); //关闭链接 printf(send[fd%d] error %s\n, fd, strerror(errno)); } return ; } /*创建 socket, 初始化lfd */ void initlistensocket(int efd, short port) { struct sockaddr_in sin; int lfd socket(AF_INET, SOCK_STREAM, 0); fcntl(lfd, F_SETFL, O_NONBLOCK); //将socket设为非阻塞 memset(sin, 0, sizeof(sin)); //bzero(sin, sizeof(sin)) sin.sin_family AF_INET; sin.sin_addr.s_addr INADDR_ANY; sin.sin_port htons(port); bind(lfd, (struct sockaddr *)sin, sizeof(sin)); listen(lfd, 20); /* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg); */ eventset(g_events[MAX_EVENTS], lfd, acceptconn, g_events[MAX_EVENTS]); /* void eventadd(int efd, int events, struct myevent_s *ev) */ eventadd(efd, EPOLLIN, g_events[MAX_EVENTS]); return ; } int main(int argc, char *argv[]) { unsigned short port SERV_PORT; if (argc 2) port atoi(argv[1]); //使用用户指定端口.如未指定,用默认端口 g_efd epoll_create(MAX_EVENTS1); //创建红黑树,返回给全局 g_efd if (g_efd 0) printf(create efd in %s err %s\n, __func__, strerror(errno)); initlistensocket(g_efd, port); //初始化监听socket struct epoll_event events[MAX_EVENTS1]; //保存已经满足就绪事件的文件描述符数组 printf(server running:port[%d]\n, port); int checkpos 0, i; while (1) { /* 超时验证每次测试100个链接不测试listenfd 当客户端60秒内没有和服务器通信则关闭此客户端链接 */ long now time(NULL); //当前时间 for (i 0; i 100; i, checkpos) { //一次循环检测100个。 使用checkpos控制检测对象 if (checkpos MAX_EVENTS) checkpos 0; if (g_events[checkpos].status ! 1) //不在红黑树 g_efd 上 continue; long duration now - g_events[checkpos].last_active; //客户端不活跃的世间 if (duration 60) { close(g_events[checkpos].fd); //关闭与该客户端链接 printf([fd%d] timeout\n, g_events[checkpos].fd); eventdel(g_efd, g_events[checkpos]); //将该客户端 从红黑树 g_efd移除 } } /*监听红黑树g_efd, 将满足的事件的文件描述符加至events数组中, 1秒没有事件满足, 返回 0*/ int nfd epoll_wait(g_efd, events, MAX_EVENTS1, 1000); if (nfd 0) { printf(epoll_wait error, exit\n); break; } for (i 0; i nfd; i) { /*使用自定义结构体myevent_s类型指针, 接收 联合体data的void *ptr成员*/ struct myevent_s *ev (struct myevent_s *)events[i].data.ptr; if ((events[i].events EPOLLIN) (ev-events EPOLLIN)) { //读就绪事件 ev-call_back(ev-fd, events[i].events, ev-arg); //lfd EPOLLIN } if ((events[i].events EPOLLOUT) (ev-events EPOLLOUT)) { //写就绪事件 ev-call_back(ev-fd, events[i].events, ev-arg); } } } /* 退出前释放所有资源 */ return 0; } 7.心跳包
TCP保活机制
心跳包
由应用程序自己发送心跳包来检测连接是否正常大致的方法是服务器在一个 Timer事件中定时向客户端发送一个短小精悍的数据包然后启动一个低级别的线程在该线程中不断检测客户端的回应 如果在一定时间内没有收到客户端的回应即认为客户端已经掉线同样如果客户端在一定时间内没有收到服务器的心跳包则认为连接不可用。
心跳检测机制 在TCP网络通信中经常会出现客户端和服务器之间的非正常断开需要实时检测查询链接状态。常用的解决方法就是在程序中加入心跳机制。
Heart-Beat线程 这个是最常用的简单方法。在接收和发送数据时个人设计一个守护进程(线程)定时发送Heart-Beat包客户端/服务器收到该小包后立刻返回相应的包即可检测对方是否实时在线。
该方法的好处是通用但缺点就是会改变现有的通讯协议大家一般都是使用业务层心跳来处理主要是灵活可控。
UNIX网络编程不推荐使用SO_KEEPALIVE来做心跳检测还是在业务层以心跳包做检测比较好也方便控制。
乒乓包
举例微信朋友圈有人评论客户端怎么知道有人评论服务器怎么将评论发给客户端的
微信客户端每隔一段时间就向服务器询问是否有人评论 当服务器检查到有人给评论时服务器发送一个乒乓包给客户端该乒乓包中携带的数据是[此时有人评论的标志位] 注步骤1和2服务器和客户端不需要建立连接只是发送简单的乒乓包。 当客户端接收到服务器回复的带有评论标志位的乒乓包后才真正的去和服务器通过三次握手建立连接建立连接后服务器将评论的数据发送给客户端。
注意乒乓包是携带很简单的数据的包
设置TCP属性: SO_KEEPALIVE
1.因为要考虑到一个服务器通常会连接多个客户端因此由用户在应用层自己实现心跳包代码较多 且稍显复杂而利用TCPIP协议层为内置的KeepAlive功能来实现心跳功能则简单得多。
2.不论是服务端还是客户端一方开启KeepAlive功能后就会自动在规定时间内向对方发送心跳包 而另一方在收到心跳包后就会自动回复以告诉对方我仍然在线。
3.因为开启KeepAlive功能需要消耗额外的宽带和流量所以TCP协议层默认并不开启KeepAlive功 能尽管这微不足道但在按流量计费的环境下增加了费用另一方面KeepAlive设置不合理时可能会 因为短暂的网络波动而断开健康的TCP连接。并且默认的KeepAlive超时需要7,200000 MilliSeconds 即2小时探测次数为5次。对于很多服务端应用程序来说2小时的空闲时间太长。
4.因此我们需要手工开启KeepAlive功能并设置合理的KeepAlive参数。
在《UNIX网络编程第1卷》中也有详细的阐述 SO_KEEPALIVE保持连接检测对方主机是否崩溃避免服务器永远阻塞于TCP连接的输入。设置该选项后如果2小时内在此套接口的任一方向都没有数据交换TCP就自动给对方 发一个保持存活探测分节(keepalive probe)。这是一个对方必须响应的TCP分节.它会导致以下三种情况
对方接收一切正常以期望的ACK响应。2小时后TCP将发出另一个探测分节。
对方已崩溃且已重新启动以RST响应。套接口的待处理错误被置为ECONNRESET套接口本身则被关闭。
对方无任何响应源自berkeley的TCP发送另外8个探测分节相隔75秒一个试图得到一个响应。在发出第一个探测分节11分钟15秒后若仍无响应就放弃。套接口的待处理错误被置为ETIMEOUT套接口本身则被关闭。如ICMP错误是“host unreachable(主机不可达)”说明对方主机并没有崩溃但是不可达这种情况下待处理错误被置为EHOSTUNREACH。根据上面的介绍可以知道对端以一种非优雅的方式断开连接的时候可以设置SO_KEEPALIVE属性使得在2小时以后发现对方的TCP连接是否依然存在。如果不能接受如此之长的等待时间从TCP-Keepalive-HOWTO上可以知道一共有两种方式可以设置
修改内核关于网络方面的配置参数
SOL_TCP字段的TCP_KEEPIDLETCP_KEEPINTVLTCP_KEEPCNT三个选项int keepIdle 6; /*开始首次KeepAlive探测前的TCP空闭时间 */
int keepInterval 5; /* 两次KeepAlive探测间的时间间隔 */
int keepCount 3; /* 判定断开前的KeepAlive探测次数 */
Setsockopt(listenfd, SOL_TCP, TCP_KEEPIDLE, (void *)keepIdle, sizeof(keepIdle));
Setsockopt(listenfd, SOL_TCP,TCP_KEEPINTVL, (void *)keepInterval, sizeof(keepInterval));
Setsockopt(listenfd,SOL_TCP, TCP_KEEPCNT, (void *)keepCount, sizeof(keepCount)); 8.线程池 8.1 线程池代码
#ifndef __THREADPOOL_H_
#define __THREADPOOL_H_typedef struct threadpool_t threadpool_t;/*** function threadpool_create* descCreates a threadpool_t object.* param thr_num thread num* param max_thr_num max thread size* param queue_max_size size of the queue.* return a newly created thread pool or NULL*/
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);/*** function threadpool_add* desc add a new task in the queue of a thread pool* param pool Thread pool to which add the task.* param function Pointer to the function that will perform the task.* param argument Argument to be passed to the function.* return 0 if all goes well,else -1*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);/*** function threadpool_destroy* desc Stops and destroys a thread pool.* param pool Thread pool to destroy.* return 0 if destory success else -1*/
int threadpool_destroy(threadpool_t *pool);/*** desc get the thread num* pool pool threadpool* return # of the thread*/
int threadpool_all_threadnum(threadpool_t *pool);/*** desc get the busy thread num* param pool threadpool* return # of the busy thread*/
int threadpool_busy_threadnum(threadpool_t *pool);#endif#include stdlib.h
#include pthread.h
#include unistd.h
#include assert.h
#include stdio.h
#include string.h
#include signal.h
#include errno.h
#include threadpool.h#define DEFAULT_TIME 10 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10 /*如果queue_size MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
#define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct {void *(*function)(void *); /* 函数指针回调函数 */void *arg; /* 上面函数的参数 */
} threadpool_task_t; /* 各子线程任务结构体 *//* 描述线程池相关信息 */struct threadpool_t {pthread_mutex_t lock; /* 用于锁住本结构体 */ pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full; /* 当任务队列满时添加任务的线程阻塞等待此条件变量 */pthread_cond_t queue_not_empty; /* 任务队列里不为空时通知等待任务的线程 */pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid; /* 存管理线程tid */threadpool_task_t *task_queue; /* 任务队列(数组首地址) */int min_thr_num; /* 线程池最小线程数 */int max_thr_num; /* 线程池最大线程数 */int live_thr_num; /* 当前存活线程个数 */int busy_thr_num; /* 忙状态线程个数 */int wait_exit_thr_num; /* 要销毁的线程个数 */int queue_front; /* task_queue队头下标 */int queue_rear; /* task_queue队尾下标 */int queue_size; /* task_queue队中实际任务数 */int queue_max_size; /* task_queue队列可容纳任务数上限 */int shutdown; /* 标志位线程池使用状态true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool NULL; /* 线程池 结构体 */do {if((pool (threadpool_t *)malloc(sizeof(threadpool_t))) NULL) { printf(malloc threadpool fail);break; /*跳出do while*/}pool-min_thr_num min_thr_num;pool-max_thr_num max_thr_num;pool-busy_thr_num 0;pool-live_thr_num min_thr_num; /* 活着的线程数 初值最小线程数 */pool-wait_exit_thr_num 0;pool-queue_size 0; /* 有0个产品 */pool-queue_max_size queue_max_size; /* 最大任务队列数 */pool-queue_front 0;pool-queue_rear 0;pool-shutdown false; /* 不关闭线程池 *//* 根据最大线程上限数 给工作线程数组开辟空间, 并清零 */pool-threads (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool-threads NULL) {printf(malloc threads fail);break;}memset(pool-threads, 0, sizeof(pthread_t)*max_thr_num);/* 给 任务队列 开辟空间 */pool-task_queue (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool-task_queue NULL) {printf(malloc task_queue fail);break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init((pool-lock), NULL) ! 0|| pthread_mutex_init((pool-thread_counter), NULL) ! 0|| pthread_cond_init((pool-queue_not_empty), NULL) ! 0|| pthread_cond_init((pool-queue_not_full), NULL) ! 0){printf(init the lock or cond fail);break;}/* 启动 min_thr_num 个 work thread */for (i 0; i min_thr_num; i) {pthread_create((pool-threads[i]), NULL, threadpool_thread, (void *)pool); /*pool指向当前线程池*/printf(start thread 0x%x...\n, (unsigned int)pool-threads[i]);}pthread_create((pool-adjust_tid), NULL, adjust_thread, (void *)pool); /* 创建管理者线程 */return pool;} while (0);threadpool_free(pool); /* 前面代码调用失败时释放poll存储空间 */return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)num[i]); /* 向线程池中添加任务 process: 小写----大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock((pool-lock));/* 为真队列已经满 调wait阻塞 */while ((pool-queue_size pool-queue_max_size) (!pool-shutdown)) {pthread_cond_wait((pool-queue_not_full), (pool-lock));}if (pool-shutdown) {pthread_cond_broadcast((pool-queue_not_empty));pthread_mutex_unlock((pool-lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool-task_queue[pool-queue_rear].arg ! NULL) {pool-task_queue[pool-queue_rear].arg NULL;}/*添加任务到任务队列里*/pool-task_queue[pool-queue_rear].function function;pool-task_queue[pool-queue_rear].arg arg;pool-queue_rear (pool-queue_rear 1) % pool-queue_max_size; /* 队尾指针移动, 模拟环形 */pool-queue_size;/*添加完任务后队列不为空唤醒线程池中 等待处理任务的线程*/pthread_cond_signal((pool-queue_not_empty));pthread_mutex_unlock((pool-lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程等待任务队列里有任务否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock((pool-lock));/*queue_size 0 说明没有任务调 wait 阻塞在条件变量上, 若有任务跳过该while*/while ((pool-queue_size 0) (!pool-shutdown)) { printf(thread 0x%x is waiting\n, (unsigned int)pthread_self());pthread_cond_wait((pool-queue_not_empty), (pool-lock));/*清除指定数目的空闲线程如果要结束的线程个数大于0结束线程*/if (pool-wait_exit_thr_num 0) {pool-wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool-live_thr_num pool-min_thr_num) {printf(thread 0x%x is exiting\n, (unsigned int)pthread_self());pool-live_thr_num--;pthread_mutex_unlock((pool-lock));pthread_exit(NULL);}}}/*如果指定了true要关闭线程池里的每个线程自行退出处理---销毁线程池*/if (pool-shutdown) {pthread_mutex_unlock((pool-lock));printf(thread 0x%x is exiting\n, (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL); /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function pool-task_queue[pool-queue_front].function;task.arg pool-task_queue[pool-queue_front].arg;pool-queue_front (pool-queue_front 1) % pool-queue_max_size; /* 出队模拟环形队列 */pool-queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast((pool-queue_not_full));/*任务取出后立即将 线程池琐 释放*/pthread_mutex_unlock((pool-lock));/*执行任务*/ printf(thread 0x%x start working\n, (unsigned int)pthread_self());pthread_mutex_lock((pool-thread_counter)); /*忙状态线程数变量琐*/pool-busy_thr_num; /*忙状态线程数1*/pthread_mutex_unlock((pool-thread_counter));(*(task.function))(task.arg); /*执行回调函数任务*///task.function(task.arg); /*执行回调函数任务*//*任务结束处理*/ printf(thread 0x%x end working\n, (unsigned int)pthread_self());pthread_mutex_lock((pool-thread_counter));pool-busy_thr_num--; /*处理掉一个任务忙状态数线程数-1*/pthread_mutex_unlock((pool-thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool (threadpool_t *)threadpool;while (!pool-shutdown) {sleep(DEFAULT_TIME); /*定时 对线程池管理*/pthread_mutex_lock((pool-lock));int queue_size pool-queue_size; /* 关注 任务数 */int live_thr_num pool-live_thr_num; /* 存活 线程数 */pthread_mutex_unlock((pool-lock));pthread_mutex_lock((pool-thread_counter));int busy_thr_num pool-busy_thr_num; /* 忙着的线程数 */pthread_mutex_unlock((pool-thread_counter));/* 创建新线程 算法 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如3010 40100*/if (queue_size MIN_WAIT_TASK_NUM live_thr_num pool-max_thr_num) {pthread_mutex_lock((pool-lock)); int add 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i 0; i pool-max_thr_num add DEFAULT_THREAD_VARY pool-live_thr_num pool-max_thr_num; i) {if (pool-threads[i] 0 || !is_thread_alive(pool-threads[i])) {pthread_create((pool-threads[i]), NULL, threadpool_thread, (void *)pool);add;pool-live_thr_num;}}pthread_mutex_unlock((pool-lock));}/* 销毁多余的空闲线程 算法忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) live_thr_num live_thr_num pool-min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock((pool-lock));pool-wait_exit_thr_num DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */pthread_mutex_unlock((pool-lock));for (i 0; i DEFAULT_THREAD_VARY; i) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal((pool-queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool NULL) {return -1;}pool-shutdown true;/*先销毁管理线程*/pthread_join(pool-adjust_tid, NULL);for (i 0; i pool-live_thr_num; i) {/*通知所有的空闲线程*/pthread_cond_broadcast((pool-queue_not_empty));}for (i 0; i pool-live_thr_num; i) {pthread_join(pool-threads[i], NULL);}threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool NULL) {return -1;}if (pool-task_queue) {free(pool-task_queue);}if (pool-threads) {free(pool-threads);pthread_mutex_lock((pool-lock));pthread_mutex_destroy((pool-lock));pthread_mutex_lock((pool-thread_counter));pthread_mutex_destroy((pool-thread_counter));pthread_cond_destroy((pool-queue_not_empty));pthread_cond_destroy((pool-queue_not_full));}free(pool);pool NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum -1; // 总线程数pthread_mutex_lock((pool-lock));all_threadnum pool-live_thr_num; // 存活线程数pthread_mutex_unlock((pool-lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum -1; // 忙线程数pthread_mutex_lock((pool-thread_counter));busy_threadnum pool-busy_thr_num; pthread_mutex_unlock((pool-thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc pthread_kill(tid, 0); //发0号信号测试线程是否存活if (kill_rc ESRCH) {return false;}return true;
}/*测试*/ #if 1/* 线程池中的线程模拟处理业务 */
void *process(void *arg)
{printf(thread 0x%x working on task %d\n ,(unsigned int)pthread_self(),(int)arg);sleep(1); //模拟 小---大写printf(task %d is end\n,(int)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp threadpool_create(3,100,100); /*创建线程池池里最小3个线程最大100队列最大100*/printf(pool inited);//int *num (int *)malloc(sizeof(int)*20);int num[20], i;for (i 0; i 20; i) {num[i] i;printf(add task %d\n,i);/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) */threadpool_add(thp, process, (void*)num[i]); /* 向线程池中添加任务 */}sleep(10); /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif
8.2 请问怎么实现线程池
参考回答
设置一个生产者消费者队列作为临界资源初始化n个线程并让其运行起来加锁去队列取任务运行当任务队列为空的时候所有线程阻塞当生产者队列来了一个任务后先对队列加锁把任务挂在到队列上然后使用条件变量去通知阻塞中的一个线程