Class KafkaMessageDrivenChannelAdapter<K,V>
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter<K,V>
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, SmartInitializingSingleton, ApplicationContextAware, Lifecycle, Phased, SmartLifecycle, ExpressionCapable, OrderlyShutdownCapable, MessageProducer, Pausable, IntegrationPattern, NamedComponent, ManageableLifecycle, ManageableSmartLifecycle, TrackableComponent
public class KafkaMessageDrivenChannelAdapter<K,V> extends MessageProducerSupport implements OrderlyShutdownCapable, Pausable
Message-driven channel adapter.
- Since:
5.4
- Author:
Marius Bogoevici, Gary Russell, Artem Bilan, Urs Keller
-
Nested Class Summary
Nested Classes
Modifier and Type    Class    Description
static class KafkaMessageDrivenChannelAdapter.ListenerMode
    The listener mode for the container, record or batch.
-
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
-
Constructor Summary
Constructors
Constructor    Description
KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer)
    Construct an instance with mode KafkaMessageDrivenChannelAdapter.ListenerMode.record.
KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode)
    Construct an instance with the provided mode.
-
Method Summary
Modifier and Type    Method    Description
int afterShutdown()
    Called after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads. Can optionally return the number of active messages still in process.
int beforeShutdown()
    Called before shutdown begins.
protected void doStart()
    Take no action by default.
protected void doStop()
    Take no action by default.
String getComponentType()
    Subclasses may implement this method to provide component type information.
protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
    Populate an AttributeAccessor to be used when building an error message with the errorMessageStrategy.
boolean isPaused()
    Check if the endpoint is paused.
protected void onInit()
    Subclasses may implement this for initialization logic.
void pause()
    Pause the endpoint.
void resume()
    Resume the endpoint if paused.
void setAckDiscarded(boolean ackDiscarded)
    A boolean flag to indicate whether FilteringMessageListenerAdapter should acknowledge discarded records.
void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter)
    Set the message converter to use with a batch-based consumer.
void setBindSourceRecord(boolean bindSourceRecord)
    Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA.
void setFilterInRetry(boolean filterInRetry)
    The boolean flag that specifies the order in which RetryingMessageListenerAdapter and FilteringMessageListenerAdapter wrap each other, if both are present.
void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter)
    Set the message converter; must be a RecordMessageConverter or BatchMessageConverter depending on mode.
void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
    Specify a BiConsumer for seeks management during the ConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) call from the KafkaMessageListenerContainer.
void setPayloadType(Class<?> payloadType)
    When using a type-aware message converter such as StringJsonMessageConverter, set the payload type the converter should create.
void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy)
    Specify a RecordFilterStrategy to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener into FilteringMessageListenerAdapter.
void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
    Set the message converter to use with a record-based consumer.
void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
    A RecoveryCallback instance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured).
void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
    Specify a RetryTemplate instance to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener into RetryingMessageListenerAdapter.
Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
destroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.ManageableLifecycle
isRunning, start, stop
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
KafkaMessageDrivenChannelAdapter
public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer)
Construct an instance with mode KafkaMessageDrivenChannelAdapter.ListenerMode.record.
- Parameters:
messageListenerContainer - the container.
-
KafkaMessageDrivenChannelAdapter
public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode)
Construct an instance with the provided mode.
- Parameters:
messageListenerContainer - the container.
mode - the mode.
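As an illustration of the two constructors, the following is a minimal configuration sketch rather than a definitive setup. The broker address, group id, topic name, and bean names are hypothetical, and String keys and values are assumed. setOutputChannel is inherited from MessageProducerSupport.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.messaging.MessageChannel;

@Configuration
public class KafkaInboundConfigSketch {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Hypothetical connection settings; replace with real values.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> container(
            ConsumerFactory<String, String> consumerFactory) {
        // "demo-topic" is a hypothetical topic name.
        return new KafkaMessageListenerContainer<>(consumerFactory,
                new ContainerProperties("demo-topic"));
    }

    @Bean
    public MessageChannel fromKafka() {
        // A subscriber must be attached elsewhere for messages to be handled.
        return new DirectChannel();
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> adapter(
            KafkaMessageListenerContainer<String, String> container) {
        // The single-argument constructor would default to ListenerMode.record.
        KafkaMessageDrivenChannelAdapter<String, String> adapter =
                new KafkaMessageDrivenChannelAdapter<>(container,
                        KafkaMessageDrivenChannelAdapter.ListenerMode.record);
        adapter.setOutputChannel(fromKafka());
        return adapter;
    }
}
```

Passing KafkaMessageDrivenChannelAdapter.ListenerMode.batch instead would have the container deliver batches, in which case a BatchMessageConverter applies rather than a RecordMessageConverter.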
-
-
Method Details
-
setMessageConverter
public void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter)
Set the message converter; must be a RecordMessageConverter or BatchMessageConverter depending on mode.
- Parameters:
messageConverter - the converter.
-
setRecordMessageConverter
public void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set the message converter to use with a record-based consumer.
- Parameters:
messageConverter - the converter.
-
setBatchMessageConverter
public void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter)
Set the message converter to use with a batch-based consumer.
- Parameters:
messageConverter - the converter.
-
setRecordFilterStrategy
public void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy)
Specify a RecordFilterStrategy to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener into FilteringMessageListenerAdapter.
- Parameters:
recordFilterStrategy - the RecordFilterStrategy to use.
-
setAckDiscarded
public void setAckDiscarded(boolean ackDiscarded)
A boolean flag to indicate whether FilteringMessageListenerAdapter should acknowledge discarded records. Has no effect unless a RecordFilterStrategy is set via setRecordFilterStrategy(RecordFilterStrategy).
- Parameters:
ackDiscarded - true to ack (commit offset for) discarded messages.
-
setRetryTemplate
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Specify a RetryTemplate instance to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener into RetryingMessageListenerAdapter.
- Parameters:
retryTemplate - the RetryTemplate to use.
-
setRecoveryCallback
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
A RecoveryCallback instance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). Has no effect unless a RetryTemplate is set via setRetryTemplate(RetryTemplate).
- Parameters:
recoveryCallback - the recovery callback.
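The filter and retry setters above are typically configured together. The following sketch is one possible arrangement, not a recommended default: the class and method names are hypothetical, the retry settings (3 attempts, 1 second back-off) are arbitrary, and String keys and values are assumed. It relies on RecordFilterStrategy.filter returning true for records that should be discarded.

```java
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.retry.support.RetryTemplate;

public final class AdapterFilterRetrySketch {

    private AdapterFilterRetrySketch() {
    }

    // Hypothetical helper: applies filter and retry settings to an existing adapter.
    static void configure(KafkaMessageDrivenChannelAdapter<String, String> adapter) {
        // Returning true from the strategy discards the record.
        adapter.setRecordFilterStrategy(
                record -> record.value() == null || record.value().isEmpty());
        // Acknowledge (commit the offset for) records dropped by the filter.
        adapter.setAckDiscarded(true);

        // Arbitrary retry policy chosen for illustration.
        RetryTemplate retryTemplate = RetryTemplate.builder()
                .maxAttempts(3)
                .fixedBackoff(1000)
                .build();
        adapter.setRetryTemplate(retryTemplate);

        // Invoked once retries are exhausted; without it the exception
        // propagates to the container unless an error channel is configured.
        adapter.setRecoveryCallback(context -> {
            System.err.println("Giving up after " + context.getRetryCount()
                    + " attempts: " + context.getLastThrowable());
            return null;
        });
    }
}
```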
-
setFilterInRetry
public void setFilterInRetry(boolean filterInRetry)
The boolean flag that specifies the order in which RetryingMessageListenerAdapter and FilteringMessageListenerAdapter wrap each other, if both are present. Has no effect unless both a RetryTemplate and a RecordFilterStrategy are present.
- Parameters:
filterInRetry - the order for RetryingMessageListenerAdapter and FilteringMessageListenerAdapter wrapping. Defaults to false.
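The practical effect of the wrapping order can be seen with a simplified, library-free model. This sketch uses plain java.util.function types in place of the real adapters, and it assumes that filterInRetry = true places the filter inside the retry (so the filter is evaluated on every attempt) while false places it outside (so it is evaluated once). All names in it are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class FilterRetryOrderSketch {

    /** Skips the delegate when the predicate says "discard". */
    static Consumer<String> filtering(Consumer<String> delegate, Predicate<String> discard) {
        return rec -> {
            if (!discard.test(rec)) {
                delegate.accept(rec);
            }
        };
    }

    /** Re-invokes the delegate until it succeeds or the attempts run out. */
    static Consumer<String> retrying(Consumer<String> delegate, int maxAttempts) {
        return rec -> {
            RuntimeException last = null;
            for (int i = 0; i < maxAttempts; i++) {
                try {
                    delegate.accept(rec);
                    return;
                }
                catch (RuntimeException ex) {
                    last = ex;
                }
            }
            throw last;
        };
    }

    /** Counts how often the filter is evaluated for one always-failing record. */
    static int filterEvaluations(boolean filterInRetry) {
        AtomicInteger evaluations = new AtomicInteger();
        Predicate<String> discard = rec -> {
            evaluations.incrementAndGet();
            return false; // never discard in this sketch
        };
        Consumer<String> listener = rec -> {
            throw new IllegalStateException("downstream failure");
        };
        Consumer<String> chain = filterInRetry
                ? retrying(filtering(listener, discard), 3)   // filter inside retry
                : filtering(retrying(listener, 3), discard);  // filter outside retry
        try {
            chain.accept("record");
        }
        catch (IllegalStateException expected) {
            // All three attempts failed, as intended for this sketch.
        }
        return evaluations.get();
    }

    public static void main(String[] args) {
        System.out.println("filterInRetry=true:  " + filterEvaluations(true));
        System.out.println("filterInRetry=false: " + filterEvaluations(false));
    }
}
```

In this model the filter runs three times with the filter inside the retry and once with it outside, which matters if the filter is expensive or has side effects.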
-
setPayloadType
public void setPayloadType(Class<?> payloadType)
When using a type-aware message converter such as StringJsonMessageConverter, set the payload type the converter should create. Defaults to Object.
- Parameters:
payloadType - the type.
-
setOnPartitionsAssignedSeekCallback
public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
Specify a BiConsumer for seeks management during the ConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) call from the KafkaMessageListenerContainer. This is called from the internal MessagingMessageListenerAdapter implementation.
- Parameters:
onPartitionsAssignedCallback - the BiConsumer to use
- See Also:
ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
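A seek callback of this shape might look like the sketch below, which rewinds every newly assigned partition to its beginning. The class and method names are hypothetical, String keys and values are assumed, and seeking to the beginning is only one possible policy. ConsumerSeekCallback also offers seek(topic, partition, offset) for absolute positions.

```java
import java.util.Map;
import java.util.function.BiConsumer;

import org.apache.kafka.common.TopicPartition;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;

public final class SeekOnAssignmentSketch {

    private SeekOnAssignmentSketch() {
    }

    // Hypothetical helper: replay each assigned partition from its first offset.
    static void replayFromBeginning(KafkaMessageDrivenChannelAdapter<String, String> adapter) {
        BiConsumer<Map<TopicPartition, Long>, ConsumerSeekCallback> seekToStart =
                (assignments, callback) -> assignments.keySet().forEach(
                        tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
        adapter.setOnPartitionsAssignedSeekCallback(seekToStart);
    }
}
```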
-
setBindSourceRecord
public void setBindSourceRecord(boolean bindSourceRecord)
Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.
- Parameters:
bindSourceRecord - true to bind.
-
getComponentType
public String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType in interface NamedComponent
- Overrides:
getComponentType in class IntegrationObjectSupport
-
onInit
protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
- Overrides:
onInit in class MessageProducerSupport
-
doStart
protected void doStart()
Description copied from class: MessageProducerSupport
Take no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.
- Overrides:
doStart in class MessageProducerSupport
-
doStop
protected void doStop()
Description copied from class: MessageProducerSupport
Take no action by default. Subclasses may override this if they need lifecycle-managed behavior.
- Overrides:
doStop in class MessageProducerSupport
-
pause
public void pause()
Description copied from interface: Pausable
Pause the endpoint.
-
resume
public void resume()
Description copied from interface: Pausable
Resume the endpoint if paused.
-
isPaused
public boolean isPaused()
Description copied from interface: Pausable
Check if the endpoint is paused.
-
beforeShutdown
public int beforeShutdown()
Description copied from interface: OrderlyShutdownCapable
Called before shutdown begins. Implementations should stop accepting new messages. Can optionally return the number of active messages in process.
- Specified by:
beforeShutdown in interface OrderlyShutdownCapable
- Returns:
The number of active messages if available.
-
afterShutdown
public int afterShutdown()
Description copied from interface: OrderlyShutdownCapable
Called after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads. Can optionally return the number of active messages still in process.
- Specified by:
afterShutdown in interface OrderlyShutdownCapable
- Returns:
The number of active messages if available.
-
getErrorMessageAttributes
protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
Description copied from class: MessageProducerSupport
Populate an AttributeAccessor to be used when building an error message with the errorMessageStrategy.
- Overrides:
getErrorMessageAttributes in class MessageProducerSupport
- Parameters:
message - the message.
- Returns:
the attributes.
-