JDBC Support
Spring Integration provides channel adapters for receiving and sending messages by using database queries. Through those adapters, Spring Integration supports not only plain JDBC SQL queries but also stored procedure and stored function calls.
You need to include this dependency into your project:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
    <version>6.1.5-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-jdbc:6.1.5-SNAPSHOT"
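By default, the following JDBC components are available:
- Inbound Channel Adapter
- Outbound Channel Adapter
- Outbound Gateway
- Stored Procedures Inbound Channel Adapter
- Stored Procedures Outbound Channel Adapter
- Stored Procedures Outbound Gateway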
The Spring Integration JDBC Module also provides a JDBC Message Store.
Inbound Channel Adapter
The main function of an inbound channel adapter is to execute a SQL SELECT query and turn the result set into a message.
The message payload is the whole result set (expressed as a List), and the types of the items in the list depend on the row-mapping strategy.
The default strategy is a generic mapper that returns a Map for each row in the query result.
Optionally, you can change this by adding a reference to a RowMapper instance (see the Spring JDBC documentation for more detailed information about row mapping).
| If you want to convert rows in the SELECT query result to individual messages, you can use a downstream splitter. | 
The inbound adapter also requires a reference to either a JdbcTemplate instance or a DataSource.
As well as the SELECT statement to generate the messages, the adapter also has an UPDATE statement that marks the records as processed so that they do not show up in the next poll.
The update can be parameterized by the list of IDs from the original select.
By default, this is done through a naming convention (a column in the input result set called id is translated into a list in the parameter map for the update called id).
The following example defines an inbound channel adapter with an update query and a DataSource reference.
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
| The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set).
This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration.
The underlying Spring JDBC features limit the available expressions (for example, most special characters other than a period are disallowed), but since the target is usually a list of objects (possibly a list of one) that are addressable by bean paths, this is not unduly restrictive. | 
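The naming convention can be pictured as a projection over the polled rows. The following is a minimal plain-Java sketch of that idea, not the adapter's actual implementation. The class `IdProjectionSketch` and its methods are hypothetical names, and the rows are assumed to be `Map` instances, as produced by the default row mapper.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the ":id" naming convention; not part of Spring Integration.
class IdProjectionSketch {

    // Collects the value of one column from every polled row, in row order.
    static List<Object> project(List<Map<String, Object>> rows, String column) {
        List<Object> values = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            values.add(row.get(column));
        }
        return values;
    }

    // Builds the parameter map that would be bound to "where id in (:id)".
    static Map<String, Object> updateParameters(List<Map<String, Object>> rows) {
        Map<String, Object> parameters = new LinkedHashMap<>();
        parameters.put("id", project(rows, "id"));
        return parameters;
    }
}
```

In this sketch, two polled rows with the `id` values 1 and 7 produce a single `id` parameter holding both values, which is what an `in (:id)` clause expects.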
To change the parameter generation strategy, you can inject a SqlParameterSourceFactory into the adapter to override the default behavior (the adapter has a sql-parameter-source-factory attribute).
Spring Integration provides ExpressionEvaluatingSqlParameterSourceFactory, which creates a SpEL-based parameter source, with the results of the query as the #root object.
(If update-per-row is true, the root object is the row.)
If the same parameter name appears multiple times in the update query, it is evaluated only once, and its result is cached.
You can also use a parameter source for the select query. In this case, since there is no “result” object to evaluate against, a single parameter source is used each time (rather than using a parameter source factory). Starting with version 4.0, you can use Spring to create a SpEL-based parameter source, as the following example shows:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>
<bean id="statusBean" class="foo.StatusDetermination" />
The value in each parameter expression can be any valid SpEL expression.
The #root object for the expression evaluation is the constructor argument defined on the parameterSource bean.
It is static for all evaluations (in the preceding example, an empty String).
Starting with version 5.0, you can supply ExpressionEvaluatingSqlParameterSourceFactory with sqlParameterTypes to specify the target SQL type for a particular parameter.
The following example provides SQL types for the parameters being used in the query:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
| Use the createParameterSourceNoCache factory method.
Otherwise, the parameter source caches the result of the evaluation.
Also note that, because caching is disabled, if the same parameter name appears in the select query multiple times, it is re-evaluated for each occurrence. | 
Polling and Transactions
The inbound adapter accepts a regular Spring Integration poller as a child element. Consequently, the frequency of the polling can be controlled (among other uses). An important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, as the following example shows:
<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
| If you do not explicitly specify a poller, a default value is used. As is normal with Spring Integration, it can be defined as a top-level bean. | 
In the preceding example, the database is polled every 1000 milliseconds (or once a second), and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown. However, as long as it is aware of the data source, the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.
max-rows Versus max-messages-per-poll
The JDBC inbound channel adapter defines an attribute called max-rows.
When you specify the adapter’s poller, you can also define a property called max-messages-per-poll.
While these two attributes look similar, their meaning is quite different.
max-messages-per-poll specifies the number of times the query is executed per polling interval, whereas max-rows specifies the number of rows returned for each execution.
Under normal circumstances, you would likely not want to set the poller’s max-messages-per-poll property when you use the JDBC inbound channel adapter.
Its default value is 1, which means that the JDBC inbound channel adapter’s receive() method is executed exactly once for each poll interval.
Setting the max-messages-per-poll attribute to a larger value means that the query is executed that many times back to back.
For more information regarding the max-messages-per-poll attribute, see Configuring An Inbound Channel Adapter.
In contrast, the max-rows attribute, if greater than 0, specifies the maximum number of rows to be used from the query result set created by the receive() method.
If the attribute is set to 0, all rows are included in the resulting message.
The attribute defaults to 0.
| It is recommended to use result set limiting via vendor-specific query options, for example MySQL LIMIT, SQL Server TOP, or Oracle’s ROWNUM.
See the particular vendor documentation for more information. | 
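The difference between the two settings can be made concrete with a small simulation. The following plain-Java sketch is only a model of the behavior described above, not the adapter's code. `PollCycleSketch`, `receive`, and `pollOnce` are hypothetical names, and the queue of strings stands in for unprocessed table rows that an update statement would mark as processed.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of one polling interval; not the adapter's implementation.
class PollCycleSketch {

    // Simulates receive(): takes up to maxRows unprocessed rows (0 means all rows)
    // and removes them, as the update statement would mark them as processed.
    static List<String> receive(Deque<String> unprocessed, int maxRows) {
        List<String> payload = new ArrayList<>();
        while (!unprocessed.isEmpty() && (maxRows == 0 || payload.size() < maxRows)) {
            payload.add(unprocessed.poll());
        }
        // An empty result produces no message
        return payload.isEmpty() ? null : payload;
    }

    // Simulates one polling interval: receive() is called up to maxMessagesPerPoll
    // times and the loop stops early when there is nothing left to read.
    static List<List<String>> pollOnce(Deque<String> unprocessed, int maxRows, int maxMessagesPerPoll) {
        List<List<String>> messages = new ArrayList<>();
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            List<String> payload = receive(unprocessed, maxRows);
            if (payload == null) {
                break;
            }
            messages.add(payload);
        }
        return messages;
    }
}
```

With five unprocessed rows, `pollOnce(rows, 2, 1)` produces one message with two rows, whereas `pollOnce(rows, 2, 3)` runs the query three times in the same interval and produces three messages with two, two, and one row.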
Outbound Channel Adapter
The outbound channel adapter is the inverse of the inbound: its role is to handle a message and use it to execute a SQL query. By default, the message payload and headers are available as input parameters to the query, as the following example shows:
<int-jdbc:outbound-channel-adapter
    query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
    data-source="dataSource"
    channel="input"/>
In the preceding example, messages arriving at the channel labelled input have a payload of a map with a key of something, so the [] operator dereferences that value from the map.
The headers are also accessed as a map.
| The parameters in the preceding query are bean property expressions on the incoming message (not SpEL expressions).
This behavior is part of the SqlParameterSource, which is the default source created by the outbound adapter.
You can inject a different SqlParameterSourceFactory to get different behavior. | 
The outbound adapter requires a reference to either a DataSource or a JdbcTemplate.
You can also inject a SqlParameterSourceFactory to control the binding of each incoming message to a query.
If the input channel is a direct channel, the outbound adapter runs its query in the same thread and, therefore, the same transaction (if there is one) as the sender of the message.
Passing Parameters by Using SpEL Expressions
A common requirement for most JDBC channel adapters is to pass parameters as part of SQL queries or stored procedures or functions.
As mentioned earlier, these parameters are by default bean property expressions, not SpEL expressions.
However, if you need to pass SpEL expressions as parameters, you must explicitly inject a SqlParameterSourceFactory.
The following example uses an ExpressionEvaluatingSqlParameterSourceFactory to achieve that requirement:
<jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
    query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)"
    sql-parameter-source-factory="spelSource"/>
<bean id="spelSource"
      class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="id"          value="headers['id'].toString()"/>
            <entry key="createdDate" value="new java.util.Date()"/>
            <entry key="payload"     value="payload"/>
        </map>
    </property>
</bean>
For further information, see Defining Parameter Sources.
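The same wiring can also be expressed in Java configuration. The following is a sketch under a few assumptions: the configuration class name `SpelParameterConfig` is hypothetical, a `DataSource` bean and a channel named `input` are assumed to exist, and integration support is assumed to be enabled elsewhere (for example, with `@EnableIntegration`).

```java
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.messaging.MessageHandler;

// Hypothetical configuration class mirroring the XML example.
@Configuration
public class SpelParameterConfig {

    @Bean
    public ExpressionEvaluatingSqlParameterSourceFactory spelSource() {
        // Each SQL parameter name maps to a SpEL expression evaluated against the message
        Map<String, String> expressions = new HashMap<>();
        expressions.put("id", "headers['id'].toString()");
        expressions.put("createdDate", "new java.util.Date()");
        expressions.put("payload", "payload");

        ExpressionEvaluatingSqlParameterSourceFactory factory =
                new ExpressionEvaluatingSqlParameterSourceFactory();
        factory.setParameterExpressions(expressions);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "input")
    public MessageHandler spelJdbcMessageHandler(DataSource dataSource) {
        JdbcMessageHandler handler = new JdbcMessageHandler(dataSource,
                "insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)");
        // Replace the default bean-property binding with the SpEL-based factory
        handler.setSqlParameterSourceFactory(spelSource());
        return handler;
    }
}
```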
Using the PreparedStatement Callback
Sometimes, the flexibility and loose coupling of SqlParameterSourceFactory do not provide what we need for the target PreparedStatement, or we need to do some low-level JDBC work.
The Spring JDBC module provides APIs to configure the execution environment (such as ConnectionCallback or PreparedStatementCreator) and manipulate parameter values (such as SqlParameterSource).
It can even access APIs for low-level operations, such as StatementCallback.
Starting with Spring Integration 4.2, MessagePreparedStatementSetter allows the specification of parameters on the PreparedStatement manually, in the requestMessage context.
This class plays exactly the same role as PreparedStatementSetter in the standard Spring JDBC API.
Actually, it is invoked directly from an inline PreparedStatementSetter implementation when the JdbcMessageHandler invokes execute on the JdbcTemplate.
This functional interface option is mutually exclusive with sqlParameterSourceFactory and can be used as a more powerful alternative to populate parameters of the PreparedStatement from the requestMessage.
For example, it is useful when we need to store File data in a database BLOB column in a streaming manner.
The following example shows how to do so:
@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
    JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
            "INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
    jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
        // The file name travels in a message header; request it as a String
        ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME, String.class));
        // Stream the File payload into the BLOB column
        try (FileInputStream inputStream = new FileInputStream((File) m.getPayload())) {
            ps.setBlob(2, inputStream);
        }
        catch (Exception e) {
            throw new MessageHandlingException(m, e);
        }
        // The description header is written to the CLOB column
        ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
    });
    return jdbcMessageHandler;
}
From the XML configuration perspective, the prepared-statement-setter attribute is available on the <int-jdbc:outbound-channel-adapter> component.
It lets you specify a MessagePreparedStatementSetter bean reference.
Batch Update
Starting with version 5.1, the JdbcMessageHandler performs a JdbcOperations.batchUpdate() if the payload of the request message is an Iterable instance.
Each element of the Iterable is wrapped in a Message with the headers from the request message, unless the element is already a Message.
With a regular SqlParameterSourceFactory-based configuration, these messages are used to build the SqlParameterSource[] argument passed to JdbcOperations.batchUpdate().
When a MessagePreparedStatementSetter configuration is applied, a BatchPreparedStatementSetter variant iterates over those messages and calls the provided MessagePreparedStatementSetter for each of them.
The batch update is not supported when keysGenerated mode is selected.
Outbound Gateway
The outbound gateway is like a combination of the outbound and inbound adapters: Its role is to handle a message and use it to execute a SQL query and then respond with the result by sending it to a reply channel. By default, the message payload and headers are available as input parameters to the query, as the following example shows:
<int-jdbc:outbound-gateway
    update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource" />
The result of the preceding example is to insert a record into the mythings table and return a message that indicates the number of rows affected (the payload is a map: {UPDATED=1}) to the output channel.
If the update query is an insert with auto-generated keys, you can populate the reply message with the generated keys by adding keys-generated="true" to the preceding example (this is not the default because it is not supported by some database platforms).
The following example shows the changed configuration:
<int-jdbc:outbound-gateway
    update="insert into mythings (status, name) values (0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource"
    keys-generated="true"/>
Instead of the update count or the generated keys, you can also provide a select query to execute and generate a reply message from the result (as with the inbound adapter), as the following example shows:
<int-jdbc:outbound-gateway
    update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    query="select * from foos where id=:headers[$id]"
    request-channel="input" reply-channel="output" data-source="dataSource"/>
Since Spring Integration 2.2, the update SQL query is no longer mandatory.
You can now provide only a select query, by using either the query attribute or the query element.
This is extremely useful if you need to actively retrieve data by using, for example, a generic gateway or a payload enricher.
The reply message is then generated from the result (similar to how the inbound adapter works) and passed to the reply channel.
The following example shows how to use the query attribute:
<int-jdbc:outbound-gateway
    query="select * from foos where id=:headers[id]"
    request-channel="input"
    reply-channel="output"
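    data-source="dataSource"/>
| By default, the component for the SELECT query returns only one (the first) row from the cursor. You can adjust this behavior with the max-rows option. If you need to return all the rows from the SELECT, consider specifying max-rows="0". | 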
As with the channel adapters, you can also provide SqlParameterSourceFactory instances for request and reply.
The default is the same as for the outbound adapter, so the request message is available as the root of an expression.
If keys-generated="true", the root of the expression is the generated keys (a map if there is only one or a list of maps if multi-valued).
The outbound gateway requires a reference to either a DataSource or a JdbcTemplate.
It can also have a SqlParameterSourceFactory injected to control the binding of the incoming message to the query.
Starting with version 4.2, the request-prepared-statement-setter attribute is available on the <int-jdbc:outbound-gateway> as an alternative to request-sql-parameter-source-factory.
It lets you specify a MessagePreparedStatementSetter bean reference, which implements more sophisticated PreparedStatement preparation before its execution.
Starting with version 6.0, the JdbcOutboundGateway returns an empty list result as is, instead of converting it to null (meaning "no reply") as it did before.
The previous behavior required extra configuration in applications where handling empty lists is part of the downstream logic.
See Splitter Discard Channel for a possible way to handle empty lists.
See Outbound Channel Adapter for more information about MessagePreparedStatementSetter.
JDBC Message Store
Spring Integration provides two JDBC-specific message store implementations.
The JdbcMessageStore is suitable for use with aggregators and the claim check pattern.
The JdbcChannelMessageStore implementation provides a more targeted and scalable implementation specifically for message channels.
Note that, although you can use a JdbcMessageStore to back a message channel, JdbcChannelMessageStore is optimized for that purpose.
| Starting with versions 5.0.11 and 5.1.2, the indexes for the JdbcChannelMessageStore have been optimized.
If you have large message groups in such a store, you may wish to alter the indexes.
Furthermore, the index for PriorityChannel is commented out because it is not needed unless you are using such channels backed by JDBC. | 
| When using the OracleChannelMessageStoreQueryProvider, the priority channel index must be added because it is included in a hint in the query. | 
Initializing the Database
Before starting to use JDBC message store components, you should provision a target database with the appropriate objects.
Spring Integration ships with some sample scripts that can be used to initialize a database.
In the spring-integration-jdbc JAR file, you can find scripts in the org.springframework.integration.jdbc package.
It provides an example create and an example drop script for a range of common database platforms.
A common way to use these scripts is to reference them in a Spring JDBC data source initializer.
Note that the scripts are provided as samples and as specifications of the required table and column names.
You may find that you need to enhance them for production use (for example, by adding index declarations).
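Referencing a script from a data source initializer might look like the following Java configuration sketch. The class name `MessageStoreSchemaConfig` is hypothetical and a `DataSource` bean is assumed to exist. The script name is the schema-postgresql.sql file mentioned later in this document; substitute the create script for your own platform.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;

// Hypothetical configuration class; runs the sample create script at startup.
@Configuration
public class MessageStoreSchemaConfig {

    @Bean
    public DataSourceInitializer messageStoreSchemaInitializer(DataSource dataSource) {
        // The sample scripts live in the org.springframework.integration.jdbc package of the JAR
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator(
                new ClassPathResource("org/springframework/integration/jdbc/schema-postgresql.sql"));

        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);
        initializer.setDatabasePopulator(populator);
        return initializer;
    }
}
```

Because the scripts are samples, running them unconditionally on every startup is usually only appropriate for tests and local development.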
The Generic JDBC Message Store
The JDBC module provides an implementation of the Spring Integration MessageStore (important in the claim check pattern) and MessageGroupStore (important in stateful patterns such as an aggregator) backed by a database.
Both interfaces are implemented by the JdbcMessageStore, and there is support for configuring store instances in XML, as the following example shows:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
You can specify a JdbcTemplate instead of a DataSource.
The following example shows some other optional attributes:
<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>
In the preceding example, we have specified a LobHandler for dealing with messages as large objects (which is often necessary for Oracle) and a prefix for the table names in the queries generated by the store.
The table name prefix defaults to INT_.
Backing Message Channels
If you intend to back message channels with JDBC, we recommend using the JdbcChannelMessageStore implementation.
It works only in conjunction with Message Channels.
Supported Databases
The JdbcChannelMessageStore uses database-specific SQL queries to retrieve messages from the database.
Therefore, you must set the ChannelMessageStoreQueryProvider property on the JdbcChannelMessageStore.
This channelMessageStoreQueryProvider provides the SQL queries for the particular database you specify.
Spring Integration provides support for the following relational databases:
- PostgreSQL
- HSQLDB
- MySQL
- Oracle
- Derby
- H2
- SqlServer
- Sybase
- DB2
If your database is not listed, you can extend the AbstractChannelMessageStoreQueryProvider class and provide your own custom queries.
Version 4.0 added the MESSAGE_SEQUENCE column to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.
Custom Message Insertion
Since version 5.0, by extending the ChannelMessageStorePreparedStatementSetter class, you can provide a custom implementation for message insertion in the JdbcChannelMessageStore.
You can use it to set different columns or change the table structure or serialization strategy.
For example, instead of default serialization to byte[], you can store its structure as a JSON string.
The following example uses the default implementation of setValues to store common columns and overrides the behavior to store the message payload as a varchar:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
            Object groupId, String region, boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}
| Generally, we do not recommend using a relational database for queuing. If possible, consider using either JMS- or AMQP-backed channels instead. If you are still planning to use your database as a queue, consider using PostgreSQL and its notification mechanism, which is described in a subsequent section. | 
Concurrent Polling
When polling a message channel, you have the option to configure the associated Poller with a TaskExecutor reference.
| Keep in mind, though, that if you use a JDBC backed message channel and you plan to poll the channel and consequently the message store transactionally with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue and the performance, when using multiple threads, may not materialize as expected. For example, Apache Derby is problematic in that regard. To achieve better JDBC queue throughput and avoid issues when different threads may poll the same   | 
Priority Channel
Starting with version 4.0, JdbcChannelMessageStore implements PriorityCapableChannelMessageStore and provides the priorityEnabled option, letting it be used as a message-store reference for priority-queue instances.
For this purpose, the INT_CHANNEL_MESSAGE table has a MESSAGE_PRIORITY column to store the value of PRIORITY message headers.
In addition, a new MESSAGE_SEQUENCE column lets us achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond.
Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE.
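The effect of that ordering can be modelled in memory with a comparator. The following plain-Java sketch only mirrors the ORDER BY clause for illustration; `PriorityOrderSketch` and its `Row` type are hypothetical stand-ins for rows of the INT_CHANNEL_MESSAGE table, not part of the store.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of the polling order; not the store's implementation.
class PriorityOrderSketch {

    // Minimal stand-in for a row of INT_CHANNEL_MESSAGE.
    static final class Row {
        final String name;
        final Integer priority;  // MESSAGE_PRIORITY, may be null
        final long createdDate;  // CREATED_DATE as epoch milliseconds
        final long sequence;     // MESSAGE_SEQUENCE

        Row(String name, Integer priority, long createdDate, long sequence) {
            this.name = name;
            this.priority = priority;
            this.createdDate = createdDate;
            this.sequence = sequence;
        }
    }

    // Mirrors: order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
    static final Comparator<Row> POLL_ORDER =
            Comparator.comparing((Row r) -> r.priority,
                            Comparator.nullsLast(Comparator.<Integer>reverseOrder()))
                    .thenComparingLong(r -> r.createdDate)
                    .thenComparingLong(r -> r.sequence);

    // Returns the row names in the order in which they would be polled.
    static List<String> pollOrder(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(POLL_ORDER);
        List<String> names = new ArrayList<>();
        for (Row row : sorted) {
            names.add(row.name);
        }
        return names;
    }
}
```

Two rows with the same priority and the same creation timestamp are separated only by their sequence value, which is the case the MESSAGE_SEQUENCE column is meant to cover.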
| We do not recommend using the same JdbcChannelMessageStore bean for priority and non-priority queue channels, because the priorityEnabled option applies to the entire store and proper FIFO queue semantics are not retained for the queue channel.
However, the same INT_CHANNEL_MESSAGE table (and even region) can be used for both JdbcChannelMessageStore types.
To configure that scenario, you can extend one message store bean from the other, as the following example shows: | 
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>
Partitioning a Message Store
It is common to use a JdbcMessageStore as a global store for a group of applications or nodes in the same application.
To provide some protection against name clashes and to give control over the database meta-data configuration, the message store lets the tables be partitioned in two ways.
One way is to use separate table names, by changing the prefix (as described earlier).
The other way is to specify a region name for partitioning data within a single table.
An important use case for the second approach is when the MessageStore is managing persistent queues that back a Spring Integration Message Channel.
The message data for a persistent channel is keyed in the store on the channel name.
Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them.
To avoid this danger, you can use the message store region to keep data separate for different physical channels that have the same logical name.
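As an illustration of region-based partitioning, the following Java configuration sketch defines two channel message stores that share one table but keep their data apart. The class name, bean names, and region values are hypothetical, a `DataSource` bean is assumed to exist, and the PostgreSQL query provider is used only as an example.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;

// Hypothetical configuration: two stores on the same table, separated by region.
@Configuration
public class RegionPartitionConfig {

    @Bean
    public JdbcChannelMessageStore billingStore(DataSource dataSource) {
        return storeForRegion(dataSource, "BILLING");
    }

    @Bean
    public JdbcChannelMessageStore shippingStore(DataSource dataSource) {
        return storeForRegion(dataSource, "SHIPPING");
    }

    private static JdbcChannelMessageStore storeForRegion(DataSource dataSource, String region) {
        JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
        store.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
        // Channels with the same logical name do not see each other's messages
        // as long as their stores use different regions
        store.setRegion(region);
        return store;
    }
}
```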
PostgreSQL: Receiving Push Notifications
PostgreSQL offers a listen and notification framework for receiving push notifications upon database table manipulations.
Spring Integration leverages this mechanism (starting with version 6.0) to allow for receiving push notifications when new messages are added to a JdbcChannelMessageStore.
When using this feature, a database trigger must be defined, which can be found as part of the comments of the schema-postgresql.sql file which is included in the JDBC module of Spring Integration.
Push notifications are received via the PostgresChannelMessageTableSubscriber class which allows its subscribers to receive a callback upon the arrival of new messages for any given region and groupId.
These notifications are received even if a message was appended on a different JVM, but to the same database.
The PostgresSubscribableChannel implementation uses a PostgresChannelMessageTableSubscriber.Subscription contract to pull messages from the store in reaction to notifications from the PostgresChannelMessageTableSubscriber.
For example, push notifications for some group can be received as follows:
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
Transaction support
Starting with version 6.0.5, specifying a PlatformTransactionManager on a PostgresSubscribableChannel will notify subscribers in a transaction.
An exception in a subscriber will cause the transaction to be rolled back and the message to be put back in the message store.
Transactional support is not activated by default.
Retries
Starting with version 6.0.5, a retry policy can be specified by providing a RetryTemplate to the PostgresSubscribableChannel.
By default, no retries are performed.
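Both options might be enabled as in the following sketch, which is a variant of the channel bean from the earlier example. The class and bean names, the availability of a `PlatformTransactionManager` bean, and the retry settings (three attempts with a one-second back-off) are assumptions made for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.transaction.PlatformTransactionManager;

// Hypothetical configuration: a push-notified channel with transactions and retries.
@Configuration
public class TransactionalPostgresChannelConfig {

    @Bean
    public PostgresSubscribableChannel transactionalChannel(
            PostgresChannelMessageTableSubscriber subscriber,
            JdbcChannelMessageStore messageStore,
            PlatformTransactionManager transactionManager) {

        PostgresSubscribableChannel channel =
                new PostgresSubscribableChannel(messageStore, "some group", subscriber);
        // Subscribers are notified within a transaction; an exception rolls it back
        channel.setTransactionManager(transactionManager);
        // Retry settings are illustrative: three attempts, one second apart
        channel.setRetryTemplate(RetryTemplate.builder()
                .maxAttempts(3)
                .fixedBackoff(1000)
                .build());
        return channel;
    }
}
```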
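| Any active PostgresChannelMessageTableSubscriber occupies an exclusive JDBC Connection for the duration of its active life cycle. For this need of an exclusive connection, it is also recommended that a JVM only runs a single PostgresChannelMessageTableSubscriber. | 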
Stored Procedures
In certain situations, plain JDBC support is not sufficient. Maybe you deal with legacy relational database schemas or you have complex data processing needs, but, ultimately, you have to use stored procedures or stored functions. Since Spring Integration 2.1, we provide three components to execute stored procedures or stored functions:
- Stored Procedures Inbound Channel Adapter
- Stored Procedures Outbound Channel Adapter
- Stored Procedures Outbound Gateway
Supported Databases
In order to enable calls to stored procedures and stored functions, the stored procedure components use the org.springframework.jdbc.core.simple.SimpleJdbcCall class.
Consequently, the following databases are fully supported for executing stored procedures:
- Apache Derby
- DB2
- MySQL
- Microsoft SQL Server
- Oracle
- PostgreSQL
- Sybase
If you want to execute stored functions instead, the following databases are fully supported:
- MySQL
- Microsoft SQL Server
- Oracle
- PostgreSQL
| Even though your particular database may not be fully supported, chances are that you can use the stored procedure Spring Integration components quite successfully anyway, provided your RDBMS supports stored procedures or stored functions. As a matter of fact, some provided integration tests use the H2 database. Nevertheless, it is very important to thoroughly test those usage scenarios. | 
Configuration
The stored procedure components provide full XML Namespace support, and configuring them is similar to configuring the general-purpose JDBC components discussed earlier.
Common Configuration Attributes
All stored procedure components share certain configuration parameters:
- auto-startup: Lifecycle attribute signaling whether this component should be started during application context startup. It defaults to true. Optional.
- data-source: Reference to a javax.sql.DataSource, which is used to access the database. Required.
- id: Identifies the underlying Spring bean definition, which is an instance of either EventDrivenConsumer or PollingConsumer, depending on whether the outbound channel adapter’s channel attribute references a SubscribableChannel or a PollableChannel. Optional.
- ignore-column-meta-data: For fully supported databases, the underlying SimpleJdbcCall class can automatically retrieve the parameter information for the stored procedure or stored function from the JDBC metadata. However, if the database does not support metadata lookups or if you need to provide customized parameter definitions, this flag can be set to true. It defaults to false. Optional.
- is-function: If true, a SQL Function is called. In that case, the stored-procedure-name or stored-procedure-name-expression attributes define the name of the called function. It defaults to false. Optional.
- stored-procedure-name: This attribute specifies the name of the stored procedure. If the is-function attribute is set to true, this attribute specifies the function name instead. Either this property or stored-procedure-name-expression must be specified.
- stored-procedure-name-expression: This attribute specifies the name of the stored procedure by using a SpEL expression. By using SpEL, you have access to the full message (if available), including its headers and payload. You can use this attribute to invoke different stored procedures at runtime. For example, you can provide stored procedure names that you would like to execute as a message header. The expression must resolve to a String. If the is-function attribute is set to true, this attribute specifies a stored function. Either this property or stored-procedure-name must be specified.
- jdbc-call-operations-cache-size: Defines the maximum number of cached SimpleJdbcCallOperations instances. For each stored procedure name, a new SimpleJdbcCallOperations instance is created, which, in turn, is cached. Spring Integration 2.2 added the stored-procedure-name-expression attribute and the jdbc-call-operations-cache-size attribute. The default cache size is 10. A value of 0 disables caching. Negative values are not permitted. If you enable JMX, statistical information about the jdbc-call-operations-cache is exposed as an MBean. See MBean Exporter for more information.
- sql-parameter-source-factory: (Not available for the stored procedure inbound channel adapter.) Reference to a SqlParameterSourceFactory. By default, bean properties of the passed in Message payload are used as a source for the stored procedure’s input parameters by using a BeanPropertySqlParameterSourceFactory. This may suffice for basic use cases. For more sophisticated options, consider passing in one or more ProcedureParameter values. See Defining Parameter Sources. Optional.
- use-payload-as-parameter-source: (Not available for the stored procedure inbound channel adapter.) If set to true, the payload of the Message is used as a source for providing parameters. If set to false, however, the entire Message is available as a source for parameters. If no procedure parameters are passed in, this property defaults to true. This means that, by using a default BeanPropertySqlParameterSourceFactory, the bean properties of the payload are used as a source for parameter values for the stored procedure or stored function. However, if procedure parameters are passed in, this property (by default) evaluates to false. ProcedureParameter lets SpEL expressions be provided. Therefore, it is highly beneficial to have access to the entire Message. The property is set on the underlying StoredProcExecutor. Optional.
Common Configuration Sub-Elements
The stored procedure components share a common set of child elements that you can use to define and pass parameters to stored procedures or stored functions. The following elements are available:
- parameter
- returning-resultset
- sql-parameter-definition
- poller
- parameter: Provides a mechanism to supply stored procedure parameters. Parameters can be either static or provided by using SpEL expressions.
<int-jdbc:parameter name=""         (1)
                    type=""         (2)
                    value=""/>      (3)

<int-jdbc:parameter name=""
                    expression=""/> (4)
| 1 | The name of the parameter to be passed into the Stored Procedure or Stored Function. Required. | 
| 2 | This attribute specifies the type of the value. If nothing is provided, this attribute defaults to java.lang.String. This attribute is used only when the value attribute is used. Optional. | 
| 3 | The value of the parameter. You must provide either this attribute or the expression attribute. Optional. | 
| 4 | Instead of the value attribute, you can specify a SpEL expression for passing the value of the parameter. If you specify the expression, the value attribute is not allowed. Optional. | 
- returning-resultset: Stored procedures may return multiple result sets. By setting one or more returning-resultset elements, you can specify RowMappers to convert each returned ResultSet to meaningful objects. Optional.
<int-jdbc:returning-resultset name="" row-mapper="" />
- sql-parameter-definition: If you use a database that is fully supported, you typically do not have to specify the stored procedure parameter definitions. Instead, those parameters can be automatically derived from the JDBC metadata. However, if you use databases that are not fully supported, you must set those parameters explicitly by using the sql-parameter-definition element. You can also choose to turn off any processing of parameter metadata information obtained through JDBC by using the ignore-column-meta-data attribute.
<int-jdbc:sql-parameter-definition
                                   name=""                           (1)
                                   direction="IN"                    (2)
                                   type="STRING"                     (3)
                                   scale="5"                         (4)
                                   type-name="FOO_STRUCT"            (5)
                                   return-type="fooSqlReturnType"/>  (6)
| 1 | Specifies the name of the SQL parameter. Required. | 
| 2 | Specifies the direction of the SQL parameter definition. Defaults to IN. Valid values are: IN, OUT, and INOUT. If your procedure is returning result sets, use the returning-resultset element. Optional. | 
| 3 | The SQL type used for this SQL parameter definition. Translates into an integer value, as defined by java.sql.Types. Alternatively, you can provide the integer value as well. If this attribute is not explicitly set, it defaults to 'VARCHAR'. Optional. | 
| 4 | The scale of the SQL parameter. Only used for numeric and decimal parameters. Optional. | 
| 5 | The typeName for types that are user-named, such as: STRUCT, DISTINCT, JAVA_OBJECT, and named array types. This attribute is mutually exclusive with the scale attribute. Optional. | 
| 6 | The reference to a custom value handler for complex types. An implementation of SqlReturnType. This attribute is mutually exclusive with the scale attribute and is only applicable for OUT and INOUT parameters. Optional. | 
- poller: Lets you configure a message poller if this endpoint is a PollingConsumer. Optional.
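The type attribute of sql-parameter-definition accepts either the name of a java.sql.Types constant or its integer value. The following sketch shows one way such a value could be resolved to the integer code by using only the JDK. The SqlTypeCodes class and its resolve method are hypothetical names chosen for illustration. They are not part of Spring Integration, which performs this translation internally and may do so differently.

```java
import java.sql.Types;
import java.util.Locale;

// Hypothetical helper, for illustration only; not a Spring Integration class.
public class SqlTypeCodes {

    // Resolves a java.sql.Types constant name (for example "VARCHAR") or a
    // numeric string (for example "12") to the corresponding integer type code.
    public static int resolve(String type) {
        String trimmed = type.trim();
        if (trimmed.matches("-?\\d+")) {
            // The value is already an integer type code.
            return Integer.parseInt(trimmed);
        }
        try {
            // Read the public static int constant that has the given name.
            return Types.class.getField(trimmed.toUpperCase(Locale.ROOT)).getInt(null);
        }
        catch (ReflectiveOperationException ex) {
            throw new IllegalArgumentException("Not a java.sql.Types constant: " + type, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("VARCHAR")); // prints 12
        System.out.println(resolve("4"));       // prints 4
    }
}
```

Names that do not match a java.sql.Types constant are rejected with an IllegalArgumentException in this sketch.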
Defining Parameter Sources
Parameter sources govern the techniques of retrieving and mapping the Spring Integration message properties to the relevant stored procedure input parameters.
The stored procedure components follow certain rules.
By default, the bean properties of the Message payload are used as a source for the stored procedure’s input parameters.
In that case, a BeanPropertySqlParameterSourceFactory is used.
This may suffice for basic use cases.
The next example illustrates that default behavior.
| For the “automatic” lookup of bean properties by using the BeanPropertySqlParameterSourceFactory to work, your bean properties must be defined in lower case.
This is due to the fact that in org.springframework.jdbc.core.metadata.CallMetaDataContext (the Java method is matchInParameterValuesWithCallParameters()), the retrieved stored procedure parameter declarations are converted to lower case.
As a result, if you have camel-case bean properties (such as lastName), the lookup fails.
In that case, provide an explicit ProcedureParameter. | 
Suppose we have a payload that consists of a simple bean with the following three properties: id, name, and description.
Furthermore, we have a simplistic Stored Procedure called INSERT_COFFEE that accepts three input parameters: id, name, and description.
We also use a fully supported database.
In that case, the following configuration for a stored procedure outbound adapter suffices:
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
    channel="insertCoffeeProcedureRequestChannel"
    stored-procedure-name="INSERT_COFFEE"/>
For more sophisticated options, consider passing in one or more ProcedureParameter values.
If you do provide ProcedureParameter values explicitly, by default, an ExpressionEvaluatingSqlParameterSourceFactory is used for parameter processing, to enable the full power of SpEL expressions.
If you need even more control over how parameters are retrieved, consider passing in a custom implementation of SqlParameterSourceFactory by using the sql-parameter-source-factory attribute.
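As a configuration sketch, the same INSERT_COFFEE call can be expressed in Java with explicit ProcedureParameter values. This fragment is based on the StoredProcExecutor, StoredProcMessageHandler, and ProcedureParameter classes of the JDBC module as I understand them, so verify the setters against the Javadoc of your version. The InsertCoffeeConfig class, the bean names, and the coffeeDescription header are assumptions made for illustration.

```java
import java.util.List;

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.jdbc.StoredProcExecutor;
import org.springframework.integration.jdbc.StoredProcMessageHandler;
import org.springframework.integration.jdbc.storedproc.ProcedureParameter;
import org.springframework.messaging.MessageHandler;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class InsertCoffeeConfig {

    @Bean
    public StoredProcExecutor insertCoffeeExecutor(DataSource dataSource) {
        StoredProcExecutor executor = new StoredProcExecutor(dataSource);
        executor.setStoredProcedureName("INSERT_COFFEE");
        // Each parameter carries a name, a static value (null here), and a
        // SpEL expression that is evaluated against the whole message.
        executor.setProcedureParameters(List.of(
                new ProcedureParameter("id", null, "payload.id"),
                new ProcedureParameter("name", null, "payload.name"),
                // Assumed header name, used to show access beyond the payload.
                new ProcedureParameter("description", null, "headers['coffeeDescription']")));
        return executor;
    }

    @Bean
    @ServiceActivator(inputChannel = "insertCoffeeProcedureRequestChannel")
    public MessageHandler insertCoffeeHandler(StoredProcExecutor insertCoffeeExecutor) {
        return new StoredProcMessageHandler(insertCoffeeExecutor);
    }
}
```

Because procedure parameters are supplied, the expressions see the entire Message rather than only the payload, which is why both payload.id and a header lookup can be used. Outside of Spring Boot, the application context also needs @EnableIntegration.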
Stored Procedure Inbound Channel Adapter
The following listing calls out the attributes that matter for a stored procedure inbound channel adapter:
<int-jdbc:stored-proc-inbound-channel-adapter
                                   channel=""                                    (1)
                                   stored-procedure-name=""
                                   data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   skip-undeclared-results=""                    (2)
                                   return-value-required="false">                (3)
    <int:poller/>
    <int-jdbc:sql-parameter-definition name="" direction="IN"
                                               type="STRING"
                                               scale=""/>
    <int-jdbc:parameter name="" type="" value=""/>
    <int-jdbc:parameter name="" expression=""/>
    <int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
| 1 | Channel to which polled messages are sent.
If the stored procedure or function does not return any data, the payload of the Message is null.
Required. | 
| 2 | If this attribute is set to true, all results from a stored procedure call that do not have a corresponding SqlOutParameter declaration are bypassed.
For example, stored procedures can return an update count value, even though your stored procedure declared only a single result parameter.
The exact behavior depends on the database implementation.
The value is set on the underlying JdbcTemplate.
The value defaults to true.
Optional. | 
| 3 | Indicates whether this procedure’s return value should be included. Since Spring Integration 3.0. Optional. | 
Stored Procedure Outbound Channel Adapter
The following listing calls out the attributes that matter for a stored procedure outbound channel adapter:
<int-jdbc:stored-proc-outbound-channel-adapter channel=""                        (1)
                                               stored-procedure-name=""
                                               data-source=""
                                               auto-startup="true"
                                               id=""
                                               ignore-column-meta-data="false"
                                               order=""                          (2)
                                               sql-parameter-source-factory=""
                                               use-payload-as-parameter-source="">
    <int:poller fixed-rate=""/>
    <int-jdbc:sql-parameter-definition name=""/>
    <int-jdbc:parameter name=""/>
</int-jdbc:stored-proc-outbound-channel-adapter>
| 1 | The receiving message channel of this endpoint. Required. | 
| 2 | Specifies the order for invocation when this endpoint is connected as a subscriber to a channel.
This is particularly relevant when that channel is using a failover dispatching strategy.
It has no effect when this endpoint is itself a polling consumer for a channel with a queue.
Optional. | 
Stored Procedure Outbound Gateway
The following listing calls out the attributes that matter for a stored procedure outbound gateway:
<int-jdbc:stored-proc-outbound-gateway request-channel=""                        (1)
                                       stored-procedure-name=""
                                       data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   order=""
                                   reply-channel=""                              (2)
                                   reply-timeout=""                              (3)
                                   return-value-required="false"                 (4)
                                   skip-undeclared-results=""                    (5)
                                   sql-parameter-source-factory=""
                                   use-payload-as-parameter-source="">
<int-jdbc:sql-parameter-definition name="" direction="IN"
                                   type=""
                                   scale="10"/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-outbound-gateway>
| 1 | The receiving message channel of this endpoint. Required. | 
| 2 | Message channel to which replies should be sent after receiving the database response. Optional. | 
| 3 | Lets you specify how long this gateway waits for the reply message to be sent successfully before throwing an exception.
Keep in mind that, when sending to a DirectChannel, the invocation occurs in the sender’s thread.
Consequently, the failing of the send operation may be caused by other components further downstream.
The value is specified in milliseconds.
Optional. | 
| 4 | Indicates whether this procedure’s return value should be included. Optional. | 
| 5 | If the skip-undeclared-results attribute is set to true, all results from a stored procedure call that do not have a corresponding SqlOutParameter declaration are bypassed.
For example, stored procedures may return an update count value, even though your stored procedure only declared a single result parameter.
The exact behavior depends on the database.
The value is set on the underlying JdbcTemplate.
The value defaults to true.
Optional. | 
Examples
This section contains two examples that call Apache Derby stored procedures.
The first example calls a stored procedure that returns a ResultSet.
By using a RowMapper, the data is converted into a domain object, which then becomes the Spring Integration message payload.
In the second sample, we call a stored procedure that uses output parameters to return data instead.
| Have a look at the Spring Integration Samples project. The project contains the Apache Derby example referenced here, as well as instructions on how to run it. The Spring Integration Samples project also provides an example of using Oracle stored procedures. | 
In the first example, we call a stored procedure named FIND_ALL_COFFEE_BEVERAGES that does not define any input parameters but that returns a ResultSet.
In Apache Derby, stored procedures are implemented in Java. The following listing shows the method signature:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
            throws SQLException {
    ...
}
The following listing shows the corresponding SQL:
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';
In Spring Integration, you can now call this stored procedure by using, for example, a stored-proc-outbound-gateway, as the following example shows:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
                                       data-source="dataSource"
                                       request-channel="findAllProcedureRequestChannel"
                                       expect-single-result="true"
                                       stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
    row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>
In the second example, we call a stored procedure named FIND_COFFEE that has one input parameter.
Instead of returning a ResultSet, it uses an output parameter.
The following example shows the method signature:
public static void findCoffee(int coffeeId, String[] coffeeDescription)
            throws SQLException {
    ...
}
The following listing shows the corresponding SQL:
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';
In Spring Integration, you can now call this stored procedure by using, for example, a stored-proc-outbound-gateway, as the following example shows:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
                                       data-source="dataSource"
                                       request-channel="findCoffeeProcedureRequestChannel"
                                       skip-undeclared-results="true"
                                       stored-procedure-name="FIND_COFFEE"
                                       expect-single-result="true">
    <int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>
JDBC Lock Registry
Version 4.3 introduced the JdbcLockRegistry.
Certain components (for example, aggregator and resequencer) use a lock obtained from a LockRegistry instance to ensure that only one thread manipulates a group at a time.
The DefaultLockRegistry performs this function within a single component.
You can now configure an external lock registry on these components.
When used with a shared MessageGroupStore, you can use the JdbcLockRegistry to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread can generally acquire the lock immediately. If a lock is released by a thread that uses a different registry instance, it can take up to 100ms to acquire the lock.
The JdbcLockRegistry is based on the LockRepository abstraction, which has a DefaultLockRepository implementation.
The database schema scripts are located in the org.springframework.integration.jdbc package, with separate scripts for the particular RDBMS vendors.
For example, the following listing shows the H2 DDL for the lock table:
CREATE TABLE INT_LOCK  (
    LOCK_KEY CHAR(36),
    REGION VARCHAR(100),
    CLIENT_ID CHAR(36),
    CREATED_DATE TIMESTAMP NOT NULL,
    constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
The INT_ prefix can be changed according to the target database design requirements.
To do so, set the prefix property on the DefaultLockRepository bean definition.
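The following configuration fragment is a minimal sketch of a lock registry that uses a custom table prefix. The LockConfig class, the bean names, and the MY_ prefix are assumptions made for illustration, and the schema script has to be adapted so that the table name matches the prefix.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.lock.LockRepository;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class LockConfig {

    @Bean
    public DefaultLockRepository lockRepository(DataSource dataSource) {
        DefaultLockRepository repository = new DefaultLockRepository(dataSource);
        // Assumed custom prefix: the lock table is then expected to be MY_LOCK.
        repository.setPrefix("MY_");
        return repository;
    }

    @Bean
    public JdbcLockRegistry lockRegistry(LockRepository lockRepository) {
        return new JdbcLockRegistry(lockRepository);
    }
}
```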
Sometimes, an application reaches a state in which it cannot release the distributed lock and remove its record from the database.
Locks abandoned in this way can be expired by another application on its next locking invocation.
The timeToLive (TTL) option on the DefaultLockRepository is provided for this purpose.
You may also want to specify CLIENT_ID for the locks stored for a given DefaultLockRepository instance.
If so, you can specify the id to be associated with the DefaultLockRepository as a constructor parameter.
Starting with version 5.1.8, the JdbcLockRegistry can be configured with idleBetweenTries, a Duration to sleep between lock record insert/update executions.
The default is 100 milliseconds; in some environments, this makes non-leader instances hit the data source too often.
Starting with version 5.4, the RenewableLockRegistry interface has been introduced and added to JdbcLockRegistry.
The renewLock() method must be called while the lock is held if the locked process can run longer than the time to live of the lock.
That way, the time to live can be reduced considerably, and deployments can retake a lost lock quickly.
| The lock renewal can be done only if the lock is held by the current thread. | 
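To make the interplay of client ID, time to live, and renewal concrete, the following sketch models a lock table in memory. It is a simplified illustration under stated assumptions and not the DefaultLockRepository implementation: LockTableModel and its methods are hypothetical names, time is passed in explicitly, and the per-thread ownership check performed by the registry is not modeled.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a lock table; not Spring Integration code.
public class LockTableModel {

    // One row of the table: the holder and the time the row was last written.
    private static final class Row {
        final String clientId;
        final Instant createdDate;

        Row(String clientId, Instant createdDate) {
            this.clientId = clientId;
            this.createdDate = createdDate;
        }
    }

    private final Map<String, Row> rows = new HashMap<>();
    private final Duration timeToLive;

    public LockTableModel(Duration timeToLive) {
        this.timeToLive = timeToLive;
    }

    // Tries to take the lock for the given client at the given instant.
    public synchronized boolean acquire(String lockKey, String clientId, Instant now) {
        Row row = rows.get(lockKey);
        // A row older than the time to live is treated as abandoned and removed.
        if (row != null && row.createdDate.isBefore(now.minus(timeToLive))) {
            rows.remove(lockKey);
            row = null;
        }
        if (row == null || row.clientId.equals(clientId)) {
            // Insert a new row, or refresh the row this client already owns.
            rows.put(lockKey, new Row(clientId, now));
            return true;
        }
        return false; // Held by another client and not yet expired.
    }

    // Moves the creation date forward; only the current holder can do so.
    public synchronized boolean renew(String lockKey, String clientId, Instant now) {
        Row row = rows.get(lockKey);
        if (row == null || !row.clientId.equals(clientId)) {
            return false;
        }
        rows.put(lockKey, new Row(clientId, now));
        return true;
    }
}
```

In this model, a holder that renews more often than the time to live keeps the lock indefinitely, while a holder that stops renewing loses it once the time to live has elapsed. This is why renewal allows a short time to live.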
Starting with version 5.5.6, the JdbcLockRegistry supports automatic cleanup of the JdbcLock cache in JdbcLockRegistry.locks through JdbcLockRegistry.setCacheCapacity().
See its JavaDocs for more information.
Starting with version 6.0, the DefaultLockRepository can be supplied with a PlatformTransactionManager instead of relying on the primary bean from the application context.
Starting with version 6.1, the DefaultLockRepository can be configured with custom insert, update, and renew queries.
For this purpose, the respective setters and getters are exposed.
For example, an insert query with a PostgreSQL hint can be configured like this:
lockRepository.setInsertQuery(lockRepository.getInsertQuery() + " ON CONFLICT DO NOTHING");
JDBC Metadata Store
Version 5.0 introduced the JDBC MetadataStore (see Metadata Store) implementation.
You can use the JdbcMetadataStore to maintain the metadata state across application restarts.
This MetadataStore implementation can be used with adapters such as the Feed inbound channel adapter.
To configure these adapters to use the JdbcMetadataStore, declare a Spring bean by using a bean name of metadataStore.
The Feed inbound channel adapter automatically picks up and uses the declared JdbcMetadataStore, as the following example shows:
@Bean
public MetadataStore metadataStore(DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
}
The org.springframework.integration.jdbc package has database schema scripts for several RDBMS vendors.
For example, the following listing shows the H2 DDL for the metadata table:
CREATE TABLE INT_METADATA_STORE  (
	METADATA_KEY VARCHAR(255) NOT NULL,
	METADATA_VALUE VARCHAR(4000),
	REGION VARCHAR(100) NOT NULL,
	constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
You can change the INT_ prefix to match the target database design requirements.
You can also configure JdbcMetadataStore to use the custom prefix.
The JdbcMetadataStore implements ConcurrentMetadataStore, letting it be reliably shared across multiple application instances, where only one instance can store or modify a key’s value.
All of these operations are atomic, thanks to transaction guarantees.
The JdbcMetadataStore must therefore be used with transaction management.
Inbound channel adapters can be supplied with a reference to the TransactionManager in the poller configuration.
Unlike non-transactional MetadataStore implementations, with JdbcMetadataStore, the entry appears in the target table only after the transaction commits.
When a rollback occurs, no entries are added to the INT_METADATA_STORE table.
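The store-or-modify guarantee mentioned above corresponds to the putIfAbsent and replace operations of ConcurrentMetadataStore. The following sketch illustrates that contract with a plain in-memory map. MetadataContractModel is a hypothetical stand-in for illustration, not the JdbcMetadataStore, and it does not model regions or transactions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in that illustrates the contract only.
public class MetadataContractModel {

    private final ConcurrentMap<String, String> entries = new ConcurrentHashMap<>();

    // Stores the value only if the key is absent. Returns null when the value
    // was stored, or the value that is already present otherwise.
    public String putIfAbsent(String key, String value) {
        return entries.putIfAbsent(key, value);
    }

    // Replaces the value only if the current value equals oldValue.
    public boolean replace(String key, String oldValue, String newValue) {
        return entries.replace(key, oldValue, newValue);
    }

    public String get(String key) {
        return entries.get(key);
    }
}
```

When two application instances call putIfAbsent for the same key, only the first call stores its value. The second call receives the stored value back and can tell that it lost the race.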
Since version 5.0.7, you can configure the JdbcMetadataStore with the RDBMS vendor-specific lockHint option for lock-based queries on metadata store entries.
By default, it is FOR UPDATE and can be configured with an empty string if the target database does not support row locking functionality.
Consult your database vendor's documentation for the hints that can be used in a SELECT expression to lock rows before updates.
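The following configuration fragment is a minimal sketch that combines a custom table prefix with a disabled lock hint. The MetadataStoreConfig class and the MY_ prefix are assumptions made for illustration. Whether an empty lock hint is appropriate depends on your database.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.metadata.MetadataStore;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class MetadataStoreConfig {

    @Bean
    public MetadataStore metadataStore(DataSource dataSource) {
        JdbcMetadataStore store = new JdbcMetadataStore(dataSource);
        // Assumed custom prefix: the table is then expected to be MY_METADATA_STORE.
        store.setTablePrefix("MY_");
        // An empty hint replaces the default FOR UPDATE for databases without row locking.
        store.setLockHint("");
        return store;
    }
}
```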