关于非阻塞I/O (NIO)
使用NIO(参见 IP配置属性 中的 using-nio
)避免为读取每个socket而分配一个专用线程。对于少量socket,您可能会发现,不使用NIO并结合异步移交(例如到QueueChannel
)的性能与使用NIO相当或更优。
当处理大量连接时,您应该考虑使用NIO。然而,使用NIO还有一些其他影响。一个线程池(在任务执行器中)在所有socket之间共享。每个传入消息被组装,并在从该池中选择的线程上作为一个独立的工作单元发送到配置的通道。到达同一socket的两个连续消息可能由不同的线程处理。这意味着消息发送到通道的顺序是不确定的。到达socket的消息的严格顺序不会得到保持。
对于某些应用程序来说,这不是问题。但对于其他应用程序来说,这是一个问题。如果您需要严格排序,请考虑将 using-nio
设置为 false
并使用异步移交。
或者,您可以在入站端点的下游插入一个重新排序器(resequencer),以将消息恢复到正确的顺序。如果您在连接工厂上将 apply-sequence
设置为 true
,则到达TCP连接的消息将设置 sequenceNumber
和 correlationId
头。重新排序器使用这些头来将消息恢复到正确的顺序。
从版本5.1.4开始,接收新连接的优先级高于从现有连接读取。通常,除非您有非常高的新传入连接速率,否则这几乎没有影响。如果您希望恢复到之前读取优先的行为,请将 TcpNioServerConnectionFactory 上的 multiAccept 属性设置为 false 。 |
线程池大小
pool size
属性不再使用。以前,它在未指定任务执行器时指定默认线程池的大小。它还用于设置服务器socket的连接积压(backlog)。第一个功能已不再需要(参见下一段)。第二个功能已被 backlog
属性取代。
以前,当将固定线程池任务执行器(这是默认设置)与NIO一起使用时,可能会发生死锁并导致处理停止。问题发生在缓冲区满时,一个从socket读取数据的线程试图向缓冲区添加更多数据,但没有可用线程来在缓冲区中腾出空间。这种情况仅在线程池大小非常小的情况下发生,但在极端条件下也可能出现。自2.2版本以来,有两个更改消除了这个问题。首先,默认任务执行器是缓存线程池执行器。其次,添加了死锁检测逻辑,以便在发生线程饥饿时,不会死锁,而是抛出异常,从而释放死锁资源。
现在默认的任务执行器是无界的,如果消息处理需要较长时间,则在高传入消息速率下可能会发生内存不足的情况。如果您的应用程序表现出这种行为,您应该使用具有适当线程池大小的池化任务执行器,但请参见 下一节。 |
带有 CALLER_RUNS
策略的线程池任务执行器
当您使用带有 CallerRunsPolicy
(使用 <task/>
命名空间时为 CALLER_RUNS
)的固定线程池且队列容量较小时,应该牢记一些重要注意事项。
如果您不使用固定线程池,则以下内容不适用。
对于NIO连接,存在三种不同的任务类型。I/O选择器处理在一个专用线程上执行(检测事件、接受新连接以及使用任务执行器将I/O读取操作分派到其他线程)。当一个I/O读取器线程(读取操作被分派到的线程)读取数据时,它将数据移交给另一个线程来组装传入消息。大型消息可能需要多次读取才能完成。这些“组装器”(assembler)线程在等待数据时可能会阻塞。当新的读取事件发生时,读取器判断该socket是否已经有组装器,如果没有,则运行一个新的。组装过程完成后,组装器线程被返回到线程池。
当线程池耗尽、使用了 CALLER_RUNS
拒绝策略且任务队列已满时,这可能导致死锁。当线程池为空且队列中没有空间时,I/O选择器线程接收到 OP_READ
事件,并使用执行器分派读取操作。队列已满,因此选择器线程自身开始读取过程。现在它检测到该socket没有组装器,并在执行读取之前启动一个组装器。再次,队列已满,选择器线程变成了组装器。组装器现在被阻塞,等待数据被读取,但这永远不会发生。连接工厂现在处于死锁状态,因为选择器线程无法处理新事件。
为了避免这种死锁,我们必须避免选择器(或读取器)线程执行组装任务。我们希望为I/O和组装操作使用独立的线程池。
框架提供了 CompositeExecutor
,它允许配置两个不同的执行器:一个用于执行I/O操作,一个用于消息组装。在这种环境中,一个I/O线程永远不会成为组装器线程,因此不会发生死锁。
此外,任务执行器应配置使用 AbortPolicy
(使用 <task>
时为 ABORT
)。当I/O任务无法完成时,它会被延迟一小段时间并持续重试,直到能够完成并分配一个组装器。默认情况下,延迟时间为100毫秒,但您可以通过在连接工厂上设置 readDelay
属性(使用XML命名空间配置时为 read-delay
)来更改它。
以下三个示例展示了如何配置复合执行器
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>