关于非阻塞 I/O (NIO)
使用 NIO(参见 IP 配置属性 中的 using-nio
)避免为每个套接字分配一个线程进行读取。对于少量套接字,您可能会发现不使用 NIO,以及异步传递(例如到 QueueChannel
),其性能与使用 NIO 相当或更好。
当处理大量连接时,您应该考虑使用 NIO。但是,使用 NIO 有一些其他的影响。一个线程池(在任务执行器中)在所有套接字之间共享。每个传入消息都会被组装并作为单独的工作单元发送到配置的通道,由从该池中选择的线程处理。到达同一个套接字的两个连续消息可能会由不同的线程处理。这意味着消息发送到通道的顺序是不确定的。不会维护到达套接字的消息的严格顺序。
对于某些应用程序,这不是问题。对于其他应用程序,这是一个问题。如果您需要严格的顺序,请考虑将 using-nio
设置为 false
并使用异步传递。
或者,您可以在入站端点下游插入一个重新排序器,以将消息返回到其正确的顺序。如果您将连接工厂上的 apply-sequence
设置为 true
,到达 TCP 连接的消息将设置 sequenceNumber
和 correlationId
头。重新排序器使用这些头将消息返回到其正确的顺序。
从 5.1.4 版本开始,接受新连接的优先级高于从现有连接读取。一般来说,这应该不会有太大影响,除非您有非常高的新传入连接速率。如果您希望恢复到以前的行为,即优先读取,请将 TcpNioServerConnectionFactory 上的 multiAccept 属性设置为 false 。
|
池大小
池大小属性不再使用。以前,它指定了在未指定任务执行器时默认线程池的大小。它也用于设置服务器套接字上的连接积压。第一个功能不再需要(参见下一段)。第二个功能被 backlog
属性取代。
以前,当使用固定线程池任务执行器(默认情况下)与 NIO 时,可能会出现死锁,导致处理停止。问题发生在缓冲区已满时,从套接字读取数据的线程试图向缓冲区添加更多数据,而没有线程可用以释放缓冲区中的空间。这仅在池大小非常小时才会发生,但在极端情况下可能会发生。从 2.2 版本开始,两个更改消除了此问题。首先,默认任务执行器是缓存线程池执行器。其次,添加了死锁检测逻辑,这样,如果发生线程饥饿,则不会出现死锁,而是抛出异常,从而释放死锁的资源。
现在默认任务执行器是无界的,如果传入消息的速率很高,而消息处理需要很长时间,则可能会出现内存不足的情况。如果您的应用程序表现出这种行为,您应该使用具有适当池大小的池化任务执行器,但请参见下一节。 |
使用 CALLER_RUNS
策略的线程池任务执行器
当您使用固定线程池和 CallerRunsPolicy
(在使用 <task/>
命名空间时为 CALLER_RUNS
)并且队列容量很小时,您应该牢记一些重要的注意事项。
如果您不使用固定线程池,则以下内容不适用。
对于 NIO 连接,有三种不同的任务类型。I/O 选择器处理在一个专用线程上执行(检测事件、接受新连接,并使用任务执行器将 I/O 读取操作分派到其他线程)。当 I/O 读取器线程(读取操作被分派到的线程)读取数据时,它会将数据传递给另一个线程以组装传入的消息。大型消息可能需要多次读取才能完成。这些“组装器”线程在等待数据时可能会阻塞。当发生新的读取事件时,读取器会确定此套接字是否已有一个组装器,如果没有,则运行一个新的组装器。当组装过程完成后,组装器线程将返回到池中。
当池子耗尽、使用 `CALLER_RUNS` 拒绝策略且任务队列已满时,这会导致死锁。当池子为空且队列中没有空间时,IO 选择器线程会收到 `OP_READ` 事件并使用执行器调度读取操作。由于队列已满,选择器线程本身开始读取过程。现在它检测到没有此套接字的组装器,并且在读取之前,它会触发一个组装器。同样,队列已满,选择器线程成为组装器。组装器现在被阻塞,等待数据被读取,但数据永远不会被读取。连接工厂现在处于死锁状态,因为选择器线程无法处理新的事件。
为了避免这种死锁,我们必须避免选择器(或读取器)线程执行组装任务。我们希望为 IO 操作和组装操作使用单独的池子。
框架提供了一个 `CompositeExecutor`,它允许配置两个不同的执行器:一个用于执行 IO 操作,另一个用于消息组装。在这种环境下,IO 线程永远不会成为组装器线程,死锁就不会发生。
此外,任务执行器应该配置为使用 `AbortPolicy`(在使用 `<task>` 时为 `ABORT`)。当 I/O 任务无法完成时,它会被延迟一小段时间,并不断重试,直到它能够完成并分配一个组装器。默认情况下,延迟为 100 毫秒,但您可以通过设置连接工厂的 `readDelay` 属性(在使用 XML 命名空间配置时为 `read-delay`)来更改它。
以下三个示例展示了如何配置复合执行器
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>