四川建设招标网站,小说代理平台,怎么自己创建网站免费,太原百度推广优化排名4.3.4 并发性 选择器对象是线程安全的#xff0c;但它们包含的键集合不是。通过keys( )和selectKeys( )返回的键的集合是Selector对象内部的私有的Set对象集合的直接引用。这些集合可能在任意时间被改变。已注册的键的集合是只读的。如果您试图修改它#xff0c;那么您得到的…4.3.4 并发性 选择器对象是线程安全的但它们包含的键集合不是。通过keys( )和selectKeys( )返回的键的集合是Selector对象内部的私有的Set对象集合的直接引用。这些集合可能在任意时间被改变。已注册的键的集合是只读的。如果您试图修改它那么您得到的奖品将是一个java.lang.UnsupportedOperationException但是当您在观察它们的时候它们可能发生了改变的话您仍然会遇到麻烦。Iterator对象是快速失败的(fail-fast)如果底层的Set被改变了它们将会抛出java.util.ConcurrentModificationException因此如果您期望在多个线程间共享选择器和/或键请对此做好准备。您可以直接修改选择键但请注意您这么做时可能会彻底破坏另一个线程的Iterator。 如果在多个线程并发地访问一个选择器的键的集合的时候存在任何问题您可以采取一些步骤来合理地同步访问。在执行选择操作时选择器在Selector对象上进行同步然后是已注册的键的集合最后是已选择的键的集合按照这样的顺序。已取消的键的集合也在选择过程的的第1步和第3步之间保持同步当与已取消的键的集合相关的通道被注销时 在多线程的场景中如果您需要对任何一个键的集合进行更改不管是直接更改还是其他操作带来的副作用您都需要首先以相同的顺序在同一对象上进行同步。锁的过程是非常重要的。如果竞争的线程没有以相同的顺序请求锁就将会有死锁的潜在隐患。如果您可以确保否其他线程不会同时访问选择器那么就不必要进行同步了。 Selector类的close( )方法与slect( )方法的同步方式是一样的因此也有一直阻塞的可能性。在选择过程还在进行的过程中所有对close( )的调用都会被阻塞直到选择过程结束或者执行选择的线程进入睡眠。在后面的情况下执行选择的线程将会在执行关闭的线程获得锁是立即被唤醒并关闭选择器参见4.3.2小节。 4.4 异步可关闭性 任何时候都有可能关闭一个通道或者取消一个选择键。除非您采取步骤进行同步否则键的状态及相关的通道将发生意料之外的改变。一个特定的键的集合中的一个键的存在并不保证键仍然是有效的或者它相关的通道仍然是打开的。 关闭通道的过程不应该是一个耗时的操作。NIO的设计者们特别想要阻止这样的可能性一个线程在关闭一个处于选择操作中的通道时被阻塞于无限期的等待。当一个通道关闭时它相关的键也就都被取消了。这并不会影响正在进行的select( )但这意味着在您调用select( )之前仍然是有效的键在返回时可能会变为无效。您总是可以使用由选择器的selectKeys( )方法返回的已选择的键的集合请不要自己维护键的集合。理解3.4.5小节描述的选择过程对于避免遇到问题而言是非常重要的。 您可以参考4.3.2小节以详细了解一个在select( )中阻塞的线程是如何被唤醒的。如果您试图使用一个已经失效的键大多数方法将抛出CancelledKeyException。但是您可以安全地从从已取消的键中获取通道的句柄。如果通道已经关闭时仍然试图使用它的话在大多数情况下将引发ClosedChannelException 4.5 选择过程的可扩展性 我多次提到选择器可以简化用单线程同时管理多个可选择通道的实现。使用一个线程来为多个通道提供服务通过消除管理各个线程的额外开销可能会降低复杂性并可能大幅提升性能。但只使用一个线程来服务所有可选择的通道是否是一个好主意呢这要看情况。 对单CPU的系统而言这可能是一个好主意因为在任何情况下都只有一个线程能够运行。通过消除在线程之间进行上下文切换带来的额外开销总吞吐量可以得到提高。但对于一个多CPU的系统呢在一个有n个CPU的系统上当一个单一的线程线性地轮流处理每一个线程时可能有n-1个cpu处于空闲状态 那么让不同道请求不同的服务类的办法如何想象一下如果一个应用程序为大量的分布式的传感器记录信息。每个传感器在服务线程遍历每个就绪的通道时需要等待数秒钟。这在响应时间不重要时是可以的。但对于高优先级的连接如操作命令如果只用一个线程为所有通道提供服务将不得不在队列中等待。不同的应用程序的要求也是不同的。您采用的策略会受到您尝试解决的问题的影响。 在第一个场景中如果您想要将更多的线程来为通道提供服务请抵抗住使用多个选择器的欲望。在大量通道上执行就绪选择并不会有很大的开销大多数工作是由底层操作系统完成的。管理多个选择器并随机地将通道分派给它们当中的一个并不是这个问题的合理的解决方案。这只会形成这个场景的一个更小的版本。一个更好的策略是对所有的可选择通道使用一个选择器并将对就绪通道的服务委托给其他线程。您只用一个线程监控通道的就绪状态并使用一个协调好的工作线程池来处理共接收到的数据。根据部署的条件线程池的大小是可以调整的或者它自己进行动态的调整。对可选择通道的管理仍然是简单的而简单的就是好的。 第二个场景中某些通道要求比其他通道更高的响应速度可以通过使用两个选择器来解决一个为命令连接服务另一个为普通连接服务。但这种场景也可以使用与第一个场景十分相似的办法来解决。与将所有准备好的通道放到同一个线程池的做法不同通道可以根据功能由不同的工作线程来处理。它们可能可以是日志线程池命令/控制线程池状态请求线程池等等。 例 4-2的代码是例4-1的一般性的选择循环的扩展。它覆写了readDataFromSocket( )方法并使用线程池来为准备好数据用于读取的通道提供服务。与在主线程中同步地读取数据不同这个版本的实现将SelectionKey对象传递给为其服务的工作线程。 例 4-2. 使用线程池来为通道提供服务 /*** */
package test.noi.select;import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;/*** * Simple echo-back server which listens for incoming stream connections and* * echoes back whatever it reads. A single Selector object is used to listen to* * the server socket (to accept new connections) and all the active socket* * channels.* * author Ron Hitchens (ronronsoft.com)*/
public class SelectSockets {public static int PORT_NUMBER 1234;public static void main(String[] argv) throws Exception {new SelectSockets().go(argv);}public void go(String[] argv) throws Exception {int port PORT_NUMBER;if (argv.length 0) {// Override default listen portport Integer.parseInt(argv[0]);}System.out.println(Listening on port port);ServerSocketChannel serverChannel ServerSocketChannel.open();// Get the associated ServerSocket to bind it withServerSocket serverSocket serverChannel.socket();// Create a new Selector for use belowSelector selector Selector.open();// Set the port the server channel will listen toserverSocket.bind(new InetSocketAddress(port));// Set nonblocking mode for the listening socketserverChannel.configureBlocking(false);// Register the ServerSocketChannel with the SelectorserverChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {// This may block for a long time. Upon returning, the// selected set contains keys of the ready channels.int n selector.select();if (n 0) {continue;// nothing to do}// Get an iterator over the set of selected keysIterator it selector.selectedKeys().iterator();// Look at each key in the selected setwhile (it.hasNext()) {SelectionKey key (SelectionKey) it.next();// Is a new connection coming in?if (key.isAcceptable()) {ServerSocketChannel server (ServerSocketChannel) key.channel();SocketChannel channel server.accept();registerChannel(selector, channel, SelectionKey.OP_READ);sayHello(channel);}// Is there data to read on this channel?if (key.isReadable()) {readDataFromSocket(key);}// Remove key from selected set; its been handledit.remove();}}}/*** * Register the given channel with the given selector for the given ** operations of interest*/protected void registerChannel(Selector selector,SelectableChannel channel, int ops) throws Exception {if (channel null) {return; // could happen}// Set the new channel nonblockingchannel.configureBlocking(false);// Register it with the selectorchannel.register(selector, ops);}// ----------------------------------------------------------// Use the same byte buffer for all channels. A single thread is// servicing all the channels, so no danger of concurrent acccess.private ByteBuffer buffer ByteBuffer.allocateDirect(1024);/*** * Sample data handler method for a channel with data ready to read. * * param* key * A SelectionKey object associated with a channel determined by * the* selector to be ready for reading. If the channel returns* * * an EOF condition, it is closed here, which automatically * invalidates* the associated key. The selector will then * de-register the channel on* the next select call.*/protected void readDataFromSocket(SelectionKey key) throws Exception {SocketChannel socketChannel (SocketChannel) key.channel();int count;buffer.clear();// Empty buffer// Loop while data is available;channel is nonblockingwhile ((count socketChannel.read(buffer)) 0) {buffer.flip();// Make buffer readable// Send the data; dont assume it goes all at oncewhile (buffer.hasRemaining()) {socketChannel.write(buffer);}// WARNING: the above loop is evil. Because// its writing back to the same nonblocking// channel it read the data from, this code can// potentially spin in a busy loop. In real life// youd do something more useful than this.buffer.clear();// Empty buffer}if (count 0) {// Close channel on EOF, invalidates the keysocketChannel.close();}}/*** * Spew a greeting to the incoming client connection. * * param channel ** The newly connected SocketChannel to say hello to.*/private void sayHello(SocketChannel channel) throws Exception {buffer.clear();buffer.put(Hi there!\r\n.getBytes());buffer.flip();channel.write(buffer);}
} 由于执行选择过程的线程将重新循环并几乎立即再次调用select( )键的interest集合将被修改并将interest感兴趣的操作从读取就绪(read-rreadiness)状态中移除。这将防止选择器重复地调用readDataFromSocket( )因为通道仍然会准备好读取数据直到工作线程从它那里读取数据。当工作线程结束为通道提供的服务时它将再次更新键的ready集合来将interest重新放到读取就绪集合中。它也会在选择器上显式地调用wakeup( )。如果主线程在select( )中被阻塞这将使它继续执行。这个选择循环会再次执行一个轮回可能什么也没做并带着被更新的键重新进入select( )。 以上内容出自 nio 一书 转载于:https://www.cnblogs.com/mjorcen/p/4203867.html