关于非阻塞 I/O (NIO)

使用 NIO (参见 IP 配置属性 中的 using-nio) 避免了为每个套接字分配一个专用线程进行读取。对于少量套接字,您可能会发现不使用 NIO,并结合异步移交 (例如到 QueueChannel),其性能与使用 NIO 相当或更优。

当处理大量连接时,您应该考虑使用 NIO。然而,使用 NIO 还有其他影响。一个线程池 (在任务执行器中) 在所有套接字之间共享。每个传入消息都被组装并作为一个独立的工作单元发送到配置的通道,由该池中选定的线程处理。到达同一套接字的两个连续消息可能由不同的线程处理。这意味着消息发送到通道的顺序是不确定的。套接字接收到的消息的严格顺序不会被保持。

对于某些应用程序,这不是问题。对于其他应用程序,它是一个问题。如果您需要严格的顺序,请考虑将 using-nio 设置为 false 并使用异步移交。

或者,您可以在入站端点下游插入一个重排序器,将消息恢复到其正确的序列。如果您在连接工厂上将 apply-sequence 设置为 true,则到达 TCP 连接的消息将设置 sequenceNumbercorrelationId 头。重排序器使用这些头将消息恢复到其正确的序列。

从 5.1.4 版本开始,接受新连接的优先级高于从现有连接读取。通常,这影响很小,除非您有非常高的新传入连接速率。如果您希望恢复到以前读取优先的行为,请将 TcpNioServerConnectionFactory 上的 multiAccept 属性设置为 false

池大小

池大小属性不再使用。以前,它在未指定任务执行器时指定了默认线程池的大小。它还用于设置服务器套接字上的连接积压。第一个功能不再需要;请参阅下一段。第二个功能已被 backlog 属性取代。

以前,当使用固定线程池任务执行器 (这是默认值) 与 NIO 时,可能会发生死锁,导致处理停止。当缓冲区已满,一个从套接字读取的线程试图向缓冲区添加更多数据,并且没有可用线程在缓冲区中腾出空间时,就会出现问题。这只发生在池大小非常小的情况下,但在极端条件下也可能发生。自 2.2 版本以来,两个更改已消除了这个问题。首先,默认的任务执行器是一个缓存线程池执行器。其次,添加了死锁检测逻辑,如果发生线程饥饿,则会抛出异常而不是死锁,从而释放死锁资源。

现在默认的任务执行器是无界的,如果消息处理需要很长时间,则在高传入消息速率下可能会出现内存不足的情况。如果您的应用程序表现出这种行为,您应该使用具有适当池大小的池化任务执行器,但请参阅 下一节

使用 CALLER_RUNS 策略的线程池任务执行器

当您使用具有 CallerRunsPolicy (使用 <task/> 命名空间时为 CALLER_RUNS) 的固定线程池且队列容量很小时,您应该记住一些重要的注意事项。

如果您不使用固定线程池,则以下内容不适用。

对于 NIO 连接,有三种不同的任务类型。I/O 选择器处理在一个专用线程上执行 (检测事件、接受新连接以及通过任务执行器将 I/O 读取操作分派到其他线程)。当 I/O 读取器线程 (读取操作被分派到该线程) 读取数据时,它会移交给另一个线程来组装传入消息。大消息可能需要多次读取才能完成。这些“组装器”线程在等待数据时可能会阻塞。当发生新的读取事件时,读取器会确定此套接字是否已有组装器,如果没有,则运行一个新的组装器。当组装过程完成时,组装器线程返回到池中。

当池耗尽、正在使用 CALLER_RUNS 拒绝策略且任务队列已满时,这可能会导致死锁。当池为空且队列中没有空间时,IO 选择器线程会收到 OP_READ 事件并通过执行器分派读取。队列已满,因此选择器线程本身开始读取过程。现在它检测到此套接字没有组装器,并且在读取之前,启动一个组装器。同样,队列已满,选择器线程成为组装器。组装器现在被阻塞,等待永远不会发生的读取数据。连接工厂现在死锁,因为选择器线程无法处理新事件。

为了避免这种死锁,我们必须避免选择器 (或读取器) 线程执行组装任务。我们希望为 IO 和组装操作使用独立的池。

该框架提供了一个 CompositeExecutor,它允许配置两个不同的执行器:一个用于执行 IO 操作,另一个用于消息组装。在这种环境中,IO 线程永远不会成为组装器线程,因此死锁不会发生。

此外,任务执行器应配置为使用 AbortPolicy (使用 <task> 时为 ABORT)。当 I/O 任务无法完成时,它会被延迟一小段时间并持续重试,直到它可以完成并分配一个组装器。默认情况下,延迟为 100 毫秒,但您可以通过在连接工厂上设置 readDelay 属性 (使用 XML 命名空间配置时为 read-delay) 来更改它。

以下三个示例展示了如何配置复合执行器

@Bean
private CompositeExecutor compositeExecutor() {
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>
© . This site is unofficial and not affiliated with VMware.