互联网做视频网站需要许可证吗,网站开发外快,重庆景点介绍,诚企信年报开发高性能网络程序时#xff0c;windows开发者们言必称iocp#xff0c;linux开发者们则言必称epoll。大家都明白epoll是一种IO多路复用技术#xff0c;可以非常高效的处理数以百万计的socket句柄#xff0c;比起以前的select和poll效率高大发了。我们用起epoll来都感觉挺爽…开发高性能网络程序时windows开发者们言必称iocplinux开发者们则言必称epoll。大家都明白epoll是一种IO多路复用技术可以非常高效的处理数以百万计的socket句柄比起以前的select和poll效率高大发了。我们用起epoll来都感觉挺爽确实快那么它到底为什么可以高速处理这么多并发连接呢
先简单回顾下如何使用C库封装的3个epoll系统调用吧。
[cpp]view plaincopy 1 int epoll_create(int size); 2 int epoll_ctl(int epfd, int op, int fd, struct epoll_event*event); 3 int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout); 使用起来很清晰首先要调用epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数多于这个最大数时内核可不保证效果。
epoll_ctl可以操作上面建立的epoll例如将刚建立的socket加入到epoll中让其监控或者把 epoll正在监控的某个socket句柄移出epoll不再监控它等等。
epoll_wait在调用时在给定的timeout时间内当在监控的所有句柄中有事件发生时就返回用户态的进程。 从上面的调用方式就可以看到epoll比select/poll的优越之处因为后者每次调用时都要传递你所要监控的所有socket给select/poll系统调用这意味着需要将用户态的socket列表copy到内核态如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态非常低效。而我们调用epoll_wait时就相当于以往调用select/poll但是这时却不用传递socket句柄给内核因为内核已经在epoll_ctl中拿到了要监控的句柄列表。 所以实际上在你调用epoll_create后内核就已经在内核态开始准备帮你存储要监控的句柄了每次调用epoll_ctl只是在往内核的数据结构里塞入新的socket句柄。 在内核里一切皆文件。所以epoll向内核注册了一个文件系统用于存储上述的被监控socket。当你调用epoll_create时就会在这个虚拟的epoll文件系统里创建一个file结点。当然这个file不是普通文件它只服务于epoll。 epoll在被内核初始化时操作系统启动同时会开辟出epoll自己的内核高速cache区用于安置每一个我们想监控的socket这些socket会以红黑树的形式保存在内核cache里以支持快速的查找、插入、删除。这个内核高速cache区就是建立连续的物理内存页然后在之上建立slab层简单的说就是物理上分配好你想要的size的内存对象每次使用时都是使用空闲的已分配好的对象。
[cpp]view plaincopy 4 staticint __init eventpoll_init(void) 5 { 6 ...... 7 8 /* Allocates slabcache used to allocate struct epitem items */ 9 epi_cache kmem_cache_create(eventpoll_epi, sizeof(struct epitem), 10 0,SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC, 11 NULL, NULL); 12 13 /* Allocates slab cache used to allocatestruct eppoll_entry */ 14 pwq_cache kmem_cache_create(eventpoll_pwq, 15 sizeof(struct eppoll_entry),0, 16 EPI_SLAB_DEBUG|SLAB_PANIC, NULL,NULL); 17 18 ... ... epoll的高效就在于当我们调用epoll_ctl往里塞入百万个句柄时epoll_wait仍然可以飞快的返回并有效的将发生事件的句柄给我们用户。这是由于我们在调用epoll_create时内核除了帮我们在epoll文件系统里建了个file结点在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外还会再建立一个list链表用于存储准备就绪的事件当epoll_wait调用时仅仅观察这个list链表里有没有数据即可。有数据就返回没有数据就sleep等到timeout时间到后即使链表没数据也返回。所以epoll_wait非常高效。 而且通常情况下即使我们要监控百万计的句柄大多一次也只返回很少量的准备就绪句柄而已所以epoll_wait仅需要从内核态copy少量的句柄到用户态而已如何能不高效 那么这个准备就绪list链表是怎么维护的呢当我们执行epoll_ctl时除了把socket放到epoll文件系统里file对象对应的红黑树上之外还会给内核中断处理程序注册一个回调函数告诉内核如果这个句柄的中断到了就把它放到准备就绪list链表里。所以当一个socket上有数据到了内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了。 如此一颗红黑树一张准备就绪句柄链表少量的内核cache就帮我们解决了大并发下的socket处理问题。执行epoll_create时创建了红黑树和就绪链表执行epoll_ctl时如果增加socket句柄则检查在红黑树中是否存在存在立即返回不存在则添加到树干上然后向内核注册回调函数用于当中断事件来临时向准备就绪链表中插入数据。执行epoll_wait时立刻返回准备就绪链表里的数据即可。 最后看看epoll独有的两种模式LT和ET。无论是LT和ET模式都适用于以上所说的流程。区别是LT模式下只要一个句柄上的事件一次没有处理完会在以后调用epoll_wait时次次返回这个句柄而ET模式仅在第一次返回。 这件事怎么做到的呢当一个socket句柄上有事件时内核会把该句柄插入上面所说的准备就绪list链表这时我们调用epoll_wait会把准备就绪的socket拷贝到用户态内存然后清空准备就绪list链表最后epoll_wait干了件事就是检查这些socket如果不是ET模式就是LT模式的句柄了并且这些socket上确实有未处理的事件时又把该句柄放回到刚刚清空的准备就绪链表了。所以非ET的句柄只要它上面还有事件epoll_wait每次都会返回。而ET模式的句柄除非有新中断到即使socket上的事件没有处理完也是不会次次从epoll_wait返回的
#include sys/socket.h #include netinet/in.h #include arpa/inet.h #include assert.h #include stdio.h #include unistd.h #include errno.h #include string.h #include fcntl.h #include stdlib.h #include sys/epoll.h #include pthread.h #define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 10 int setnonblocking( int fd ) { int old_option fcntl( fd, F_GETFL ); int new_option old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } //添加fd void addfd( int epollfd, int fd, bool enable_et ) { epoll_event event; event.data.fd fd; event.events EPOLLIN; //是否开启et模式 if( enable_et ) { event.events | EPOLLET; } epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, event ); setnonblocking( fd ); } //LT模式的工作原理 void lt( epoll_event* events, int number, int epollfd, int listenfd ) { char buf[ BUFFER_SIZE ]; for ( int i 0; i number; i ) { int sockfd events[i].data.fd; //处理用户注册事件 if ( sockfd listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength sizeof( client_address ); int connfd accept( listenfd, ( struct sockaddr* )client_address, client_addrlength ); addfd( epollfd, connfd, false ); }//可读事件 else if ( events[i].events EPOLLIN ) { //只要socket读缓存中还有未读出的数据这段代码就被触发 printf( event trigger once\n ); memset( buf, \0, BUFFER_SIZE ); int ret recv( sockfd, buf, BUFFER_SIZE-1, 0 ); if( ret 0 ) { close( sockfd );//表明当前套接字已经关闭 continue; } printf( get %d bytes of content: %s\n, ret, buf ); } else { printf( something else happened \n ); } } } //ET模式 void et( epoll_event* events, int number, int epollfd, int listenfd ) { char buf[ BUFFER_SIZE ]; for ( int i 0; i number; i ) { int sockfd events[i].data.fd; if ( sockfd listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength sizeof( client_address ); int connfd accept( listenfd, ( struct sockaddr* )client_address, client_addrlength ); addfd( epollfd, connfd, true ); } else if ( events[i].events EPOLLIN ) { printf( event trigger once\n ); //这段代码不会重复触发因此需要我们循环读取数据以确保socket读缓存中的所有数据全部读出 while( 1 ) { memset( buf, \0, BUFFER_SIZE ); int ret recv( sockfd, buf, BUFFER_SIZE-1, 0 ); if( ret 0 ) { //对于非阻塞IO条件成立表明数据全部读取完毕此后。epoll就能再次触发 //sockfd上的epollin事件以驱动下一次读事件 if( ( errno EAGAIN ) || ( errno EWOULDBLOCK ) ) { printf( read later\n ); break; } close( sockfd ); break; } else if( ret 0 ) { close( sockfd ); } else { printf( get %d bytes of content: %s\n, ret, buf ); } } } else { printf( something else happened \n ); } } } int main( int argc, char* argv[] ) { if( argc 2 ) { printf( usage: %s ip_address port_number\n, basename( argv[0] ) ); return 1; } const char* ip argv[1]; int port atoi( argv[2] ); int ret 0; struct sockaddr_in address; bzero( address, sizeof( address ) ); address.sin_family AF_INET; inet_pton( AF_INET, ip, address.sin_addr ); address.sin_port htons( port ); int listenfd socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd 0 ); ret bind( listenfd, ( struct sockaddr* )address, sizeof( address ) ); assert( ret ! -1 ); ret listen( listenfd, 5 ); assert( ret ! -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd epoll_create( 5 ); assert( epollfd ! -1 ); addfd( epollfd, listenfd, true ); while( 1 ) { int ret epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ret 0 ) { printf( epoll failure\n ); break; } //测试不同模式 lt( events, ret, epollfd, listenfd ); et( events, ret, epollfd, listenfd ); } close( listenfd ); return 0; }