入站通道适配器
入站通道适配器的主要功能是执行 SQL SELECT
查询并将结果集转换为消息。消息负载是整个结果集(表示为一个 List
),列表项的类型取决于行映射策略。默认策略是一个通用映射器,它为查询结果中的每一行返回一个 Map
。您可以选择通过添加对 RowMapper
实例的引用来更改此行为(有关行映射的详细信息,请参阅Spring JDBC 文档)。
如果您希望将 SELECT 查询结果中的行转换为单独的消息,可以使用下游的拆分器(splitter)。 |
入站适配器还需要对 JdbcTemplate
实例或 DataSource
的引用。
除了用于生成消息的 SELECT
语句外,适配器还有一个 UPDATE
语句,用于将记录标记为已处理,以便它们不会出现在下一次轮询中。更新操作可以通过原始 SELECT 查询返回的 ID 列表进行参数化。默认情况下,这是通过命名约定完成的(输入结果集中名为 id
的列会被转换为更新操作的参数 Map 中名为 id
的列表)。以下示例定义了一个带有更新查询和 DataSource
引用的入站通道适配器。
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
channel="target" data-source="dataSource"
update="update item set status=10 where id in (:id)" />
更新查询中的参数使用参数名称前加冒号(: )指定(在前一个示例中,这是一个应用于轮询结果集中每行的表达式)。这是 Spring JDBC 中命名参数 JDBC 支持的标准特性,并结合了 Spring Integration 中采用的约定(投影到轮询结果列表)。底层的 Spring JDBC 功能限制了可用表达式(例如,大多数特殊字符除了句点外都不允许),但由于目标通常是一个对象列表(可能是单项列表),这些对象可以通过 bean 路径寻址,因此这并没有过度限制。 |
要更改参数生成策略,您可以向适配器注入 SqlParameterSourceFactory
来覆盖默认行为(适配器有一个 sql-parameter-source-factory
属性)。Spring Integration 提供了 ExpressionEvaluatingSqlParameterSourceFactory
,它创建一个基于 SpEL 的参数源,并将查询结果作为 #root
对象。(如果 update-per-row
为 true,则根对象是单行)。如果同一个参数名称在更新查询中多次出现,它只会被求值一次,并且其结果会被缓存。
您还可以为 select 查询使用参数源。在这种情况下,由于没有要对其进行求值的“结果”对象,因此每次使用单个参数源(而不是使用参数源工厂)。从 4.0 版本开始,您可以使用 Spring 创建基于 SpEL 的参数源,如下例所示
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="status" value="@statusBean.which()" />
</map>
</property>
</bean>
<bean id="statusBean" class="foo.StatusDetermination" />
每个参数表达式中的 value
可以是任何有效的 SpEL 表达式。表达式求值的 #root
对象是在 parameterSource
bean 上定义的构造函数参数。对于所有求值,它都是静态的(在前一个示例中,是一个空的 String
)。
从 5.0 版本开始,您可以为 ExpressionEvaluatingSqlParameterSourceFactory
提供 sqlParameterTypes
以指定特定参数的目标 SQL 类型。
以下示例为查询中使用的参数提供了 SQL 类型
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="sqlParameterTypes">
<map>
<entry key="status" value="#{ T(java.sql.Types).BINARY}" />
</map>
</property>
</bean>
使用 createParameterSourceNoCache 工厂方法。否则,参数源会缓存求值结果。另请注意,由于缓存被禁用,如果同一个参数名称在 select 查询中多次出现,则每次出现时都会重新求值。 |
轮询与事务
入站适配器接受常规的 Spring Integration 轮询器作为子元素。因此,可以控制轮询的频率(以及其他用途)。对于 JDBC 用法,轮询器的一个重要特性是可以在事务中包装轮询操作,如下例所示
<int-jdbc:inbound-channel-adapter query="..."
channel="target" data-source="dataSource" update="...">
<int:poller fixed-rate="1000">
<int:transactional/>
</int:poller>
</int-jdbc:inbound-channel-adapter>
如果您没有显式指定轮询器,则使用默认值。与 Spring Integration 通常的做法一样,默认轮询器可以定义为顶级 Bean)。 |
在前面的示例中,数据库每 1000 毫秒(即每秒一次)轮询一次,并且更新和 SELECT 查询都在同一事务中执行。此处未显示事务管理器配置。但是,只要事务管理器知道数据源,轮询就是事务性的。一个常见的用例是下游通道是直接通道(默认),这样端点在同一线程中调用,因此也在同一事务中。这样,如果其中任何一个失败,事务将回滚,输入数据将恢复到其原始状态。
max-rows
与 max-messages-per-poll
JDBC 入站通道适配器定义了一个名为 max-rows
的属性。当您指定适配器的轮询器时,您还可以定义一个名为 max-messages-per-poll
的属性。虽然这两个属性看起来相似,但它们的含义却完全不同。
max-messages-per-poll
指定每个轮询间隔内查询执行的次数,而 max-rows
指定每次执行返回的行数。
在正常情况下,您可能不希望在使用 JDBC 入站通道适配器时设置轮询器的 max-messages-per-poll
属性。其默认值为 1
,这意味着 JDBC 入站通道适配器的 receive()
方法在每个轮询间隔内精确执行一次。
将 max-messages-per-poll
属性设置为更大的值意味着查询会连续执行多次。有关 max-messages-per-poll
属性的更多信息,请参阅配置入站通道适配器。
相比之下,max-rows
属性(如果大于 0
)指定了从 receive()
方法创建的查询结果集中使用的最大行数。如果属性设置为 0
,则所有行都包含在结果消息中。此属性默认为 0
。
建议通过特定供应商的查询选项来限制结果集,例如 MySQL 的 LIMIT 或 SQL Server 的 TOP 或 Oracle 的 ROWNUM 。有关详细信息,请参阅特定供应商的文档。 |