Class KafkaProducerMessageHandler<K,V> 
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.handler.AbstractReplyProducingMessageHandler
org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler<K,V> 
- Type Parameters:
- K - the key type.
- V - the value type.
- All Implemented Interfaces:
- org.reactivestreams.Subscriber<Message<?>>, Aware, BeanClassLoaderAware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Lifecycle, Ordered, ExpressionCapable, Orderable, MessageProducer, HeaderPropagationAware, IntegrationPattern, NamedComponent, IntegrationManagement, ManageableLifecycle, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>
public class KafkaProducerMessageHandler<K,V> 
extends AbstractReplyProducingMessageHandler
implements ManageableLifecycle
A Message Handler for Apache Kafka. When supplied with a
ReplyingKafkaTemplate, it is used as
 the handler in an outbound gateway. When supplied with a simple KafkaTemplate,
 it is used as the handler in an outbound channel adapter.
 
 The handler also supports receiving a pre-built
 ProducerRecord payload. In that case, most configuration properties
 (setTopicExpression(Expression) etc.) are ignored. If the handler is used as
 a gateway, the ProducerRecord will have its headers enhanced to add the
 KafkaHeaders.REPLY_TOPIC unless it already contains such a header. The handler
 will not map any additional headers; providing such a payload assumes the headers have
 already been mapped.
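 The "unless it already contains such a header" rule for pre-built ProducerRecord payloads can be pictured with a small plain-Java model. This is only a sketch under stated assumptions, not the handler's implementation: record headers are simplified to a map, the class and helper names are hypothetical, and the literal header name kafka_replyTopic is assumed to be the value of KafkaHeaders.REPLY_TOPIC.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class ReplyTopicHeaderSketch {

    // Assumption: literal value behind KafkaHeaders.REPLY_TOPIC; verify in your spring-kafka version.
    static final String REPLY_TOPIC = "kafka_replyTopic";

    /** Returns a copy of the headers with the reply topic added only if none is present. */
    static Map<String, byte[]> withReplyTopic(Map<String, byte[]> headers, String defaultReplyTopic) {
        Map<String, byte[]> enhanced = new LinkedHashMap<>(headers);
        enhanced.putIfAbsent(REPLY_TOPIC, defaultReplyTopic.getBytes(StandardCharsets.UTF_8));
        return enhanced;
    }

    /** Hypothetical helper: decodes the reply topic header as a UTF-8 string. */
    static String replyTopicOf(Map<String, byte[]> headers) {
        return new String(headers.get(REPLY_TOPIC), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> preset = new LinkedHashMap<>();
        preset.put(REPLY_TOPIC, "custom-replies".getBytes(StandardCharsets.UTF_8));
        // No header yet: the default reply topic is added.
        System.out.println(replyTopicOf(withReplyTopic(new LinkedHashMap<>(), "default-replies")));
        // Header already present: it is left untouched.
        System.out.println(replyTopicOf(withReplyTopic(preset, "default-replies")));
    }
}
```

 Real Kafka record headers allow duplicate keys and are not a map; the map is used here only to keep the add-if-absent behaviour easy to see.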
- Since:
- 5.4
- Author:
- Soby Chacko, Artem Bilan, Gary Russell, Marius Bogoevici, Biju Kunjummen, Tom van den Berge
- 
Nested Class Summary
Nested Classes
Modifier and Type, Class, Description
- static interface KafkaProducerMessageHandler.ProducerRecordCreator<K,V>: Creates a ProducerRecord from a Message and/or properties derived from configuration and/or the message.
Nested classes/interfaces inherited from class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
AbstractReplyProducingMessageHandler.RequestHandler
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
- 
Field Summary
Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
messagingTemplate
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
- 
Constructor Summary
Constructors
Constructor, Description
- KafkaProducerMessageHandler(org.springframework.kafka.core.KafkaTemplate<K, V> kafkaTemplate)
- 
Method Summary
Modifier and Type, Method, Description
- protected void doInit()
- String getComponentType(): Subclasses may implement this method to provide component type information.
- protected MessageChannel getFuturesChannel()
- org.springframework.kafka.support.KafkaHeaderMapper getHeaderMapper()
- org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()
- protected MessageChannel getSendFailureChannel()
- protected MessageChannel getSendSuccessChannel()
- protected Object handleRequestMessage(Message<?> message): Subclasses must implement this method to handle the request Message.
- boolean isRunning()
- void processSendResult(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, ListenableFuture<org.springframework.kafka.support.SendResult<K, V>> future, MessageChannel metadataChannel)
- void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy): Set the error message strategy implementation to use when sending error messages after send failures.
- void setFlushExpression(Expression flushExpression): Specify a SpEL expression that evaluates to a Boolean to determine whether the producer should be flushed after the send.
- void setFuturesChannel(MessageChannel futuresChannel): Set the futures channel.
- void setFuturesChannelName(String futuresChannelName): Set the futures channel name.
- void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper): Set the header mapper to use.
- void setMessageKeyExpression(Expression messageKeyExpression)
- void setPartitionIdExpression(Expression partitionIdExpression)
- void setProducerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K, V> producerRecordCreator): Set a KafkaProducerMessageHandler.ProducerRecordCreator to create the ProducerRecord.
- void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter): Set a message converter for gateway replies.
- void setReplyPayloadType(Type payloadType): When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create.
- void setSendFailureChannel(MessageChannel sendFailureChannel): Set the failure channel.
- void setSendFailureChannelName(String sendFailureChannelName): Set the failure channel name.
- void setSendSuccessChannel(MessageChannel sendSuccessChannel): Set the success channel.
- void setSendSuccessChannelName(String sendSuccessChannelName): Set the success channel name.
- final void setSendTimeout(long sendTimeout): Specify a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results.
- void setSendTimeoutExpression(Expression sendTimeoutExpression): Specify a SpEL expression to evaluate a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results.
- void setSync(boolean sync): A boolean indicating if the KafkaProducerMessageHandler should wait for the send operation results or not.
- void setTimeoutBuffer(int timeoutBuffer): Set a buffer, in milliseconds, added to the configured delivery.timeout.ms to determine the minimum time to wait for the send future completion when sync is true.
- void setTimestampExpression(Expression timestampExpression): Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
- void setTopicExpression(Expression topicExpression)
- void setUseTemplateConverter(boolean useTemplateConverter): Set to true to use the template's message converter to create the ProducerRecord instead of the producerRecordCreator.
- void start()
- void stop()
Methods inherited from class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
doInvokeAdvisedRequestHandler, getBeanClassLoader, getIntegrationPatternType, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage, onComplete, onError, onNext, onSubscribe
Methods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface reactor.core.CoreSubscriber
currentContext
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
- 
Constructor Details- 
KafkaProducerMessageHandler
 
- 
- 
Method Details- 
setTopicExpression
- 
setMessageKeyExpression
- 
setPartitionIdExpression
- 
setTimestampExpression
Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record. The resulting value should be a Long type representing epoch time in milliseconds.
- Parameters:
- timestampExpression - the Expression that evaluates to the record timestamp.
 
- 
setFlushExpression
Specify a SpEL expression that evaluates to a Boolean to determine whether the producer should be flushed after the send. Defaults to looking for a Boolean value in a KafkaIntegrationHeaders.FLUSH header; false if absent.
- Parameters:
- flushExpression - the Expression.
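
The default described above (flush only when the header holds a true Boolean, otherwise false) can be modelled in plain Java. This is a sketch, not the handler's code: message headers are simplified to a map, the class and method names are hypothetical, and the literal kafka_flush is assumed to be the value of KafkaIntegrationHeaders.FLUSH.

```java
import java.util.Map;

public class FlushDefaultSketch {

    // Assumption: literal value behind KafkaIntegrationHeaders.FLUSH; verify in your version.
    static final String FLUSH_HEADER = "kafka_flush";

    /** Returns true only when the flush header is present and holds Boolean.TRUE. */
    static boolean shouldFlush(Map<String, Object> headers) {
        return Boolean.TRUE.equals(headers.get(FLUSH_HEADER));
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(Map.of(FLUSH_HEADER, Boolean.TRUE)));  // header set to true
        System.out.println(shouldFlush(Map.of(FLUSH_HEADER, Boolean.FALSE))); // header set to false
        System.out.println(shouldFlush(Map.of()));                            // header absent
    }
}
```

A custom flushExpression replaces this default lookup entirely, so any condition that evaluates to a Boolean can drive the flush.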
 
- 
setHeaderMapper
public void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper)
Set the header mapper to use.
- Parameters:
- headerMapper - the mapper; can be null to disable header mapping.
 
- 
getHeaderMapper
public org.springframework.kafka.support.KafkaHeaderMapper getHeaderMapper()
- 
getKafkaTemplate
public org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()
- 
setSync
public void setSync(boolean sync)
A boolean indicating if the KafkaProducerMessageHandler should wait for the send operation results or not. Defaults to false. In sync mode a downstream send operation exception will be re-thrown.
- Parameters:
- sync - the send mode; async by default.
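
The difference between the two modes can be illustrated with a plain-Java model built on CompletableFuture. It is a sketch of the documented behaviour only (wait and re-throw in sync mode, return without waiting otherwise); the class, the method names and the use of IllegalStateException are hypothetical and do not reflect the handler's actual exception types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncSendSketch {

    /**
     * Models the two send modes: in sync mode, block for the result and surface
     * failures to the caller; in async mode, return without waiting.
     */
    static void send(CompletableFuture<String> sendFuture, boolean sync, long timeoutMs)
            throws InterruptedException, TimeoutException {
        if (!sync) {
            return; // async: the outcome is observed later through the future
        }
        try {
            sendFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException ex) {
            // sync: re-throw the send failure to the caller
            throw new IllegalStateException("send failed", ex.getCause());
        }
    }

    /** Hypothetical helper: reports whether send(...) propagated a failed send. */
    static boolean failsInMode(boolean sync) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("broker unavailable"));
        try {
            send(failed, sync, 100L);
            return false;
        }
        catch (IllegalStateException ex) {
            return true;
        }
        catch (InterruptedException | TimeoutException ex) {
            throw new AssertionError(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("sync propagates failure: " + failsInMode(true));
        System.out.println("async propagates failure: " + failsInMode(false));
    }
}
```

In async mode the caller never sees the failure directly, which is why the failure channel documented under setSendFailureChannel is the place to observe it.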
 
- 
setSendTimeout
public final void setSendTimeout(long sendTimeout)
Specify a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to the Kafka delivery.timeout.ms property + 5 seconds. The timeout also applies when sending to the success or failure channels.
- Overrides:
- setSendTimeout in class AbstractMessageProducingHandler
- Parameters:
- sendTimeout - the timeout to wait for the result of a send operation.
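
The timeout arithmetic documented for setSendTimeout, setSendTimeoutExpression and setTimeoutBuffer (a minimum wait of delivery.timeout.ms plus a buffer, with the higher value winning) can be sketched as follows. The class and method names are hypothetical, and this is a model of the documented rule rather than the handler's code.

```java
public class SendTimeoutSketch {

    /** Minimum wait: the producer's delivery.timeout.ms plus the configured buffer. */
    static long minimumWaitMs(long deliveryTimeoutMs, long timeoutBufferMs) {
        return deliveryTimeoutMs + timeoutBufferMs;
    }

    /** The requested timeout is used unless it is below the minimum wait. */
    static long effectiveTimeoutMs(long requestedMs, long deliveryTimeoutMs, long timeoutBufferMs) {
        return Math.max(requestedMs, minimumWaitMs(deliveryTimeoutMs, timeoutBufferMs));
    }

    public static void main(String[] args) {
        long deliveryTimeoutMs = 120_000L; // Kafka's default for delivery.timeout.ms
        long bufferMs = 5_000L;            // the 5-second default buffer
        System.out.println(effectiveTimeoutMs(10_000L, deliveryTimeoutMs, bufferMs));  // 125000
        System.out.println(effectiveTimeoutMs(300_000L, deliveryTimeoutMs, bufferMs)); // 300000
    }
}
```

A requested timeout below the minimum is effectively ignored, so shortening the wait requires lowering delivery.timeout.ms on the producer or the buffer itself.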
 
- 
setSendTimeoutExpression
Specify a SpEL expression to evaluate a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to the Kafka delivery.timeout.ms property + 5 seconds. The timeout is applied only in sync mode. If this expression yields a result that is less than that value, the higher value is used.
- Parameters:
- sendTimeoutExpression - the Expression for the timeout to wait for the result of a send operation.
- See Also:
 
- 
setSendFailureChannel
Set the failure channel. After a send failure, an ErrorMessage will be sent to this channel with a payload of a KafkaSendFailureException with the failed message and cause.
- Parameters:
- sendFailureChannel - the failure channel.
 
- 
setSendFailureChannelName
Set the failure channel name. After a send failure, an ErrorMessage will be sent to this channel name with a payload of a KafkaSendFailureException with the failed message and cause.
- Parameters:
- sendFailureChannelName - the failure channel name.
 
- 
setSendSuccessChannel
Set the success channel.
- Parameters:
- sendSuccessChannel - the success channel.
 
- 
setSendSuccessChannelName
Set the success channel name.
- Parameters:
- sendSuccessChannelName - the success channel name.
 
- 
setFuturesChannel
Set the futures channel.
- Parameters:
- futuresChannel - the futures channel.
 
- 
setFuturesChannelName
Set the futures channel name.
- Parameters:
- futuresChannelName - the futures channel name.
 
- 
setErrorMessageStrategy
Set the error message strategy implementation to use when sending error messages after send failures. Cannot be null.
- Parameters:
- errorMessageStrategy - the implementation.
 
- 
setReplyMessageConverter
public void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set a message converter for gateway replies.
- Parameters:
- messageConverter - the converter.
- See Also:
 
- 
setReplyPayloadType
When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. Defaults to Object.
- Parameters:
- payloadType - the type.
- See Also:
 
- 
setProducerRecordCreator
public void setProducerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K, V> producerRecordCreator)
Set a KafkaProducerMessageHandler.ProducerRecordCreator to create the ProducerRecord. Ignored if useTemplateConverter is true.
- Parameters:
- producerRecordCreator - the creator.
- See Also:
 
- 
setTimeoutBuffer
public void setTimeoutBuffer(int timeoutBuffer)
Set a buffer, in milliseconds, added to the configured delivery.timeout.ms to determine the minimum time to wait for the send future completion when sync is true.
- Parameters:
- timeoutBuffer - the buffer.
- See Also:
 
- 
setUseTemplateConverter
public void setUseTemplateConverter(boolean useTemplateConverter)
Set to true to use the template's message converter to create the ProducerRecord instead of the producerRecordCreator.
- Parameters:
- useTemplateConverter - true to use the converter.
- Since:
- 5.5.5
- See Also:
 
- 
getComponentType
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
- getComponentType in interface NamedComponent
- Overrides:
- getComponentType in class MessageHandlerSupport
 
- 
getSendFailureChannel
- 
getSendSuccessChannel
- 
getFuturesChannel
- 
doInit
protected void doInit()
- Overrides:
- doInit in class AbstractReplyProducingMessageHandler
 
- 
start
public void start()
- Specified by:
- start in interface Lifecycle
- Specified by:
- start in interface ManageableLifecycle
 
- 
stop
public void stop()
- Specified by:
- stop in interface Lifecycle
- Specified by:
- stop in interface ManageableLifecycle
 
- 
isRunning
public boolean isRunning()
- Specified by:
- isRunning in interface Lifecycle
- Specified by:
- isRunning in interface ManageableLifecycle
 
- 
handleRequestMessage
Description copied from class: AbstractReplyProducingMessageHandler
Subclasses must implement this method to handle the request Message. The return value may be a Message, a MessageBuilder, or any plain Object. The base class will handle the final creation of a reply Message from any of those starting points. If the return value is null, the Message flow will end here.
- Specified by:
- handleRequestMessage in class AbstractReplyProducingMessageHandler
- Parameters:
- message - The request message.
- Returns:
- The result of handling the message, or null.
 
- 
processSendResult
public void processSendResult(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, ListenableFuture<org.springframework.kafka.support.SendResult<K, V>> future, MessageChannel metadataChannel) throws InterruptedException, ExecutionException
 
-