Class KafkaMessageSource<K,V>
java.lang.Object
org.springframework.integration.util.AbstractExpressionEvaluator
org.springframework.integration.endpoint.AbstractMessageSource<Object>
org.springframework.integration.kafka.inbound.KafkaMessageSource<K,V>
- Type Parameters:
- K- the key type.
- V- the value type.
- All Implemented Interfaces:
- Aware,- BeanFactoryAware,- BeanNameAware,- DisposableBean,- InitializingBean,- Lifecycle,- MessageSource<Object>,- Pausable,- IntegrationPattern,- NamedComponent,- IntegrationInboundManagement,- IntegrationManagement,- ManageableLifecycle
public class KafkaMessageSource<K,V> extends AbstractMessageSource<Object> implements Pausable
Polled message source for kafka. Only one thread can poll for data (or
 acknowledge a message) at a time.
 
NOTE: If the application acknowledges messages out of order, the acks will be deferred until all messages prior to the offset are ack'd. If multiple records are retrieved and an earlier offset is requeued, records from the subsequent offsets will be redelivered - even if they were processed successfully. Applications should therefore implement idempotency.
 Starting with version 3.1.2, this source implements Pausable which
 allows you to pause and resume the Consumer. While the consumer is
 paused, you must continue to call AbstractMessageSource.receive() within
 max.poll.interval.ms, to prevent a rebalance.
- Since:
- 5.4
- Author:
- Gary Russell, Mark Norkin, Artem Bilan, Anshul Mehra
- 
Nested Class SummaryNested Classes Modifier and Type Class Description static classKafkaMessageSource.KafkaAckCallback<K,V>AcknowledgmentCallback for Kafka.static classKafkaMessageSource.KafkaAckCallbackFactory<K,V>AcknowledgmentCallbackFactory for KafkaAckInfo.static interfaceKafkaMessageSource.KafkaAckInfo<K,V>Information for building an KafkaAckCallback.classKafkaMessageSource.KafkaAckInfoImplInformation for building an KafkaAckCallback.Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagementIntegrationManagement.ManagementOverrides
- 
Field SummaryFields Modifier and Type Field Description static StringREMAINING_RECORDSThe number of records remaining from the previous poll.Fields inherited from class org.springframework.integration.util.AbstractExpressionEvaluatorEXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagementMETER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
- 
Constructor SummaryConstructors Constructor Description KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)Construct an instance with the supplied parameters.KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)Construct an instance with the supplied parameters.KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)Construct an instance with the supplied parameters.KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)Construct an instance with the supplied parameters.
- 
Method SummaryModifier and Type Method Description protected voidcreateConsumer()voiddestroy()protected ObjectdoReceive()Subclasses must implement this method.Collection<org.apache.kafka.common.TopicPartition>getAssignedPartitions()Return the currently assigned partitions.protected StringgetClientId()protected DurationgetCommitTimeout()StringgetComponentType()org.springframework.kafka.listener.ConsumerPropertiesgetConsumerProperties()Get a reference to the configured consumer properties; allows further customization of the properties before the source is started.protected StringgetGroupId()protected org.springframework.kafka.support.converter.RecordMessageConvertergetMessageConverter()protected Class<?>getPayloadType()protected longgetPollTimeout()protected org.apache.kafka.clients.consumer.ConsumerRebalanceListenergetRebalanceListener()booleanisPaused()Check if the endpoint is paused.protected booleanisRawMessageHeader()booleanisRunning()protected voidonInit()voidpause()Pause the endpoint.voidresume()Resume the endpoint if paused.voidsetCloseTimeout(Duration closeTimeout)Set the close timeout - default 30 seconds.voidsetMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)Set the message converter to replace the defaultMessagingMessageConverter.voidsetPayloadType(Class<?> payloadType)Set the payload type.voidsetRawMessageHeader(boolean rawMessageHeader)Set to true to include the rawConsumerRecordas headers with keysKafkaHeaders.RAW_DATAandIntegrationMessageHeaderAccessor.SOURCE_DATA.voidstart()voidstop()Methods inherited from class org.springframework.integration.endpoint.AbstractMessageSourcebuildMessage, getBeanName, getComponentName, getManagedName, getManagedType, getOverrides, isLoggingEnabled, receive, registerMetricsCaptor, setBeanName, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedTypeMethods inherited from class org.springframework.integration.util.AbstractExpressionEvaluatorafterPropertiesSet, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, getBeanFactory, getEvaluationContext, getEvaluationContext, getMessageBuilderFactory, setBeanFactory, setConversionServiceMethods inherited from class java.lang.Objectclone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagementgetThisAsMethods inherited from interface org.springframework.integration.core.MessageSourcegetIntegrationPatternType
- 
Field Details- 
REMAINING_RECORDSThe number of records remaining from the previous poll.- Since:
- 3.2
- See Also:
- Constant Field Values
 
 
- 
- 
Constructor Details- 
KafkaMessageSourcepublic KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.- Parameters:
- consumerFactory- the consumer factory.
- consumerProperties- the consumer properties.
- Since:
- 3.2
- See Also:
- KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
 
- 
KafkaMessageSourcepublic KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up tomax.poll.recordsto be fetched on each poll. When false (default)max.poll.recordsis coerced to 1 if the consumer factory is aDefaultKafkaConsumerFactoryor otherwise rejected with anIllegalArgumentException. IMPORTANT: When true, you must callAbstractMessageSource.receive()at a sufficient rate to consume the number of records received withinmax.poll.interval.ms. When false, you must callAbstractMessageSource.receive()withinmax.poll.interval.ms.pause()will not take effect until the records from the previous poll are consumed.- Parameters:
- consumerFactory- the consumer factory.
- consumerProperties- the consumer properties.
- allowMultiFetch- true to allow- max.poll.records > 1.
- Since:
- 3.2
 
- 
KafkaMessageSourcepublic KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.- Parameters:
- consumerFactory- the consumer factory.
- consumerProperties- the consumer properties.
- ackCallbackFactory- the ack callback factory.
- Since:
- 3.2
- See Also:
- KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
 
- 
KafkaMessageSourcepublic KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up tomax.poll.recordsto be fetched on each poll. When false (default)max.poll.recordsis coerced to 1 if the consumer factory is aDefaultKafkaConsumerFactoryor otherwise rejected with anIllegalArgumentException. IMPORTANT: When true, you must callAbstractMessageSource.receive()at a sufficient rate to consume the number of records received withinmax.poll.interval.ms. When false, you must callAbstractMessageSource.receive()withinmax.poll.interval.ms.pause()will not take effect until the records from the previous poll are consumed.- Parameters:
- consumerFactory- the consumer factory.
- consumerProperties- the consumer properties.
- ackCallbackFactory- the ack callback factory.
- allowMultiFetch- true to allow- max.poll.records > 1.
- Since:
- 3.2
 
 
- 
- 
Method Details- 
getAssignedPartitionsReturn the currently assigned partitions.- Returns:
- the partitions.
- Since:
- 3.2.2
 
- 
onInitprotected void onInit()- Overrides:
- onInitin class- AbstractExpressionEvaluator
 
- 
getConsumerPropertiespublic org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()Get a reference to the configured consumer properties; allows further customization of the properties before the source is started.- Returns:
- the properties.
- Since:
- 3.2
 
- 
getGroupId
- 
getClientId
- 
getPollTimeoutprotected long getPollTimeout()
- 
getMessageConverterprotected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()
- 
setMessageConverterpublic void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)Set the message converter to replace the defaultMessagingMessageConverter.- Parameters:
- messageConverter- the converter.
 
- 
getPayloadType
- 
setPayloadTypeSet the payload type. Only applies if a type-aware message converter is provided.- Parameters:
- payloadType- the type to convert to.
 
- 
getRebalanceListenerprotected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()
- 
getComponentType- Specified by:
- getComponentTypein interface- NamedComponent
 
- 
isRawMessageHeaderprotected boolean isRawMessageHeader()
- 
setRawMessageHeaderpublic void setRawMessageHeader(boolean rawMessageHeader)Set to true to include the rawConsumerRecordas headers with keysKafkaHeaders.RAW_DATAandIntegrationMessageHeaderAccessor.SOURCE_DATA. enabling callers to have access to the record to process errors.- Parameters:
- rawMessageHeader- true to include the header.
 
- 
getCommitTimeout
- 
setCloseTimeoutSet the close timeout - default 30 seconds.- Parameters:
- closeTimeout- the close timeout.
 
- 
isRunningpublic boolean isRunning()- Specified by:
- isRunningin interface- Lifecycle
- Specified by:
- isRunningin interface- ManageableLifecycle
 
- 
startpublic void start()- Specified by:
- startin interface- Lifecycle
- Specified by:
- startin interface- ManageableLifecycle
 
- 
stoppublic void stop()- Specified by:
- stopin interface- Lifecycle
- Specified by:
- stopin interface- ManageableLifecycle
 
- 
pausepublic void pause()Description copied from interface:PausablePause the endpoint.
- 
resumepublic void resume()Description copied from interface:PausableResume the endpoint if paused.
- 
isPausedpublic boolean isPaused()Description copied from interface:PausableCheck if the endpoint is paused.
- 
doReceiveDescription copied from class:AbstractMessageSourceSubclasses must implement this method. Typically the returned value will be thepayloadof type T, but the returned value may also be aMessageinstance whose payload is of type T; also can beAbstractIntegrationMessageBuilderwhich is used for additional headers population.- Specified by:
- doReceivein class- AbstractMessageSource<Object>
- Returns:
- The value returned.
 
- 
createConsumerprotected void createConsumer()
- 
destroypublic void destroy()- Specified by:
- destroyin interface- DisposableBean
- Specified by:
- destroyin interface- IntegrationManagement
- Overrides:
- destroyin class- AbstractMessageSource<Object>
 
 
-