入站通道适配器

入站通道适配器的主要功能是执行 SQL SELECT 查询并将结果集转换为消息。消息负载是整个结果集(表示为一个 List),列表项的类型取决于行映射策略。默认策略是一个通用映射器,它为查询结果中的每一行返回一个 Map。您可以选择通过添加对 RowMapper 实例的引用来更改此行为(有关行映射的详细信息,请参阅Spring JDBC 文档)。

如果您希望将 SELECT 查询结果中的行转换为单独的消息,可以使用下游的拆分器(splitter)。

入站适配器还需要对 JdbcTemplate 实例或 DataSource 的引用。

除了用于生成消息的 SELECT 语句外,适配器还有一个 UPDATE 语句,用于将记录标记为已处理,以便它们不会出现在下一次轮询中。更新操作可以通过原始 SELECT 查询返回的 ID 列表进行参数化。默认情况下,这是通过命名约定完成的(输入结果集中名为 id 的列会被转换为更新操作的参数 Map 中名为 id 的列表)。以下示例定义了一个带有更新查询和 DataSource 引用的入站通道适配器。

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
更新查询中的参数使用参数名称前加冒号(:)指定(在前一个示例中,这是一个应用于轮询结果集中每行的表达式)。这是 Spring JDBC 中命名参数 JDBC 支持的标准特性,并结合了 Spring Integration 中采用的约定(投影到轮询结果列表)。底层的 Spring JDBC 功能限制了可用表达式(例如,大多数特殊字符除了句点外都不允许),但由于目标通常是一个对象列表(可能是单项列表),这些对象可以通过 bean 路径寻址,因此这并没有过度限制。

要更改参数生成策略,您可以向适配器注入 SqlParameterSourceFactory 来覆盖默认行为(适配器有一个 sql-parameter-source-factory 属性)。Spring Integration 提供了 ExpressionEvaluatingSqlParameterSourceFactory,它创建一个基于 SpEL 的参数源,并将查询结果作为 #root 对象。(如果 update-per-row 为 true,则根对象是单行)。如果同一个参数名称在更新查询中多次出现,它只会被求值一次,并且其结果会被缓存。

您还可以为 select 查询使用参数源。在这种情况下,由于没有要对其进行求值的“结果”对象,因此每次使用单个参数源(而不是使用参数源工厂)。从 4.0 版本开始,您可以使用 Spring 创建基于 SpEL 的参数源,如下例所示

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

每个参数表达式中的 value 可以是任何有效的 SpEL 表达式。表达式求值的 #root 对象是在 parameterSource bean 上定义的构造函数参数。对于所有求值,它都是静态的(在前一个示例中,是一个空的 String)。

从 5.0 版本开始,您可以为 ExpressionEvaluatingSqlParameterSourceFactory 提供 sqlParameterTypes 以指定特定参数的目标 SQL 类型。

以下示例为查询中使用的参数提供了 SQL 类型

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
使用 createParameterSourceNoCache 工厂方法。否则,参数源会缓存求值结果。另请注意,由于缓存被禁用,如果同一个参数名称在 select 查询中多次出现,则每次出现时都会重新求值。

轮询与事务

入站适配器接受常规的 Spring Integration 轮询器作为子元素。因此,可以控制轮询的频率(以及其他用途)。对于 JDBC 用法,轮询器的一个重要特性是可以在事务中包装轮询操作,如下例所示

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
如果您没有显式指定轮询器,则使用默认值。与 Spring Integration 通常的做法一样,默认轮询器可以定义为顶级 Bean)。

在前面的示例中,数据库每 1000 毫秒(即每秒一次)轮询一次,并且更新和 SELECT 查询都在同一事务中执行。此处未显示事务管理器配置。但是,只要事务管理器知道数据源,轮询就是事务性的。一个常见的用例是下游通道是直接通道(默认),这样端点在同一线程中调用,因此也在同一事务中。这样,如果其中任何一个失败,事务将回滚,输入数据将恢复到其原始状态。

max-rowsmax-messages-per-poll

JDBC 入站通道适配器定义了一个名为 max-rows 的属性。当您指定适配器的轮询器时,您还可以定义一个名为 max-messages-per-poll 的属性。虽然这两个属性看起来相似,但它们的含义却完全不同。

max-messages-per-poll 指定每个轮询间隔内查询执行的次数,而 max-rows 指定每次执行返回的行数。

在正常情况下,您可能不希望在使用 JDBC 入站通道适配器时设置轮询器的 max-messages-per-poll 属性。其默认值为 1,这意味着 JDBC 入站通道适配器的 receive() 方法在每个轮询间隔内精确执行一次。

max-messages-per-poll 属性设置为更大的值意味着查询会连续执行多次。有关 max-messages-per-poll 属性的更多信息,请参阅配置入站通道适配器

相比之下,max-rows 属性(如果大于 0)指定了从 receive() 方法创建的查询结果集中使用的最大行数。如果属性设置为 0,则所有行都包含在结果消息中。此属性默认为 0

建议通过特定供应商的查询选项来限制结果集,例如 MySQL 的 LIMIT 或 SQL Server 的 TOP 或 Oracle 的 ROWNUM。有关详细信息,请参阅特定供应商的文档。