NIO源码分析之Selector

NIO源码分析之Selector

NIO的Selector源码第一句话就是 A multiplexor of {@link SelectableChannel} objects.

即 SelectableChannel对象的多路复用器。这很清楚的说明了Selector的作用。

这篇文章主要从以下几个点对Selector进行分析:

  • 选择器(Selector)
  • 可选择通道(SelectableChannel)
  • 选择键(SelectionKey)
  • Selector完整实例

Selector

Selector选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。

下面是使用Selector管理多个channel的结构图:

  • Selector的创建

A selector may be created by invoking the open method of this class, which will use the system’s default selector provider to create a new selector. A selector may also be created by invoking the openSelector method of a custom selector provider. A selector remains open until it is closed via its close method.

可以通过调用此类的open方法来创建选择器,该方法将使用系统的默认选择器提供程序来创建新的选择器。 还可以通过调用自定义选择器提供程序的openSelector方法来创建选择器。 选择器保持打开状态,直到通过其关闭方法关闭。

Selector可以调用静态方法open()来创建Selector

Selector Selector=Selector.open()

openSelector() 是通过系统的默认获取

java.nio.channels.spi.SelectorProvider public static SelectorProvider provider()
Returns the system-wide default selector provider for this invocation of the Java virtual machine.
The first invocation of this method locates the default provider object as follows:
If the system property java.nio.channels.spi.SelectorProvider is defined then it is taken to be the fully-qualified name of a concrete provider class. The class is loaded and instantiated; if this process fails then an unspecified error is thrown.
If a provider class has been installed in a jar file that is visible to the system class loader, and that jar file contains a provider-configuration file named java.nio.channels.spi.SelectorProvider in the resource directory META-INF/services, then the first class name specified in that file is taken. The class is loaded and instantiated; if this process fails then an unspecified error is thrown.
Finally, if no provider has been specified by any of the above means then the system-default provider class is instantiated and the result is returned.
Subsequent invocations of this method return the provider that was returned by the first invocation.

java.nio.channels.spi.SelectorProvider public static SelectorProvider provider()
返回此Java虚拟机调用的系统范围默认选择器提供程序。
第一次调用此方法将按如下方式定位默认提供程序对象:
如果定义了系统属性java.nio.channels.spi.SelectorProvider,那么它将被视为具体提供程序类的完全限定名称。该类被加载并实例化;如果此过程失败,则抛出未指定的错误。
如果提供程序类已安装在对系统类加载器可见的jar文件中,并且该jar文件在资源目录META-INF / services中包含名为java.nio.channels.spi.SelectorProvider的提供程序配置文件,然后获取该文件中指定的第一个类名。该类被加载并实例化;如果此过程失败,则抛出未指定的错误。
最后,如果没有通过上述任何方式指定提供程序,则实例化系统缺省提供程序类并返回结果。
此方法的后续调用将返回第一次调用返回的提供程序。

为provider的类。

由以上的源码可见它加了锁,是线程安全的

SelectableChannel

  • SelectableChannel这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。因为FileChannel类没有继承SelectableChannel因此是不是可选通道,而所有socket通道都是可选择的,包括从管道(Pipe)对象的中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以指定对那个选择器而言,那种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。

  • 源码上对它的介绍:

    A channel that can be multiplexed via a Selector.
    In order to be used with a selector, an instance of this class must first be registered via the register method. This method returns a new SelectionKey object that represents the channel's registration with the selector.
    Once registered with a selector, a channel remains registered until it is deregistered. This involves deallocating whatever resources were allocated to the channel by the selector.
    A channel cannot be deregistered directly; instead, the key representing its registration must be cancelled. Cancelling a key requests that the channel be deregistered during the selector's next selection operation. A key may be cancelled explicitly by invoking its cancel method. All of a channel's keys are cancelled implicitly when the channel is closed, whether by invoking its close method or by interrupting a thread blocked in an I/O operation upon the channel.
    If the selector itself is closed then the channel will be deregistered, and the key representing its registration will be invalidated, without further delay.
    A channel may be registered at most once with any particular selector.
    Whether or not a channel is registered with one or more selectors may be determined by invoking the isRegistered method.
    Selectable channels are safe for use by multiple concurrent threads.
    
    可以通过选择器进行多路复用的通道。
    为了与选择器一起使用,必须首先通过register方法注册该类的实例。此方法返回一个新的SelectionKey对象,该对象表示通道与选择器的注册。
    一旦注册选择器,通道将保持注册状态,直到它被注销。这涉及解除分配选择器分配给通道的任何资源。
    渠道不能直接注销;相反,必须取消代表其注册的密钥。取消密钥请求在选择器的下一个选择操作期间取消注册该通道。可以通过调用其cancel方法显式取消密钥。当通道关闭时,无论是通过调用其close方法还是通过中断在通道上的I / O操作中阻塞的线程,所有通道的键都会被隐式取消。
    如果选择器本身已关闭,则将取消注册该通道,并且表示其注册的密钥将无效,而不会有进一步的延迟。
    一个频道最多可以与任何特定选择器一起注册一次。
    可以通过调用isRegistered方法来确定是否向一个或多个选择器注册了频道。
    多个并发线程可以安全地使用可选择的通道。
    
  • SelectableChannel中有两个个方法为register,但是其实是一个方法

    因为底层实现都是一样的,只不过第一个方法调用时第三个参数设置为null。

    它的抽象类AbstractSelectableChannel实现了这个抽象方法,注释为

使用给定的选择器注册此通道,返回选择键。
此方法首先验证此通道是否已打开,以及给定的初始兴趣集是否有效。
如果此通道已在给定选择器中注册,则在将其兴趣设置为给定值后,将返回表示该注册的选择键。
否则,此通道尚未在给定的选择器中注册,因此在保持适当的锁定时调用选择器的register方法。返回后,生成的密钥将添加到此通道的密钥集中。

具体说明:
在SelectableChannel类中注册
PARAMS:
sel  - 要注册此通道的选择器
ops  - 为结果密钥设置的兴趣
att  - 生成密钥的附件;可能是null
返回:
表示使用给定选择器注册此通道的键
抛出:
ClosedSelectorException  - 如果选择器已关闭
IllegalBlockingModeException  - 如果此通道处于阻塞模式
IllegalSelectorException  - 如果此通道不是由与给定选择器相同的提供程序创建的
CancelledKeyException  - 如果此通道当前已在给定选择器中注册,但相应的键已被取消
IllegalArgumentException  - 如果ops set中的某个位与此通道支持的操作不对应,即,如果set&~validOps()!= 0
ClosedChannelException  - 如果此通道已关闭

   /** * Registers this channel with the given selector, returning a selection key. * * <p> This method first verifies that this channel is open and that the * given initial interest set is valid. * * <p> If this channel is already registered with the given selector then * the selection key representing that registration is returned after * setting its interest set to the given value. * * <p> Otherwise this channel has not yet been registered with the given * selector, so the {@link AbstractSelector#register register} method of * the selector is invoked while holding the appropriate locks. The * resulting key is added to this channel's key set before being returned. * </p> * * @throws ClosedSelectorException {@inheritDoc} * * @throws IllegalBlockingModeException {@inheritDoc} * * @throws IllegalSelectorException {@inheritDoc} * * @throws CancelledKeyException {@inheritDoc} * * @throws IllegalArgumentException {@inheritDoc} */
    public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        throws ClosedChannelException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if ((ops & ~validOps()) != 0)
                throw new IllegalArgumentException();
            if (blocking)
                throw new IllegalBlockingModeException();
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }

要实现Selector管理Channel,需要将channel注册到相应的Selector上,如下:

channel.configureBlocking(false);
SelectionKey key= channel.register(selector,SelectionKey.OP_READ);

通过调用通道的register()方***将它注册到一个选择器上。与Selector一起使用时,Channel必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常,这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式,而套接字通道都可以。另外通道一旦被注册,将不能再回到阻塞状态,此时若调用通道的configureBlocking(true)将抛出BlockingModeException异常。

register()方法的第二个参数是“interest集合”,表示选择器所关心的通道操作,它实际上是一个表示选择器在检查通道就绪状态时需要关心的操作的比特掩码。比如一个选择器对通道的read和write操作感兴趣,那么选择器在检查该通道时,只会检查通道的read和write操作是否已经处在就绪状态。
它有以下四种操作类型:

  1. Connect 连接
  2. Accept 接受
  3. Read 读
  4. Write 写

需要注意并非所有的操作在所有的可选择通道上都能被支持,比如ServerSocketChannel支持Accept,而SocketChannel中不支持。我们可以通过通道上的validOps()方法来获取特定通道下所有支持的操作集合。

JAVA中定义了四个常量来表示这四种操作类型:

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

如果Selector对通道的多操作类型感兴趣,可以用“位或”操作符来实现:

int interestSet=SelectionKey.OP_READ|SelectionKey.OP_WRITE;
当通道触发了某个操作之后,表示该通道的某个操作已经就绪,可以***作。因此,某个SocketChannel成功连接到另一个服务器称为“连接就绪”(OP_CONNECT)。一个ServerSocketChannel准备好接收新进入的连接称为“接收就绪”(OP_ACCEPT)。一个有数据可读的通道可以说是“读就绪”(OP_READ)。等待写数据的通道可以说是“写就绪”(OP_WRITE)。

我们注意到register()方***返回一个SelectionKey对象,我们称之为键对象。下面会对键对象详细说明。

  • ServerSocketChannel

    源码上的介绍:

    A selectable channel for stream-oriented listening sockets.
    A server-socket channel is created by invoking the open method of this class. It is not possible to create a channel for an arbitrary, pre-existing ServerSocket. A newly-created server-socket channel is open but not yet bound. An attempt to invoke the accept method of an unbound server-socket channel will cause a NotYetBoundException to be thrown. A server-socket channel can be bound by invoking one of the bind methods defined by this class.
    Socket options are configured using the setOption method. Server-socket channels support the following options:
    Option Name
    Description
    SO_RCVBUF
    The size of the socket receive buffer
    SO_REUSEADDR
    Re-use address
    Additional (implementation specific) options may also be supported.
    Server-socket channels are safe for use by multiple concurrent threads.
    
    面向流的侦听套接字的可选通道。
    通过调用此类的open方法创建服务器套接字通道。 无法为任意预先存在的ServerSocket创建通道。 新创建的服务器套接字通道已打开但尚未绑定。 尝试调用未绑定的服务器套接字通道的accept方法将导致抛出NotYetBoundException。 可以通过调用此类定义的绑定方法之一来绑定服务器套接字通道。
    使用setOption方法配置套接字选项。 服务器套接字通道支持以下选项:
    选项名称
    描述
    SO_RCVBUF
    套接字接收缓冲区的大小
    SO_REUSEADDR
    重复使用地址
    还可以支持其他(特定于实现的)选项。
    服务器套接字通道可供多个并发线程使用。
    

    ServerSocketChannel继承了AbstractSelectableChannel

    AbstractSelectableChannel继承了SelectableChannel,所以,ServerSocketChannel注册到Selector上

  • SocketChannel

    源码上的介绍:

    A selectable channel for stream-oriented connecting sockets.
    A socket channel is created by invoking one of the open methods of this class. It is not possible to create a channel for an arbitrary, pre-existing socket. A newly-created socket channel is open but not yet connected. An attempt to invoke an I/O operation upon an unconnected channel will cause a NotYetConnectedException to be thrown. A socket channel can be connected by invoking its connect method; once connected, a socket channel remains connected until it is closed. Whether or not a socket channel is connected may be determined by invoking its isConnected method.
    Socket channels support non-blocking connection: A socket channel may be created and the process of establishing the link to the remote socket may be initiated via the connect method for later completion by the finishConnect method. Whether or not a connection operation is in progress may be determined by invoking the isConnectionPending method.
    Socket channels support asynchronous shutdown, which is similar to the asynchronous close operation specified in the Channel class. If the input side of a socket is shut down by one thread while another thread is blocked in a read operation on the socket's channel, then the read operation in the blocked thread will complete without reading any bytes and will return -1. If the output side of a socket is shut down by one thread while another thread is blocked in a write operation on the socket's channel, then the blocked thread will receive an AsynchronousCloseException.
    Socket options are configured using the setOption method. Socket channels support the following options:
    Option Name
    Description
    SO_SNDBUF
    The size of the socket send buffer
    SO_RCVBUF
    The size of the socket receive buffer
    SO_KEEPALIVE
    Keep connection alive
    SO_REUSEADDR
    Re-use address
    SO_LINGER
    Linger on close if data is present (when configured in blocking mode only)
    TCP_NODELAY
    Disable the Nagle algorithm
    Additional (implementation specific) options may also be supported.
    Socket channels are safe for use by multiple concurrent threads. They support concurrent reading and writing, though at most one thread may be reading and at most one thread may be writing at any given time. The connect and finishConnect methods are mutually synchronized against each other, and an attempt to initiate a read or write operation while an invocation of one of these methods is in progress will block until that invocation is complete.
    
    用于面向流的连接套接字的可选通道。
    通过调用此类的一个打开方法来创建套接字通道。无法为任意预先存在的套接字创建通道。新创建的套接字通道已打开但尚未连接。尝试在未连接的通道上调用I / O操作将导致抛出NotYetConnectedException。可以通过调用connect方法连接套接字通道;连接后,插座通道保持连接状态,直到它关闭。是否连接套接字通道可以通过调用其isConnected方法来确定。
    套接字通道支持非阻塞连接:可以创建套接字通道,并且可以通过connect方法启动建立到远程套接字的链接的过程,以便稍后通过finishConnect方法完成。可以通过调用isConnectionPending方法来确定连接操作是否正在进行。
    套接字通道支持异步关闭,这类似于Channel类中指定的异步关闭操作。如果套接字的输入端被一个线程关闭而另一个线程在套接字通道上的读操作中被阻塞,那么被阻塞线程中的读操作将完成而不读取任何字节并返回-1。如果套接字的输出端被一个线程关闭而另一个线程在套接字通道上的写操作中被阻塞,则被阻塞的线程将收到AsynchronousCloseException。
    使用setOption方法配置套接字选项。套接字通道支持以下选项:
    选项名称
    描述
    SO_SNDBUF
    套接字发送缓冲区的大小
    SO_RCVBUF
    套接字接收缓冲区的大小
    SO_KEEPALIVE
    保持连接活着
    SO_REUSEADDR
    重复使用地址
    SO_LINGER
    如果存在数据则关闭(仅在阻止模式下配置时)
    TCP_NODELAY
    禁用Nagle算法
    还可以支持其他(特定于实现的)选项。
    套接字通道可以安全地由多个并发线程使用。它们支持并发读写,但最多只有一个线程可能正在读取,并且最多一个线程可能在任何给定时间写入。 connect和finishConnect方法彼此相互同步,并且在调用其中一个方法时尝试启动读取或写入操作将阻塞,直到该调用完成。
    

SocketChannel 继承了AbstractSelectableChannel,同ServerSocketChannel一样,可以注册到Selector。

SelectionKey

选择键封装了特定的通道与特定的选择器的注册关系。选择键对象被SelectableChannel.register()返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。

源码中这样说:

A token representing the registration of a SelectableChannel with a Selector.
A selection key is created each time a channel is registered with a selector. A key remains valid until it is cancelled by invoking its cancel method, by closing its channel, or by closing its selector. Cancelling a key does not immediately remove it from its selector; it is instead added to the selector's cancelled-key set for removal during the next selection operation. The validity of a key may be tested by invoking its isValid method. 
A selection key contains two operation sets represented as integer values. Each bit of an operation set denotes a category of selectable operations that are supported by the key's channel.
The interest set determines which operation categories will be tested for readiness the next time one of the selector's selection methods is invoked. The interest set is initialized with the value given when the key is created; it may later be changed via the interestOps(int) method.
The ready set identifies the operation categories for which the key's channel has been detected to be ready by the key's selector. The ready set is initialized to zero when the key is created; it may later be updated by the selector during a selection operation, but it cannot be updated directly.
That a selection key's ready set indicates that its channel is ready for some operation category is a hint, but not a guarantee, that an operation in such a category may be performed by a thread without causing the thread to block. A ready set is most likely to be accurate immediately after the completion of a selection operation. It is likely to be made inaccurate by external events and by I/O operations that are invoked upon the corresponding channel.
This class defines all known operation-set bits, but precisely which bits are supported by a given channel depends upon the type of the channel. Each subclass of SelectableChannel defines an validOps() method which returns a set identifying just those operations that are supported by the channel. An attempt to set or test an operation-set bit that is not supported by a key's channel will result in an appropriate run-time exception.
It is often necessary to associate some application-specific data with a selection key, for example an object that represents the state of a higher-level protocol and handles readiness notifications in order to implement that protocol. Selection keys therefore support the attachment of a single arbitrary object to a key. An object can be attached via the attach method and then later retrieved via the attachment method.
Selection keys are safe for use by multiple concurrent threads. The operations of reading and writing the interest set will, in general, be synchronized with certain operations of the selector. Exactly how this synchronization is performed is implementation-dependent: In a naive implementation, reading or writing the interest set may block indefinitely if a selection operation is already in progress; in a high-performance implementation, reading or writing the interest set may block briefly, if at all. In any case, a selection operation will always use the interest-set value that was current at the moment that the operation began.

表示SelectableChannel与Selector注册的标记。
每次向选择器注册通道时,都会创建一个选择键。密钥保持有效,直到通过调用其取消方法,关闭其通道或关闭其选择器来取消密钥。取消密钥不会立即将其从选择器中删除;而是将其添加到选择器的已取消键集中,以便在下一个选择操作期间将其删除。可以通过调用其isValid方法来测试密钥的有效性。
选择键包含表示为整数值的两个操作集。操作集的每个位表示密钥通道支持的可选操作的类别。
- 兴趣集确定下次调用选择器的一个选择方法时将测试哪些操作类别的准备情况。利息集初始化为创建密钥时给定的值;稍后可以通过interestOps(int)方法进行更改。
就绪集合通过键的选择器识别检测到键的通道准备就绪的操作类别。创建密钥时,就绪集初始化为零;稍后可以在选择操作期间由选择器更新,但不能直接更新。
- 选择键的就绪集指示其通道已准备好用于某些操作类别是提示但不保证这样的类别中的操作可由线程执行而不会导致线程阻塞。在完成选择操作之后,就绪设置最有可能是准确的。外部事件和在相应通道上调用的I / O操作可能会使其不准确。
该类定义了所有已知的操作集位,但是精确地由给定通道支持哪些位取决于通道的类型。 SelectableChannel的每个子类定义一个validOps()方法,该方法返回一个集合,该集合仅标识通道支持的那些操作。尝试设置或测试密钥通道不支持的操作设置位将导致适当的运行时异常。
通常需要将一些特定于应用程序的数据与选择键相关联,例如,表示更高级别协议状态的对象,并处理准备就绪通知以实现该协议。因此,选择键支持将单个任意对象附加到键。可以通过attach方法附加对象,然后通过附加方法检索。
选择键可安全地供多个并发线程使用。通常,读取和写入兴趣集的操作将与选择器的某些操作同步。具体如何执行此同步依赖于实现:在一个简单的实现中,如果选择操作已在进行中,则读取或写入兴趣集可能会无限期地阻塞;在高性能实现中,如果有的话,读取或写入兴趣集可能会暂时阻止。在任何情况下,选择操作将始终使用操作开始时当前的兴趣设定值。

下图格式较为清晰:

可见interestOps() 和 readyOps()都是返回的int类型,其实他们返回的是上面OP_READ等四个常量的 “|”。

判断集合中是否有某一个常量,可以使用“&”来判断,如

(interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT

返回值为true则代表存在。

  • 该对象可以获取以下四种属性:

    • interest集合(其实是int类型,上面解释过了)
    • ready集合(其实是int类型,上面解释过了)
    • Channel
    • Selector

interest集合是Selector感兴趣的集合,用于指示选择器对通道关心的操作,可通过SelectionKey对象的interestOps()获取。最初,该兴趣集合是通道被注册到Selector时传进来的值。该集合不会被选择器改变,但是可通过interestOps()改变。

我们可以通过以下方法来判断Selector是否对Channel的某种事件感兴趣:

   int interestSet=selectionKey.interestOps();
   boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;

ready 集合是通道已经就绪的操作的集合,表示一个通道准备好要执行的操作了,可通过SelctionKey对象的readyOps()来获取相关通道已经就绪的操作。它是interest集合的子集,并且表示了interest集合中从上次调用select()以后已经就绪的那些操作。(比如选择器对通道的ready,write操作感兴趣,而某时刻通道的read操作已经准备就绪可以被选择器获知了,前一种就是interest集合,后一种则是ready集合。)。JAVA中定义以下几个方法用来检查这些操作是否就绪:

    //int readSet=selectionKey.readOps();
    selectionKey.isAcceptable();//等价于selectionKey.readyOps()&SelectionKey.OP_ACCEPT
    selectionKey.isConnectable();
    selectionKey.isReadable();
    selectionKey.isWritable();

需要注意的是,通过相关的选择键的readyOps()方法返回的就绪状态指示只是一个提示,底层的通道在任何时候都会不断改变,而其他线程也可能在通道上执行操作并影响到它的就绪状态。另外,我们不能直接修改ready集合。

取出SelectionKey所关联的Selector和Channel
通过SelectionKey访问对应的Selector和Channel:

Channel channel =selectionKey.channel();
Selector selector=selectionKey.selector();
  • 关于取消SelectionKey对象的那点事

    我们可以通过SelectionKey对象的cancel()方法来取消特定的注册关系。

    该方法调用之后,该SelectionKey对象将会被”拷贝”至已取消键的集合中,该键此时已经失效,但是该注册关系并不会立刻终结。在下一次select()时,已取消键的集合中的元素会被清除,相应的注册关系也真正终结。

  • 为SelectionKey绑定附加对象

可以将一个或者多个附加对象绑定到SelectionKey上,以便容易的识别给定的通道。通常有两种方式:

  1. 在注册的时候直接绑定:

    SelectionKey key=channel.register(selector,SelectionKey.OP_READ,theObject); 
    
  2. 在绑定完成之后附加:

    selectionKey.attach(theObject);//绑定
    

绑定之后,可通过对应的SelectionKey取出该对象:
selectionKey.attachment();。
如果要取消该对象,则可以通过该种方式:
selectionKey.attach(null).

需要注意的是如果附加的对象不再使用,一定要人为清除,因为垃圾回收器不会回收该对象,若不清除的话会成内存泄漏。

一个单独的通道可被注册到多个选择器中,有些时候我们需要通过isRegistered()方法来检查一个通道是否已经被注册到任何一个选择器上。 通常来说,我们并不会这么做。

  • 通过Selector选择通道

我们知道选择器维护注册过的通道的集合,并且这种注册关系都被封装在SelectionKey当中。接下来我们简单的了解一下Selector维护的三种类型SelectionKey集合:

我们知道选择器维护注册过的通道的集合,并且这种注册关系都被封装在SelectionKey当中。接下来我们简单的了解一下

Selector维护的三种类型SelectionKey集合:(重点)

源码中已经说明了一切:

A multiplexor of SelectableChannel objects.
A selector may be created by invoking the open method of this class, which will use the system's default selector provider to create a new selector. A selector may also be created by invoking the openSelector method of a custom selector provider. A selector remains open until it is closed via its close method. 
A selectable channel's registration with a selector is represented by a SelectionKey object. A selector maintains three sets of selection keys:
The key set contains the keys representing the current channel registrations of this selector. This set is returned by the keys method.
The selected-key set is the set of keys such that each key's channel was detected to be ready for at least one of the operations identified in the key's interest set during a prior selection operation. This set is returned by the selectedKeys method. The selected-key set is always a subset of the key set.
The cancelled-key set is the set of keys that have been cancelled but whose channels have not yet been deregistered. This set is not directly accessible. The cancelled-key set is always a subset of the key set.
All three sets are empty in a newly-created selector.
A key is added to a selector's key set as a side effect of registering a channel via the channel's register method. Cancelled keys are removed from the key set during selection operations. The key set itself is not directly modifiable.
A key is added to its selector's cancelled-key set when it is cancelled, whether by closing its channel or by invoking its cancel method. Cancelling a key will cause its channel to be deregistered during the next selection operation, at which time the key will removed from all of the selector's key sets. 
Keys are added to the selected-key set by selection operations. A key may be removed directly from the selected-key set by invoking the set's remove method or by invoking the remove method of an iterator obtained from the set. Keys are never removed from the selected-key set in any other way; they are not, in particular, removed as a side effect of selection operations. Keys may not be added directly to the selected-key set.

SelectableChannel对象的多路复用器。
- 可以通过调用此类的open方法来创建选择器,该方法将使用系统的默认选择器提供程序来创建新的选择器。还可以通过调用自定义选择器提供程序的openSelector方法来创建选择器。选择器保持打开状态,直到通过其关闭方法关闭。
可选择通道的选择器注册由SelectionKey对象表示。选择器维护三组选择键:
重点: 密钥集包含表示此选择器的当前通道注册的键。该方法由keys方法返回。
重点: 所选择的密钥集是一组密钥,使得检测到每个密钥的信道准备好用于在先前选择操作期间在密钥的兴趣集中识别的至少一个操作。这个集由selectedKeys方法返回。选定键集始终是键集的子集。
重点: 取消密钥集是已取消但其通道尚未取消注册的密钥集。此套装无法直接访问。取消密钥集始终是密钥集的子集。
在新创建的选择器中,所有三个组都是空的。
将一个键添加到选择器的键集中,作为通过通道的寄存器方法注册通道的副作用。在选择操作期间,取消的密钥将从密钥集中删除。密钥集本身不能直接修改。
无论是通过关闭其通道还是通过调用其cancel方法,都会在取消选择器的取消键集时添加一个键。取消密钥将导致其通道在下一个选择操作期间取消注册,此时密钥将从所有选择器的密钥集中删除。
通过选择操作将键添加到选定键集。可以通过调用set的remove方法或通过调用从set中获取的迭代器的remove方法,直接从selected-key集中删除键。密钥永远不会以任何其他方式从选定密钥集中删除;特别是,它们不会作为选择操作的副作用而被删除。密钥可能无法直接添加到选定密钥集。

  • 密钥集(key set)包含表示此选择器的当前通道注册的键。该方法由keys方法返回。
  • 所选择的密钥集(selected-key set)是一组密钥,使得检测到每个密钥的信道准备好用于在先前选择操作期间在密钥的兴趣集中识别的至少一个操作。这个集由selectedKeys方法返回。选定键集始终是键集的子集。
  • 取消密钥集( cancelled-key set)是已取消但其通道尚未取消注册的密钥集。此套装无法直接访问。取消密钥集始终是密钥集的子集。

在刚初始化的Selector对象中,这三个集合都是空的。通过Selector的select()方法可以选择已经准备就绪的通道(这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。下面是Selector几个重载的select()方法:
select():阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout):和select()一样,但最长阻塞事件为timeout毫秒。
selectNow():非阻塞,只要有通道就绪就立刻返回。

select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合。如下:

Set selectedKeys=selector.selectedKeys(); 

进而可以放到和某SelectionKey关联的Selector和Channel。如下所示:

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

关于Selector执行选择的过程(重点)

我们知道调用select()方法进行通道,现在我们再来深入一下选择的过程,也就是select()执行过程。当select()被调用时将执行以下几步:

  1. 首先检查已取消键集合,也就是通过cancle()取消的键。如果该集合不为空,则清空该集合里的键,同时该集合中每个取消的键也将从已注册键集合和已选择键集合中移除。(**注意:**一个键被取消时,并不会立刻从集合中移除,而是将该键“拷贝”至已取消键集合中,这种取消策略就是我们常提到的“延迟取消”。)

  2. 再次检查已注册键集合(准确说是该集合中每个键的interest集合)。系统底层会依次询问每个已经注册的通道是否准备好选择器所感兴趣的某种操作,一旦发现某个通道已经就绪了,则会首先判断该通道是否已经存在在已选择键集合当中,如果已经存在,则更新该通道在已注册键集合中对应的键的ready集合,如果不存在,则首先清空该通道的对应的键的ready集合,然后重设ready集合,最后将该键存至已注册键集合中。这里需要明白,当更新ready集合时,在上次select()中已经就绪的操作不会被删除,也就是ready集合中的元素是累积的,比如在第一次的selector对某个通道的read和write操作感兴趣,在第一次执行select()时,该通道的read操作就绪,此时该通道对应的键中的ready集合存有read元素,在第二次执行select()时,该通道的write操作也就绪了,此时该通道对应的ready集合中将同时有read和write元素。

深入已注册键集合的管理(重点)

由上面贴出的源码可以知道。通过选择操作将键添加到选定键集。可以通过调用set的remove方法或通过调用从set中获取的迭代器的remove方法,直接从selected-key集中删除键。密钥永远不会以任何其他方式从选定密钥集中删除;特别是,它们不会作为选择操作的副作用而被删除。密钥可能无法直接添加到选定密钥集。

首先要记住:选择器不会主动删除被添加到已选择键集合中的键,而且被添加到已选择键集合中的键的ready集合只能被设置,而不能被清理。如果我们希望清空已选择键集合中某个键的ready集合该怎么办?我们知道一个键在新加入已选择键集合之前会首先置空该键的ready集合,这样的话我们可以人为的将某个键从已注册键集合中移除最终实现置空某个键的ready集合。被移除的键如果在下一次的select()中再次就绪,它将会重新被添加到已选择的键的集合中。这就是为什么要在每次迭代的末尾调用

keyIterator.remove()

停止选择

选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有以下三种方式可以唤醒在select()方法中阻塞的线程。

  1. 通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回

    该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。

  2. 通过close()方法关闭Selector

    该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。

  3. 调用interrupt()

    调用该方***使睡眠的线程抛出InterruptException异常,捕获该异常并在调用wakeup()

Selector完整实例

这里我们结合ServerSocketChannel和Selector构建简单的服务器,下面是完整的代码示例。
服务端代码:

public class ServerSocketChannelTest {

    private int size = 1024;
    private ServerSocketChannel socketChannel;
    private ByteBuffer byteBuffer;
    private Selector selector;
    private final int port = 8998;
    private int remoteClientNum=0;

    public ServerSocketChannelTest() {
        try {
            initChannel();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    public void initChannel() throws Exception {
        socketChannel = ServerSocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.bind(new InetSocketAddress(port));
        System.out.println("listener on port:" + port);
        selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        byteBuffer = ByteBuffer.allocateDirect(size);
        byteBuffer.order(ByteOrder.BIG_ENDIAN);
    }

    private void listener() throws Exception {
        while (true) {
            int n = selector.select();
            if (n == 0) {
                continue;
            }
            Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                //a connection was accepted by a ServerSocketChannel.
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    registerChannel(selector, channel, SelectionKey.OP_READ);
                    remoteClientNum++;
                    System.out.println("online client num="+remoteClientNum);
                    replyClient(channel);
                }
                //a channel is ready for reading
                if (key.isReadable()) {
                    readDataFromSocket(key);
                }

                ite.remove();//must
            }

        }
    }

    protected void readDataFromSocket(SelectionKey key) throws Exception {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int count;
        byteBuffer.clear();
        while ((count = socketChannel.read(byteBuffer)) > 0) {
            byteBuffer.flip(); // Make buffer readable
            // Send the data; don't assume it goes all at once
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }
            byteBuffer.clear(); // Empty buffer
        }
        if (count < 0) {
            socketChannel.close();
        }
    }

    private void replyClient(SocketChannel channel) throws IOException {
        byteBuffer.clear();
        byteBuffer.put("hello client!\r\n".getBytes());
        byteBuffer.flip();
        channel.write(byteBuffer);
    }

    private void registerChannel(Selector selector, SocketChannel channel, int ops) throws Exception {
        if (channel == null) {
            return;
        }
        channel.configureBlocking(false);
        channel.register(selector, ops);
    }


    public static void main(String[] args) {
        try {
            new ServerSocketChannelTest().listener();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客户端代码:

public class SocketChannelTest {

    private int size = 1024;
    private ByteBuffer byteBuffer;
    private SocketChannel socketChannel;

    public void connectServer() throws IOException {
        socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8998));
        byteBuffer = ByteBuffer.allocate(size);
        byteBuffer.order(ByteOrder.BIG_ENDIAN);
        receive();
    }

    private void receive() throws IOException {
        while (true) {
            int count;
            byteBuffer.clear();
            while ((count = socketChannel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                while (byteBuffer.hasRemaining()) {
                    System.out.print((char) byteBuffer.get());
                }
                //send("send data to server\r\n".getBytes());
                byteBuffer.clear();
            }
        }
    }

    private void send(byte[] data) throws IOException {
        byteBuffer.clear();
        byteBuffer.put(data);
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
    }

    public static void main(String[] args) throws IOException {
        new SocketChannelTest().connectServer();
    }
}

本文参考了:https://blog.csdn.net/dd864140130/article/details/50299687,修正了其中的错误,并以源代码为依据,重新梳理了整个Selector。整个过程花费时间长,收获也很多,特别是对Selector,SelectorKey有了更深一个层次的了解。

全部评论

相关推荐

联通 技术人员 总包不低于12
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务