Class KafkaMessageDrivenChannelAdapter<K,V>
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter<K,V>
- Type Parameters:
K- the key type.V- the value type.
- All Implemented Interfaces:
Aware,BeanFactoryAware,BeanNameAware,DisposableBean,InitializingBean,SmartInitializingSingleton,ApplicationContextAware,Lifecycle,Phased,SmartLifecycle,ComponentSourceAware,ExpressionCapable,OrderlyShutdownCapable,MessageProducer,Pausable,IntegrationPattern,KafkaInboundEndpoint,NamedComponent,IntegrationInboundManagement,IntegrationManagement,ManageableLifecycle,ManageableSmartLifecycle,TrackableComponent
public class KafkaMessageDrivenChannelAdapter<K,V>
extends MessageProducerSupport
implements KafkaInboundEndpoint, OrderlyShutdownCapable, Pausable
Message-driven channel adapter.
- Since:
- 5.4
- Author:
- Marius Bogoevici, Gary Russell, Artem Bilan, Urs Keller
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic enumThe listener mode for the container, record or batch.Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides -
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleLockFields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEFields inherited from interface org.springframework.integration.kafka.inbound.KafkaInboundEndpoint
ATTRIBUTES_HOLDER, CONTEXT_ACKNOWLEDGMENT, CONTEXT_CONSUMER, CONTEXT_RECORDFields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE -
Constructor Summary
ConstructorsConstructorDescriptionKafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer) Construct an instance with modeKafkaMessageDrivenChannelAdapter.ListenerMode.record.KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode) Construct an instance with the provided mode. -
Method Summary
Modifier and TypeMethodDescriptionintCalled after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.intCalled before shutdown begins.protected voiddoStart()Take no action by default.protected voiddoStop()Take no action by default.Subclasses may implement this method to provide component type information.protected AttributeAccessorgetErrorMessageAttributes(Message<?> message) Populate anAttributeAccessorto be used when building an error message with theerrorMessageStrategy.booleanisPaused()Check if the endpoint is paused.protected voidonInit()Subclasses may implement this for initialization logic.voidpause()Pause the endpoint.voidresume()Resume the endpoint if paused.voidsetAckDiscarded(boolean ackDiscarded) Abooleanflag to indicate ifFilteringMessageListenerAdaptershould acknowledge discarded records or not.voidsetBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter) Set the message converter to use with a batch-based consumer.voidsetBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA.voidsetFilterInRetry(boolean filterInRetry) Thebooleanflag to specify the order in which the filter and retry operations are performed.voidsetMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter) Set the message converter; must be aRecordMessageConverterorBatchMessageConverterdepending on mode.voidsetOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition, Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) Specify aBiConsumerfor seeks management duringcall from theinvalid reference
ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)KafkaMessageListenerContainer.voidsetPayloadType(Class<?> payloadType) When using a type-aware message converter such asStringJsonMessageConverter, set the payload type the converter should create.voidsetRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K, V> recordFilterStrategy) Specify aRecordFilterStrategyto wrapKafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListenerintoFilteringMessageListenerAdapter.voidsetRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter to use with a record-based consumer.voidsetRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback) ARecoveryCallbackinstance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured).voidsetRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) Specify aRetryTemplateinstance to use for retrying deliveries.Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, getRequiredOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisherMethods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
destroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stopMethods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagement
destroy, getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedTypeMethods inherited from interface org.springframework.integration.kafka.inbound.KafkaInboundEndpoint
doWithRetryMethods inherited from interface org.springframework.integration.support.management.ManageableLifecycle
isRunning, start, stopMethods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
KafkaMessageDrivenChannelAdapter
public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer) Construct an instance with modeKafkaMessageDrivenChannelAdapter.ListenerMode.record.- Parameters:
messageListenerContainer- the container.
-
KafkaMessageDrivenChannelAdapter
public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode) Construct an instance with the provided mode.- Parameters:
messageListenerContainer- the container.mode- the mode.
-
-
Method Details
-
setMessageConverter
public void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter) Set the message converter; must be aRecordMessageConverterorBatchMessageConverterdepending on mode.- Parameters:
messageConverter- the converter.
-
setRecordMessageConverter
public void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter to use with a record-based consumer.- Parameters:
messageConverter- the converter.
-
setBatchMessageConverter
public void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter) Set the message converter to use with a batch-based consumer.- Parameters:
messageConverter- the converter.
-
setRecordFilterStrategy
public void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K, V> recordFilterStrategy) Specify aRecordFilterStrategyto wrapKafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListenerintoFilteringMessageListenerAdapter.- Parameters:
recordFilterStrategy- theRecordFilterStrategyto use.
-
setAckDiscarded
public void setAckDiscarded(boolean ackDiscarded) Abooleanflag to indicate ifFilteringMessageListenerAdaptershould acknowledge discarded records or not. Does not make sense ifsetRecordFilterStrategy(RecordFilterStrategy)isn't specified.- Parameters:
ackDiscarded- true to ack (commit offset for) discarded messages.
-
setRetryTemplate
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) Specify aRetryTemplateinstance to use for retrying deliveries.IMPORTANT: This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records might exceed the
max.poll.interval.ms. Instead, consider adding aDefaultErrorHandlerto the listener container, configured with aKafkaErrorSendingMessageRecoverer.- Parameters:
retryTemplate- theRetryTemplateto use.
-
setRecoveryCallback
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback) ARecoveryCallbackinstance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). Only used if asetRetryTemplate(RetryTemplate)is specified. Default is anErrorMessageSendingRecovererif an error channel has been provided. Set to null if you wish to throw the exception back to the container after retries are exhausted.- Parameters:
recoveryCallback- the recovery callback.
-
setFilterInRetry
public void setFilterInRetry(boolean filterInRetry) Thebooleanflag to specify the order in which the filter and retry operations are performed. Does not make sense if only one ofRetryTemplateorRecordFilterStrategyis present, or none. When true, the filter is called for each retry; when false, the filter is only called once for each delivery from the container.- Parameters:
filterInRetry- true to filter for each retry. Defaults tofalse.
-
setPayloadType
-
setOnPartitionsAssignedSeekCallback
public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition, Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) Specify aBiConsumerfor seeks management duringcall from theinvalid reference
ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)KafkaMessageListenerContainer. This is called from the internalMessagingMessageListenerAdapterimplementation.- Parameters:
onPartitionsAssignedCallback- theBiConsumerto use- See Also:
-
setBindSourceRecord
public void setBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.- Parameters:
bindSourceRecord- true to bind.
-
getComponentType
Description copied from class:IntegrationObjectSupportSubclasses may implement this method to provide component type information.- Specified by:
getComponentTypein interfaceNamedComponent- Overrides:
getComponentTypein classIntegrationObjectSupport
-
onInit
protected void onInit()Description copied from class:IntegrationObjectSupportSubclasses may implement this for initialization logic.- Overrides:
onInitin classMessageProducerSupport
-
doStart
protected void doStart()Description copied from class:MessageProducerSupportTake no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.- Overrides:
doStartin classMessageProducerSupport
-
doStop
protected void doStop()Description copied from class:MessageProducerSupportTake no action by default. Subclasses may override this if they need lifecycle-managed behavior.- Overrides:
doStopin classMessageProducerSupport
-
pause
-
resume
-
isPaused
-
beforeShutdown
public int beforeShutdown()Description copied from interface:OrderlyShutdownCapableCalled before shutdown begins. Implementations should stop accepting new messages. Can optionally return the number of active messages in process.- Specified by:
beforeShutdownin interfaceOrderlyShutdownCapable- Returns:
- The number of active messages if available.
-
afterShutdown
public int afterShutdown()Description copied from interface:OrderlyShutdownCapableCalled after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.- Specified by:
afterShutdownin interfaceOrderlyShutdownCapable- Returns:
- The number of active messages if available.
-
getErrorMessageAttributes
Description copied from class:MessageProducerSupportPopulate anAttributeAccessorto be used when building an error message with theerrorMessageStrategy.- Overrides:
getErrorMessageAttributesin classMessageProducerSupport- Parameters:
message- the message.- Returns:
- the attributes.
-