Class KafkaMessageSource<K,V>
java.lang.Object
org.springframework.integration.util.AbstractExpressionEvaluator
org.springframework.integration.endpoint.AbstractMessageSource<Object>
org.springframework.integration.kafka.inbound.KafkaMessageSource<K,V>
- Type Parameters:
K- the key type.V- the value type.
- All Implemented Interfaces:
Aware,BeanClassLoaderAware,BeanFactoryAware,BeanNameAware,DisposableBean,InitializingBean,Lifecycle,MessageSource<Object>,Pausable,IntegrationPattern,NamedComponent,IntegrationInboundManagement,IntegrationManagement,ManageableLifecycle
public class KafkaMessageSource<K,V>
extends AbstractMessageSource<Object>
implements Pausable, BeanClassLoaderAware
Polled message source for Apache Kafka.
Only one thread can poll for data (or acknowledge a message) at a time.
NOTE: If the application acknowledges messages out of order, the acks will be deferred until all messages prior to the offset are ack'd. If multiple records are retrieved and an earlier offset is requeued, records from the subsequent offsets will be redelivered - even if they were processed successfully. Applications should therefore implement idempotency.
Starting with version 3.1.2, this source implements Pausable which
allows you to pause and resume the Consumer. While the consumer is
paused, you must continue to call AbstractMessageSource.receive() within
max.poll.interval.ms, to prevent a rebalance.
- Since:
- 5.4
- Author:
- Gary Russell, Mark Norkin, Artem Bilan, Anshul Mehra, Christian Tzolov, Ngoc Nhan
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic classAcknowledgmentCallback for Kafka.static final recordAcknowledgmentCallbackFactory for KafkaAckInfo.static interfaceInformation for building an KafkaAckCallback.classInformation for building an KafkaAckCallback.Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides -
Field Summary
FieldsModifier and TypeFieldDescriptionbooleanstatic final StringThe number of records remaining from the previous poll.Fields inherited from class org.springframework.integration.util.AbstractExpressionEvaluator
EXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME -
Constructor Summary
ConstructorsConstructorDescriptionKafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties) Construct an instance with the supplied parameters.KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch) Construct an instance with the supplied parameters.KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory) Construct an instance with the supplied parameters.KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) Construct an instance with the supplied parameters. -
Method Summary
Modifier and TypeMethodDescriptionprotected voidvoiddestroy()protected ObjectSubclasses must implement this method.Collection<org.apache.kafka.common.TopicPartition>Return the currently assigned partitions.protected Stringprotected Durationorg.springframework.kafka.listener.ConsumerPropertiesGet a reference to the configured consumer properties; allows further customization of the properties before the source is started.protected Stringprotected org.springframework.kafka.support.converter.RecordMessageConverterprotected Class<?>protected longprotected org.apache.kafka.clients.consumer.ConsumerRebalanceListenerbooleanisPaused()Check if the endpoint is paused.protected booleanbooleanprotected voidonInit()voidpause()Pause the endpoint.voidresume()Resume the endpoint if paused.voidsetBeanClassLoader(ClassLoader classLoader) voidsetCloseTimeout(Duration closeTimeout) Set the close timeout - default 30 seconds.voidsetMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter to replace the defaultMessagingMessageConverter.voidsetPayloadType(Class<?> payloadType) Set the payload type.voidsetRawMessageHeader(boolean rawMessageHeader) Set to true to include the rawConsumerRecordas headers with keysKafkaHeaders.RAW_DATAandIntegrationMessageHeaderAccessor.SOURCE_DATA.voidstart()voidstop()Methods inherited from class org.springframework.integration.endpoint.AbstractMessageSource
buildMessage, getBeanName, getComponentName, getManagedName, getManagedType, getOverrides, isLoggingEnabled, receive, registerMetricsCaptor, setBeanName, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedTypeMethods inherited from class org.springframework.integration.util.AbstractExpressionEvaluator
afterPropertiesSet, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, getBeanFactory, getEvaluationContext, getEvaluationContext, getMessageBuilderFactory, setBeanFactory, setConversionService, setSimpleEvaluationContextMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs, isObserved, registerObservationRegistryMethods inherited from interface org.springframework.integration.core.MessageSource
getIntegrationPatternType
-
Field Details
-
REMAINING_RECORDS
The number of records remaining from the previous poll.- Since:
- 3.2
- See Also:
-
newAssignment
public volatile boolean newAssignment
-
-
Constructor Details
-
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties) Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.- Parameters:
consumerFactory- the consumer factory.consumerProperties- the consumer properties.- See Also:
-
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch) Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up tomax.poll.recordsto be fetched on each poll. When false (default)max.poll.recordsis coerced to 1 if the consumer factory is aDefaultKafkaConsumerFactoryor otherwise rejected with anIllegalArgumentException. IMPORTANT: When true, you must callAbstractMessageSource.receive()at a sufficient rate to consume the number of records received withinmax.poll.interval.ms. When false, you must callAbstractMessageSource.receive()withinmax.poll.interval.ms.pause()will not take effect until the records from the previous poll are consumed.- Parameters:
consumerFactory- the consumer factory.consumerProperties- the consumer properties.allowMultiFetch- true to allowmax.poll.records > 1.
-
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory) Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.- Parameters:
consumerFactory- the consumer factory.consumerProperties- the consumer properties.ackCallbackFactory- the ack callback factory.- See Also:
-
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up tomax.poll.recordsto be fetched on each poll. When false (default)max.poll.recordsis coerced to 1 if the consumer factory is aDefaultKafkaConsumerFactoryor otherwise rejected with anIllegalArgumentException. IMPORTANT: When true, you must callAbstractMessageSource.receive()at a sufficient rate to consume the number of records received withinmax.poll.interval.ms. When false, you must callAbstractMessageSource.receive()withinmax.poll.interval.ms.pause()will not take effect until the records from the previous poll are consumed.- Parameters:
consumerFactory- the consumer factory.consumerProperties- the consumer properties.ackCallbackFactory- the ack callback factory.allowMultiFetch- true to allowmax.poll.records > 1.
-
-
Method Details
-
getAssignedPartitions
Return the currently assigned partitions.- Returns:
- the partitions.
-
setBeanClassLoader
- Specified by:
setBeanClassLoaderin interfaceBeanClassLoaderAware
-
onInit
protected void onInit()- Overrides:
onInitin classAbstractExpressionEvaluator
-
getConsumerProperties
public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()Get a reference to the configured consumer properties; allows further customization of the properties before the source is started.- Returns:
- the properties.
-
getGroupId
-
getClientId
-
getPollTimeout
protected long getPollTimeout() -
getMessageConverter
protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter() -
setMessageConverter
public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter to replace the defaultMessagingMessageConverter.- Parameters:
messageConverter- the converter.
-
getPayloadType
-
setPayloadType
Set the payload type. Only applies if a type-aware message converter is provided.- Parameters:
payloadType- the type to convert to.
-
getRebalanceListener
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener() -
getComponentType
- Specified by:
getComponentTypein interfaceNamedComponent
-
isRawMessageHeader
protected boolean isRawMessageHeader() -
setRawMessageHeader
public void setRawMessageHeader(boolean rawMessageHeader) Set to true to include the rawConsumerRecordas headers with keysKafkaHeaders.RAW_DATAandIntegrationMessageHeaderAccessor.SOURCE_DATA. enabling callers to have access to the record to process errors.- Parameters:
rawMessageHeader- true to include the header.
-
getCommitTimeout
-
setCloseTimeout
Set the close timeout - default 30 seconds.- Parameters:
closeTimeout- the close timeout.
-
isRunning
public boolean isRunning()- Specified by:
isRunningin interfaceLifecycle- Specified by:
isRunningin interfaceManageableLifecycle
-
start
public void start()- Specified by:
startin interfaceLifecycle- Specified by:
startin interfaceManageableLifecycle
-
stop
public void stop()- Specified by:
stopin interfaceLifecycle- Specified by:
stopin interfaceManageableLifecycle
-
pause
public void pause()Description copied from interface:PausablePause the endpoint. -
resume
public void resume()Description copied from interface:PausableResume the endpoint if paused. -
isPaused
public boolean isPaused()Description copied from interface:PausableCheck if the endpoint is paused. -
doReceive
Description copied from class:AbstractMessageSourceSubclasses must implement this method. Typically the returned value will be thepayloadof type T, but the returned value may also be aMessageinstance whose payload is of type T; also can beAbstractIntegrationMessageBuilderwhich is used for additional headers population.- Specified by:
doReceivein classAbstractMessageSource<Object>- Returns:
- The value returned.
-
createConsumer
protected void createConsumer() -
destroy
public void destroy()- Specified by:
destroyin interfaceDisposableBean- Specified by:
destroyin interfaceIntegrationManagement- Overrides:
destroyin classAbstractMessageSource<Object>
-