入站通道适配器

入站通道适配器的主要功能是执行 SQL SELECT 查询并将结果集转换为消息。消息有效负载是整个结果集(表示为 List),列表中项目的类型取决于行映射策略。默认策略是通用映射器,它为查询结果中的每一行返回一个 Map。可选地,您可以通过添加对 RowMapper 实例的引用来更改此策略(有关行映射的更多详细信息,请参阅 Spring JDBC 文档)。

如果您想将 SELECT 查询结果中的行转换为单个消息,可以使用下游拆分器。

入站适配器还需要引用 JdbcTemplate 实例或 DataSource

除了用于生成消息的 SELECT 语句之外,适配器还具有一个 UPDATE 语句,用于将记录标记为已处理,以便它们不会出现在下次轮询中。更新可以通过原始选择中的 ID 列表进行参数化。默认情况下,这是通过命名约定完成的(输入结果集中名为 id 的列被转换为更新参数映射中的名为 id 的列表)。以下示例定义了一个带有更新查询和 DataSource 引用的入站通道适配器。

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
更新查询中的参数使用冒号 (:) 前缀指定参数名称(在前面的示例中,它是应用于轮询结果集中每一行的表达式)。这是 Spring JDBC 中命名参数 JDBC 支持的标准功能,结合了 Spring Integration 中采用的约定(投影到轮询结果列表)。底层的 Spring JDBC 功能限制了可用的表达式(例如,大多数除句点以外的特殊字符都不允许),但由于目标通常是可以通过 bean 路径寻址的对象列表(可能是单个对象的列表),因此这并不 unduly 限制。

要更改参数生成策略,您可以将 SqlParameterSourceFactory 注入适配器以覆盖默认行为(适配器具有 sql-parameter-source-factory 属性)。Spring Integration 提供 ExpressionEvaluatingSqlParameterSourceFactory,它创建基于 SpEL 的参数源,查询结果作为 #root 对象。(如果 update-per-row 为 true,则根对象是行)。如果更新查询中多次出现相同的参数名称,则只评估一次,其结果将被缓存。

您也可以为 select 查询使用参数源。在这种情况下,由于没有“结果”对象可供评估,因此每次都使用单个参数源(而不是使用参数源工厂)。从 4.0 版开始,您可以使用 Spring 创建基于 SpEL 的参数源,如下例所示

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

每个参数表达式中的 value 可以是任何有效的 SpEL 表达式。表达式评估的 #root 对象是 parameterSource bean 上定义的构造函数参数。它对所有评估都是静态的(在前面的示例中,是一个空的 String)。

从 5.0 版开始,您可以为 ExpressionEvaluatingSqlParameterSourceFactory 提供 sqlParameterTypes 来指定特定参数的目标 SQL 类型。

以下示例为查询中使用的参数提供 SQL 类型

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
使用 createParameterSourceNoCache 工厂方法。否则,参数源会缓存评估结果。还要注意,由于禁用缓存,如果 select 查询中多次出现相同的参数名称,则会针对每次出现重新评估它。

轮询和事务

入站适配器接受一个常规的 Spring Integration 轮询器作为子元素。因此,可以控制轮询的频率(以及其他用途)。JDBC 使用的轮询器的一个重要功能是将轮询操作包装在事务中的选项,如下例所示

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
如果您没有显式指定轮询器,则会使用默认值。与 Spring Integration 一样,它可以定义为顶级 bean)。

在前面的示例中,数据库每 1000 毫秒(或每秒一次)轮询一次,更新和 select 查询都在同一事务中执行。事务管理器配置未显示。但是,只要它知道数据源,轮询就是事务性的。一个常见的用例是下游通道是直接通道(默认值),以便端点在同一个线程中调用,因此在同一个事务中。这样,如果其中任何一个失败,事务就会回滚,输入数据将恢复到其原始状态。

max-rowsmax-messages-per-poll

JDBC 入站通道适配器定义了一个名为 max-rows 的属性。当您指定适配器的轮询器时,您还可以定义一个名为 max-messages-per-poll 的属性。虽然这两个属性看起来很相似,但它们的含义却大不相同。

max-messages-per-poll 指定了在每个轮询间隔内查询执行的次数,而 max-rows 指定了每次执行返回的行数。

在正常情况下,当使用 JDBC 入站通道适配器时,您可能不希望设置轮询器的 max-messages-per-poll 属性。它的默认值为 1,这意味着 JDBC 入站通道适配器的 receive() 方法在每个轮询间隔内只执行一次。

max-messages-per-poll 属性设置为更大的值意味着查询将被执行多次,并且这些执行是连续的。有关 max-messages-per-poll 属性的更多信息,请参阅 配置入站通道适配器

相反,max-rows 属性(如果大于 0)指定了从 receive() 方法创建的查询结果集中使用的最大行数。如果该属性设置为 0,则所有行都包含在结果消息中。该属性的默认值为 0

建议使用特定于供应商的查询选项来限制结果集,例如 MySQL 的 LIMIT、SQL Server 的 TOP 或 Oracle 的 ROWNUM。有关更多信息,请参阅特定供应商的文档。