Class ReactiveRedisStreamMessageProducer
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer
- All Implemented Interfaces:
- Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, SmartInitializingSingleton, ApplicationContextAware, Lifecycle, Phased, SmartLifecycle, ExpressionCapable, MessageProducer, IntegrationPattern, NamedComponent, IntegrationInboundManagement, IntegrationManagement, ManageableLifecycle, ManageableSmartLifecycle, TrackableComponent
A MessageProducerSupport for reading messages from a Redis Stream and publishing them to the provided output channel.
By default, this adapter reads messages as a standalone client with XREAD (a Redis command), but it can be switched to the Consumer Group feature (XREADGROUP) by setting the consumerName field.
By default, the Consumer Group name is the id of this bean, IntegrationObjectSupport.getBeanName().
- Since:
- 5.4
- Author:
- Attoumane Ahamadi, Artem Bilan, Rohan Mukesh
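The choice between XREAD and XREADGROUP, and the fallback of the Consumer Group name to the bean name, can be modelled in plain Java. The following is a minimal sketch of the rules described above, not the library's implementation; the class `ReadModeSketch` and its methods are hypothetical names introduced only for illustration.

```java
public class ReadModeSketch {

    // Hypothetical helper: true when the value is non-null and not only whitespace.
    public static boolean hasText(String value) {
        return value != null && !value.trim().isEmpty();
    }

    // XREADGROUP applies only when a consumer name is configured; otherwise XREAD.
    public static String redisCommand(String consumerName) {
        return hasText(consumerName) ? "XREADGROUP" : "XREAD";
    }

    // When a consumer name is set but no group is, the bean name is used as the group name.
    public static String effectiveGroup(String consumerGroup, String consumerName, String beanName) {
        if (hasText(consumerName) && !hasText(consumerGroup)) {
            return beanName;
        }
        return consumerGroup;
    }

    public static void main(String[] args) {
        System.out.println(redisCommand(null));        // standalone client
        System.out.println(redisCommand("worker-1"));  // Consumer Group feature
        System.out.println(effectiveGroup(null, "worker-1", "ordersStreamProducer"));
    }
}
```

In this model, leaving `consumerName` unset keeps the adapter in standalone mode regardless of whether a group name was given.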
- 
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
- 
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
- 
Constructor Summary
Constructors
ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory, String streamKey)
- 
Method Summary
- protected void doStart(): Take no action by default.
- String getComponentType(): Subclasses may implement this method to provide component type information.
- protected void onInit(): Subclasses may implement this for initialization logic.
- void setAutoAck(boolean autoAck): Set whether to acknowledge messages read in the Consumer Group.
- void setBatchSize(int recordsPerPoll): Configure a batch size for the COUNT option during reading.
- void setConsumerGroup(String consumerGroup): Set the name of the Consumer Group.
- void setConsumerName(String consumerName): Set the name of the consumer.
- void setCreateConsumerGroup(boolean createConsumerGroup): Create the Consumer Group if and only if it does not exist.
- void setExtractPayload(boolean extractPayload): Configure whether this channel adapter extracts the value from the Record.
- void setObjectMapper(HashMapper<?, ?, ?> hashMapper): Configure a hash mapper.
- void setOnErrorResume(Function<? super Throwable, ? extends org.reactivestreams.Publisher<Void>> resumeFunction): Configure a resume Function to resume the main sequence when polling the stream fails.
- void setPollTimeout(Duration pollTimeout): Configure a poll timeout for the BLOCK option during reading.
- void setReadOffset(ReadOffset readOffset): Define the offset from which to read messages.
- void setSerializer(RedisSerializationContext.SerializationPair<?> pair): Configure a key, hash key and hash value serializer.
- void setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions): Set the StreamReceiverOptions used to customize the StreamReceiver.
- void setTargetType(Class<?> targetType): Configure a hash target type.
Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, doStop, getErrorChannel, getErrorMessageAttributes, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
destroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
destroy, getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedType
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
- 
Constructor Details
- 
ReactiveRedisStreamMessageProducer
public ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory, String streamKey)
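As an illustration of how this constructor and the setters on this page fit together, the following configuration sketch declares the producer as a bean in Consumer Group mode. It assumes that a ReactiveRedisConnectionFactory bean already exists in the application context and that Spring Integration is enabled. The stream key "orders", the group "order-workers", the consumer "worker-1", and the class and bean names are all hypothetical.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer;
import org.springframework.messaging.MessageChannel;

@Configuration
public class OrdersStreamConfig {

    // Hypothetical output channel; any MessageChannel can be supplied instead.
    @Bean
    public MessageChannel ordersChannel() {
        return new FluxMessageChannel();
    }

    @Bean
    public ReactiveRedisStreamMessageProducer ordersStreamProducer(
            ReactiveRedisConnectionFactory connectionFactory) {

        ReactiveRedisStreamMessageProducer producer =
                new ReactiveRedisStreamMessageProducer(connectionFactory, "orders");
        producer.setOutputChannel(ordersChannel());

        // Setting a consumer name switches the adapter to XREADGROUP.
        producer.setConsumerGroup("order-workers");
        producer.setConsumerName("worker-1");
        producer.setCreateConsumerGroup(true);

        // Individual options; these cannot be combined with setStreamReceiverOptions(...).
        producer.setPollTimeout(Duration.ofSeconds(2));
        producer.setBatchSize(10);
        return producer;
    }
}
```

Omitting the `setConsumerGroup` and `setConsumerName` calls would leave the adapter reading as a standalone client.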
 
- 
- 
Method Details
- 
setReadOffset
public void setReadOffset(ReadOffset readOffset)
Define the offset from which to read messages. By default, ReadOffset.latest() is used. ReadOffset.latest() is equal to '$', which is the ID used with XREAD to get new data added to the stream. Note that when switching to the Consumer Group feature, the offset is set to ReadOffset.lastConsumed() if it is still equal to ReadOffset.latest().
- Parameters:
- readOffset - the desired offset
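The offset substitution described for setReadOffset can be modelled as a small pure function. This is a sketch of the documented rule only, using plain strings in place of ReadOffset: '$' stands for ReadOffset.latest() and '>' for ReadOffset.lastConsumed(). The class `ReadOffsetSketch` and its method are hypothetical.

```java
public class ReadOffsetSketch {

    public static final String LATEST = "$";         // stands in for ReadOffset.latest()
    public static final String LAST_CONSUMED = ">";  // stands in for ReadOffset.lastConsumed()

    // Returns the offset that would be used for reading, given the configured offset
    // and whether a consumer name (Consumer Group mode) is set.
    public static String effectiveOffset(String configuredOffset, String consumerName) {
        boolean consumerGroupMode = consumerName != null && !consumerName.trim().isEmpty();
        if (consumerGroupMode && LATEST.equals(configuredOffset)) {
            return LAST_CONSUMED;
        }
        return configuredOffset;
    }

    public static void main(String[] args) {
        System.out.println(effectiveOffset("$", null));        // standalone: offset unchanged
        System.out.println(effectiveOffset("$", "worker-1"));  // group mode: default is replaced
        System.out.println(effectiveOffset("0-0", "worker-1")); // explicit offset is kept
    }
}
```

Only the default is replaced; an offset set explicitly to something other than the latest one is left as configured.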
 
- 
setExtractPayload
public void setExtractPayload(boolean extractPayload)
Configure whether this channel adapter extracts the value from the Record.
- Parameters:
- extractPayload - default true
 
- 
setAutoAck
public void setAutoAck(boolean autoAck)
Set whether to acknowledge messages read in the Consumer Group. true by default.
- Parameters:
- autoAck - the acknowledge option.
 
- 
setConsumerGroup
public void setConsumerGroup(String consumerGroup)
Set the name of the Consumer Group. The Consumer Group can be created if desired; see createConsumerGroup. If not set, the bean name, IntegrationObjectSupport.getBeanName(), is used.
- Parameters:
- consumerGroup - the Consumer Group in which this adapter should register to listen for messages.
 
- 
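setConsumerName
public void setConsumerName(String consumerName)
Set the name of the consumer. Setting a consumer name switches this adapter to the Consumer Group feature (XREADGROUP).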
- 
setCreateConsumerGroup
public void setCreateConsumerGroup(boolean createConsumerGroup)
Create the Consumer Group if and only if it does not exist. The stream is also created during group creation; see MKSTREAM.
- Parameters:
- createConsumerGroup - whether to create the Consumer Group; false by default
 
- 
setStreamReceiverOptions
public void setStreamReceiverOptions(@Nullable StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions)
Set the StreamReceiverOptions used to customize the StreamReceiver. They provide a way to set the polling timeout and the serialization context. By default, the polling timeout is infinite and StringRedisSerializer is used. Mutually exclusive with 'pollTimeout', 'batchSize', 'onErrorResume', 'serializer', 'targetType', 'objectMapper'.
- Parameters:
- streamReceiverOptions - the desired receiver options
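Mutual exclusivity of this kind is commonly enforced by guarding each setter against the other path having been used. The following is a minimal, self-contained sketch of one such guard, not the library's code: the class `ExclusiveOptionsSketch`, its fields, and the choice of IllegalStateException are assumptions made for illustration, and only 'pollTimeout' is modelled as an individual option.

```java
import java.time.Duration;

public class ExclusiveOptionsSketch {

    private Object receiverOptions;       // stands in for a complete options object
    private boolean individualOptionSet;  // true once any individual option has been configured
    private Duration pollTimeout;

    public void setReceiverOptions(Object receiverOptions) {
        if (this.individualOptionSet) {
            throw new IllegalStateException("'receiverOptions' is mutually exclusive with 'pollTimeout'");
        }
        this.receiverOptions = receiverOptions;
    }

    public void setPollTimeout(Duration pollTimeout) {
        if (this.receiverOptions != null) {
            throw new IllegalStateException("'pollTimeout' is mutually exclusive with 'receiverOptions'");
        }
        this.pollTimeout = pollTimeout;
        this.individualOptionSet = true;
    }

    public Duration getPollTimeout() {
        return this.pollTimeout;
    }

    // Runs a configuration step and reports whether the guard rejected it.
    public static boolean rejects(Runnable configuration) {
        try {
            configuration.run();
            return false;
        }
        catch (IllegalStateException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        ExclusiveOptionsSketch sketch = new ExclusiveOptionsSketch();
        sketch.setPollTimeout(Duration.ofSeconds(1));
        // The complete options object is refused once an individual option is set.
        System.out.println(rejects(() -> sketch.setReceiverOptions(new Object())));
    }
}
```

The practical consequence for callers is to pick one style per producer: either supply complete receiver options, or use the individual setters.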
 
- 
setPollTimeout
public void setPollTimeout(Duration pollTimeout)
Configure a poll timeout for the BLOCK option during reading. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
- pollTimeout - the timeout for polling.
- Since:
- 5.5
 
- 
setBatchSize
public void setBatchSize(int recordsPerPoll)
Configure a batch size for the COUNT option during reading. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
- recordsPerPoll - must be greater than zero.
- Since:
- 5.5
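To show how the batch size and the poll timeout relate to the Redis COUNT and BLOCK options, the sketch below renders the text of an XREAD command from those two settings. It is an illustration of the option mapping only and does not reflect how the adapter issues commands internally; the class `XreadOptionsSketch` and its method are hypothetical. BLOCK takes its value in milliseconds.

```java
import java.time.Duration;

public class XreadOptionsSketch {

    // Builds the text of an XREAD command; null means the option is not configured.
    public static String xread(String streamKey, String offset, Integer batchSize, Duration pollTimeout) {
        StringBuilder command = new StringBuilder("XREAD");
        if (batchSize != null) {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be greater than zero");
            }
            command.append(" COUNT ").append(batchSize);
        }
        if (pollTimeout != null) {
            command.append(" BLOCK ").append(pollTimeout.toMillis());
        }
        return command.append(" STREAMS ").append(streamKey).append(' ').append(offset).toString();
    }

    public static void main(String[] args) {
        // prints "XREAD COUNT 10 BLOCK 2000 STREAMS orders $"
        System.out.println(xread("orders", "$", 10, Duration.ofSeconds(2)));
    }
}
```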
 
- 
setOnErrorResume
public void setOnErrorResume(Function<? super Throwable, ? extends org.reactivestreams.Publisher<Void>> resumeFunction)
Configure a resume Function to resume the main sequence when polling the stream fails. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions). By default, this function extracts the failed Record and sends an ErrorMessage to the channel provided through MessageProducerSupport.setErrorChannel(org.springframework.messaging.MessageChannel). The failed message for this record may have an IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header when manual acknowledgment is configured for this message producer.
- Parameters:
- resumeFunction - must not be null.
- Since:
- 5.5
 
- 
setSerializer
public void setSerializer(RedisSerializationContext.SerializationPair<?> pair)
Configure a key, hash key and hash value serializer. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
- pair - must not be null.
- Since:
- 5.5
 
- 
setTargetType
public void setTargetType(Class<?> targetType)
Configure a hash target type. Changes the emitted Record type to ObjectRecord. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
- targetType - must not be null.
- Since:
- 5.5
 
- 
setObjectMapper
public void setObjectMapper(HashMapper<?, ?, ?> hashMapper)
Configure a hash mapper. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
- hashMapper - must not be null.
- Since:
- 5.5
 
- 
getComponentType
public String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
- getComponentType in interface NamedComponent
- Overrides:
- getComponentType in class IntegrationObjectSupport
 
- 
onInit
protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
- Overrides:
- onInit in class MessageProducerSupport
 
- 
doStart
protected void doStart()
Description copied from class: MessageProducerSupport
Take no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.
- Overrides:
- doStart in class MessageProducerSupport
 
 
-