Class Kafka
java.lang.Object
org.springframework.integration.kafka.dsl.Kafka
Factory class for Apache Kafka components.
- Since:
- 5.4
- Author:
- Artem Bilan, Nasko Vasilev, Gary Russell, Anshul Mehra
-
Method Summary
Modifier and TypeMethodDescriptionstatic KafkaPointToPointChannelSpecchannel(org.springframework.kafka.core.KafkaTemplate<?, ?> template, org.springframework.kafka.config.KafkaListenerContainerFactory<?> containerFactory, String topic) Create a spec for a subscribable channel with the provided parameters.static <K,V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties) Create an initialKafkaInboundChannelAdapterSpecwith the consumer factory and topics.static <K,V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch) Create an initialKafkaInboundChannelAdapterSpecwith the consumer factory and topics.static <K,V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory) Create an initialKafkaInboundChannelAdapterSpecwith the consumer factory and topics with a custom ack callback factory.static <K,V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) Create an initialKafkaInboundChannelAdapterSpecwith the consumer factory and topics with a custom ack callback factory.static <K,V, R> KafkaInboundGatewaySpec.KafkaInboundGatewayListenerContainerSpec<K, V, R> inboundGateway(KafkaMessageListenerContainerSpec<K, V> containerSpec, KafkaTemplateSpec<K, R> templateSpec) Create an initialKafkaInboundGatewaySpecwith the provided container and template specs.static <K,V, R> KafkaInboundGatewaySpec.KafkaInboundGatewayListenerContainerSpec<K, V, R> inboundGateway(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ContainerProperties containerProperties, org.springframework.kafka.core.ProducerFactory<K, R> producerFactory) Create an initialKafkaInboundGatewaySpecwith the provided consumer factory, container properties and producer factory.static <K,V, R> KafkaInboundGatewaySpec<K, V, R, ?> inboundGateway(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> container, org.springframework.kafka.core.KafkaTemplate<K, R> template) Create an initialKafkaInboundGatewaySpecwith the provided container and template.static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, String... topics) static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, Pattern topicPattern) static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode, String... topics) static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode, Pattern topicPattern) static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode, org.springframework.kafka.support.TopicPartitionOffset... topicPartitions) static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ContainerProperties containerProperties) static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ContainerProperties containerProperties, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode) static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.support.TopicPartitionOffset... topicPartitions) static <K,V> KafkaMessageDrivenChannelAdapterSpec<K, V, ?> messageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> listenerContainer) Create an initialKafkaMessageDrivenChannelAdapterSpec.static <K,V> KafkaMessageDrivenChannelAdapterSpec<K, V, ?> messageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> listenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode) Create an initialKafkaMessageDrivenChannelAdapterSpec.static <K,V> KafkaProducerMessageHandlerSpec<K, V, ?> outboundChannelAdapter(org.springframework.kafka.core.KafkaTemplate<K, V> kafkaTemplate) Create an initialKafkaProducerMessageHandlerSpec.static <K,V> KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandlerTemplateSpec<K, V> outboundChannelAdapter(org.springframework.kafka.core.ProducerFactory<K, V> producerFactory) Create an initialKafkaProducerMessageHandlerSpecwith ProducerFactory.static <K,V, R> KafkaOutboundGatewaySpec.KafkaGatewayMessageHandlerTemplateSpec<K, V, R> outboundGateway(org.springframework.kafka.core.ProducerFactory<K, V> producerFactory, org.springframework.kafka.listener.GenericMessageListenerContainer<K, R> replyContainer) Create an initialKafkaProducerMessageHandlerSpecwith ProducerFactory.static <K,V, R> KafkaOutboundGatewaySpec<K, V, R, ?> outboundGateway(org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K, V, R> kafkaTemplate) Create an initialKafkaProducerMessageHandlerSpec.static KafkaPollableChannelSpecpollableChannel(org.springframework.kafka.core.KafkaTemplate<?, ?> template, KafkaMessageSource<?, ?> source) Create a spec for a pollable channel with the provided parameters.publishSubscribeChannel(org.springframework.kafka.core.KafkaTemplate<?, ?> template, org.springframework.kafka.config.KafkaListenerContainerFactory<?> containerFactory, String topic) Create a spec for a publish-subscribe channel with the provided parameters.
-
Method Details
-
outboundChannelAdapter
public static <K,V> KafkaProducerMessageHandlerSpec<K,V, outboundChannelAdapter?> (org.springframework.kafka.core.KafkaTemplate<K, V> kafkaTemplate) Create an initialKafkaProducerMessageHandlerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
kafkaTemplate- theKafkaTemplateto use- Returns:
- the KafkaProducerMessageHandlerSpec.
-
outboundChannelAdapter
public static <K,V> KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandlerTemplateSpec<K,V> outboundChannelAdapter(org.springframework.kafka.core.ProducerFactory<K, V> producerFactory) Create an initialKafkaProducerMessageHandlerSpecwith ProducerFactory.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
producerFactory- theProducerFactoryJava 8 Lambda.- Returns:
- the KafkaProducerMessageHandlerSpec.
- See Also:
-
inboundChannelAdapter
public static <K,V> KafkaInboundChannelAdapterSpec<K,V> inboundChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties) Create an initialKafkaInboundChannelAdapterSpecwith the consumer factory and topics.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- the consumer factory.consumerProperties- the consumerProperties.- Returns:
- the spec.
-
inboundChannelAdapter
public static <K,V> KafkaInboundChannelAdapterSpec<K,V> inboundChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch) Create an initialKafkaInboundChannelAdapterSpecwith the consumer factory and topics.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- the consumer factory.consumerProperties- the consumerProperties.allowMultiFetch- true to fetch multiple records on each poll.- Returns:
- the spec.
-
inboundChannelAdapter
public static <K,V> KafkaInboundChannelAdapterSpec<K,V> inboundChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory) Create an initialKafkaInboundChannelAdapterSpecwith the consumer factory and topics with a custom ack callback factory.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- the consumer factory.consumerProperties- the consumerProperties.ackCallbackFactory- the callback factory.- Returns:
- the spec.
-
inboundChannelAdapter
public static <K,V> KafkaInboundChannelAdapterSpec<K,V> inboundChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) Create an initialKafkaInboundChannelAdapterSpecwith the consumer factory and topics with a custom ack callback factory.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- the consumer factory.consumerProperties- the consumerProperties.ackCallbackFactory- the callback factory.allowMultiFetch- true to fetch multiple records on each poll.- Returns:
- the spec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec<K,V, messageDrivenChannelAdapter?> (org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> listenerContainer) Create an initialKafkaMessageDrivenChannelAdapterSpec. If the listener container is not already a bean it will be registered in the application context. If the adapter spec has anid, the bean name will be that id appended with '.container'. Otherwise, the bean name will be generated from the container class name.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
listenerContainer- theAbstractMessageListenerContainer.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec<K,V, messageDrivenChannelAdapter?> (org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> listenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode) Create an initialKafkaMessageDrivenChannelAdapterSpec. If the listener container is not already a bean it will be registered in the application context. If the adapter spec has anid, the bean name will be that id appended with '.container'. Otherwise, the bean name will be generated from the container class name.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
listenerContainer- theAbstractMessageListenerContainer.listenerMode- theKafkaMessageDrivenChannelAdapter.ListenerMode.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K,V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ContainerProperties containerProperties) Create an initialKafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- theConsumerFactory.containerProperties- theContainerPropertiesto use.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K,V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ContainerProperties containerProperties, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode) Create an initialKafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- theConsumerFactory.containerProperties- theContainerPropertiesto use.listenerMode- theKafkaMessageDrivenChannelAdapter.ListenerMode.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K,V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.support.TopicPartitionOffset... topicPartitions) Create an initialKafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- theConsumerFactory.topicPartitions- theTopicPartitionOffsetvararg.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K,V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode, org.springframework.kafka.support.TopicPartitionOffset... topicPartitions) Create an initialKafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- theConsumerFactory.listenerMode- theKafkaMessageDrivenChannelAdapter.ListenerMode.topicPartitions- theTopicPartitionOffsetvararg.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K,V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, String... topics) Create an initialKafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- theConsumerFactory.topics- the topics vararg.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K,V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode, String... topics) Create an initialKafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- theConsumerFactory.listenerMode- theKafkaMessageDrivenChannelAdapter.ListenerMode.topics- the topics vararg.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K,V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, Pattern topicPattern) Create an initialKafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- theConsumerFactory.topicPattern- the topicPattern vararg.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
-
messageDrivenChannelAdapter
public static <K,V> KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K,V> messageDrivenChannelAdapter(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode, Pattern topicPattern) Create an initialKafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type.- Parameters:
consumerFactory- theConsumerFactory.listenerMode- theKafkaMessageDrivenChannelAdapter.ListenerMode.topicPattern- the topicPattern vararg.- Returns:
- the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
-
outboundGateway
public static <K,V, KafkaOutboundGatewaySpec<K,R> V, outboundGatewayR, ?> (org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K, V, R> kafkaTemplate) Create an initialKafkaProducerMessageHandlerSpec.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type (request).R- the Kafka message value type (reply).- Parameters:
kafkaTemplate- theReplyingKafkaTemplateto use- Returns:
- the KafkaGatewayMessageHandlerSpec.
-
outboundGateway
public static <K,V, KafkaOutboundGatewaySpec.KafkaGatewayMessageHandlerTemplateSpec<K,R> V, outboundGatewayR> (org.springframework.kafka.core.ProducerFactory<K, V> producerFactory, org.springframework.kafka.listener.GenericMessageListenerContainer<K, R> replyContainer) Create an initialKafkaProducerMessageHandlerSpecwith ProducerFactory.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type (request).R- the Kafka message value type (reply).- Parameters:
producerFactory- theProducerFactoryJava 8 Lambda.replyContainer- a listener container for replies.- Returns:
- the KafkaGatewayMessageHandlerSpec.
-
inboundGateway
public static <K,V, KafkaInboundGatewaySpec<K,R> V, inboundGatewayR, ?> (org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> container, org.springframework.kafka.core.KafkaTemplate<K, R> template) Create an initialKafkaInboundGatewaySpecwith the provided container and template. If the listener container is not already a bean it will be registered in the application context. If the adapter spec has anid, the bean name will be that id appended with '.container'. Otherwise, the bean name will be generated from the container class name.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type (request).R- the Kafka message value type (reply).- Parameters:
container- the container.template- the template.- Returns:
- the spec.
-
inboundGateway
public static <K,V, KafkaInboundGatewaySpec.KafkaInboundGatewayListenerContainerSpec<K,R> V, inboundGatewayR> (org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ContainerProperties containerProperties, org.springframework.kafka.core.ProducerFactory<K, R> producerFactory) Create an initialKafkaInboundGatewaySpecwith the provided consumer factory, container properties and producer factory.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type (request).R- the Kafka message value type (reply).- Parameters:
consumerFactory- the consumer factory.containerProperties- the container properties.producerFactory- the producer factory.- Returns:
- the spec.
-
inboundGateway
public static <K,V, KafkaInboundGatewaySpec.KafkaInboundGatewayListenerContainerSpec<K,R> V, inboundGatewayR> (KafkaMessageListenerContainerSpec<K, V> containerSpec, KafkaTemplateSpec<K, R> templateSpec) Create an initialKafkaInboundGatewaySpecwith the provided container and template specs.- Type Parameters:
K- the Kafka message key type.V- the Kafka message value type (request).R- the Kafka message value type (reply).- Parameters:
containerSpec- the container spec.templateSpec- the template spec.- Returns:
- the spec.
-
channel
public static KafkaPointToPointChannelSpec channel(org.springframework.kafka.core.KafkaTemplate<?, ?> template, org.springframework.kafka.config.KafkaListenerContainerFactory<?> containerFactory, String topic) Create a spec for a subscribable channel with the provided parameters.- Parameters:
template- the template.containerFactory- the container factory.topic- the topic.- Returns:
- the spec.
-
publishSubscribeChannel
public static KafkaPublishSubscribeChannelSpec publishSubscribeChannel(org.springframework.kafka.core.KafkaTemplate<?, ?> template, org.springframework.kafka.config.KafkaListenerContainerFactory<?> containerFactory, String topic) Create a spec for a publish-subscribe channel with the provided parameters.- Parameters:
template- the template.containerFactory- the container factory.topic- the topic.- Returns:
- the spec.
-
pollableChannel
public static KafkaPollableChannelSpec pollableChannel(org.springframework.kafka.core.KafkaTemplate<?, ?> template, KafkaMessageSource<?, ?> source) Create a spec for a pollable channel with the provided parameters.- Parameters:
template- the template.source- the source.- Returns:
- the spec.
- Since:
- 3.3
-