Class ReactiveRedisStreamMessageProducer
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer
- All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, SmartInitializingSingleton, ApplicationContextAware, Lifecycle, Phased, SmartLifecycle, ExpressionCapable, MessageProducer, IntegrationPattern, NamedComponent, IntegrationInboundManagement, IntegrationManagement, ManageableLifecycle, ManageableSmartLifecycle, TrackableComponent
A MessageProducerSupport for reading messages from a Redis Stream and publishing them into the provided output channel.
By default this adapter reads messages as a standalone client using XREAD (Redis command), but it can be switched to the Consumer Group feature (XREADGROUP) by setting the consumerName field.
By default the Consumer Group name is the id of this bean, IntegrationObjectSupport.getBeanName().
- Since:
- 5.4
- Author:
- Attoumane Ahamadi, Artem Bilan, Rohan Mukesh
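The default standalone (XREAD) mode described above can be sketched as a Spring configuration. This is only an illustration under stated assumptions: the stream key "orders-stream", the bean names, and the choice of FluxMessageChannel as the output channel are hypothetical and not taken from this page.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer;
import org.springframework.messaging.MessageChannel;

@Configuration
public class RedisStreamInboundConfig {

    // Hypothetical output channel; any MessageChannel can be used here.
    @Bean
    public MessageChannel redisStreamChannel() {
        return new FluxMessageChannel();
    }

    // No consumerName is set, so the adapter stays in standalone XREAD mode.
    @Bean
    public ReactiveRedisStreamMessageProducer standaloneStreamProducer(
            ReactiveRedisConnectionFactory connectionFactory) {
        ReactiveRedisStreamMessageProducer producer =
                new ReactiveRedisStreamMessageProducer(connectionFactory, "orders-stream"); // assumed stream key
        producer.setOutputChannel(redisStreamChannel());
        return producer;
    }
}
```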
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides -
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE -
Constructor Summary
Constructors
ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory, String streamKey)
-
Method Summary
- protected void doStart(): Take no action by default.
- String getComponentType(): Subclasses may implement this method to provide component type information.
- protected void onInit(): Subclasses may implement this for initialization logic.
- void setAutoAck(boolean autoAck): Set whether to acknowledge messages read in the Consumer Group.
- void setBatchSize(int recordsPerPoll): Configure a batch size for the COUNT option during reading.
- void setConsumerGroup(String consumerGroup): Set the name of the Consumer Group.
- void setConsumerName(String consumerName): Set the name of the consumer.
- void setCreateConsumerGroup(boolean createConsumerGroup): Create the Consumer Group if and only if it does not exist.
- void setExtractPayload(boolean extractPayload): Configure whether this channel adapter extracts the value from the Record.
- void setObjectMapper(HashMapper<?, ?, ?> hashMapper): Configure a hash mapper.
- void setOnErrorResume(Function<? super Throwable, ? extends org.reactivestreams.Publisher<Void>> resumeFunction): Configure a resume Function to resume the main sequence when polling the stream fails.
- void setPollTimeout(Duration pollTimeout): Configure a poll timeout for the BLOCK option during reading.
- void setReadOffset(ReadOffset readOffset): Define the offset from which we want to read messages.
- void setSerializer(RedisSerializationContext.SerializationPair<?> pair): Configure a key, hash key and hash value serializer.
- void setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions): Set ReactiveStreamOperations used to customize the StreamReceiver.
- void setTargetType(Class<?> targetType): Configure a hash target type.
Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, doStop, getErrorChannel, getErrorMessageAttributes, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
destroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
destroy, getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedType
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
ReactiveRedisStreamMessageProducer
public ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory, String streamKey)
-
-
Method Details
-
setReadOffset
public void setReadOffset(ReadOffset readOffset)
Define the offset from which we want to read messages. By default ReadOffset.latest() is used. ReadOffset.latest() is equal to '$', which is the Id used with XREAD to get new data added to the stream. Note that when switching to the Consumer Group feature, we set it to ReadOffset.lastConsumed() if it is still equal to ReadOffset.latest().
- Parameters:
readOffset - the desired offset
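The defaulting rule above can be modelled with plain strings. This is a minimal sketch, not the adapter's actual implementation: the class and method names are hypothetical, and it assumes that ReadOffset.lastConsumed() corresponds to the Redis special Id '>' used with XREADGROUP.

```java
final class ReadOffsetDefaulting {

    // '$' is the Id used with XREAD to get only new data added to the stream.
    static final String LATEST = "$";
    // '>' is the Id used with XREADGROUP for entries not yet delivered to the group (assumption).
    static final String LAST_CONSUMED = ">";

    private ReadOffsetDefaulting() {
    }

    // Returns the offset that would be used for reading, given the configured
    // offset and whether the Consumer Group feature is enabled.
    static String effectiveOffset(String configuredOffset, boolean consumerGroupMode) {
        if (consumerGroupMode && LATEST.equals(configuredOffset)) {
            // Still at the default: switch to "last consumed" for the group.
            return LAST_CONSUMED;
        }
        // An explicitly chosen offset, or standalone mode, is left untouched.
        return configuredOffset;
    }

    public static void main(String[] args) {
        System.out.println(effectiveOffset("$", false));
        System.out.println(effectiveOffset("$", true));
        System.out.println(effectiveOffset("0-0", true));
    }
}
```

An explicitly configured offset such as "0-0" is never replaced; only the untouched default is adjusted when the adapter runs in Consumer Group mode.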
-
setExtractPayload
public void setExtractPayload(boolean extractPayload)
Configure whether this channel adapter extracts the value from the Record.
- Parameters:
extractPayload - default true
-
setAutoAck
public void setAutoAck(boolean autoAck)
Set whether to acknowledge messages read in the Consumer Group. true by default.
- Parameters:
autoAck - the acknowledge option.
-
setConsumerGroup
public void setConsumerGroup(String consumerGroup)
Set the name of the Consumer Group. It is possible to create that Consumer Group if desired, see createConsumerGroup. If not set, the defined bean name IntegrationObjectSupport.getBeanName() is used.
- Parameters:
consumerGroup - the Consumer Group on which this adapter should register to listen for messages.
-
setConsumerName
public void setConsumerName(String consumerName)
Set the name of the consumer. When a consumer name is provided, this adapter is switched to the Consumer Group feature. Note that this value should be unique in the group.
- Parameters:
consumerName - the consumer name in the Consumer Group
-
setCreateConsumerGroup
public void setCreateConsumerGroup(boolean createConsumerGroup)
Create the Consumer Group if and only if it does not exist. During the creation we also create the stream, see MKSTREAM.
- Parameters:
createConsumerGroup - specify if we should create the Consumer Group, false by default
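The Consumer Group setters described above might be combined as in the following sketch. The stream key, group name, consumer name, and the "redisStreamChannel" bean are hypothetical values chosen for illustration; only the setter names come from this page.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer;
import org.springframework.messaging.MessageChannel;

@Configuration
public class RedisStreamConsumerGroupConfig {

    @Bean
    public ReactiveRedisStreamMessageProducer groupStreamProducer(
            ReactiveRedisConnectionFactory connectionFactory,
            @Qualifier("redisStreamChannel") MessageChannel outputChannel) { // assumed existing bean
        ReactiveRedisStreamMessageProducer producer =
                new ReactiveRedisStreamMessageProducer(connectionFactory, "orders-stream"); // assumed stream key
        // Without this call the bean name would be used as the group name.
        producer.setConsumerGroup("order-processors");
        // Setting a consumer name switches the adapter to XREADGROUP; it should be unique in the group.
        producer.setConsumerName("worker-1");
        // Create the group (and the stream) if it does not exist yet; false by default.
        producer.setCreateConsumerGroup(true);
        // true is already the default; shown only to make the choice explicit.
        producer.setAutoAck(true);
        producer.setOutputChannel(outputChannel);
        return producer;
    }
}
```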
-
setStreamReceiverOptions
public void setStreamReceiverOptions(@Nullable StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions)
Set ReactiveStreamOperations used to customize the StreamReceiver. It provides a way to set the polling timeout and the serialization context. By default the polling timeout is set to infinite and StringRedisSerializer is used. Mutually exclusive with 'pollTimeout', 'batchSize', 'onErrorResume', 'serializer', 'targetType', 'objectMapper'.
- Parameters:
streamReceiverOptions - the desired receiver options
-
setPollTimeout
public void setPollTimeout(Duration pollTimeout)
Configure a poll timeout for the BLOCK option during reading. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
pollTimeout - the timeout for polling.
- Since:
- 5.5
-
setBatchSize
public void setBatchSize(int recordsPerPoll)
Configure a batch size for the COUNT option during reading. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
recordsPerPoll - must be greater than zero.
- Since:
- 5.5
-
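Because setPollTimeout and setBatchSize are mutually exclusive with setStreamReceiverOptions, a configuration would pick one of two routes. The sketch below shows both side by side; the class name, stream key, and the chosen timeout and batch values are hypothetical, and an output channel would still have to be set before use.

```java
import java.time.Duration;

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamReceiver;
import org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer;

public final class ReceiverTuningSketch {

    private ReceiverTuningSketch() {
    }

    // Route 1: tune BLOCK and COUNT through the individual setters.
    static ReactiveRedisStreamMessageProducer withIndividualSetters(ReactiveRedisConnectionFactory cf) {
        ReactiveRedisStreamMessageProducer producer =
                new ReactiveRedisStreamMessageProducer(cf, "orders-stream"); // assumed stream key
        producer.setPollTimeout(Duration.ofSeconds(2));
        producer.setBatchSize(50); // must be greater than zero
        return producer;
    }

    // Route 2: build the receiver options up front and hand them over whole.
    // The individual setters must not be called on this producer.
    static ReactiveRedisStreamMessageProducer withReceiverOptions(ReactiveRedisConnectionFactory cf) {
        StreamReceiver.StreamReceiverOptions<String, MapRecord<String, String, String>> options =
                StreamReceiver.StreamReceiverOptions.builder()
                        .pollTimeout(Duration.ofSeconds(2))
                        .batchSize(50)
                        .build();
        ReactiveRedisStreamMessageProducer producer =
                new ReactiveRedisStreamMessageProducer(cf, "orders-stream"); // assumed stream key
        producer.setStreamReceiverOptions(options);
        return producer;
    }
}
```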
setOnErrorResume
public void setOnErrorResume(Function<? super Throwable, ? extends org.reactivestreams.Publisher<Void>> resumeFunction)
Configure a resume Function to resume the main sequence when polling the stream fails. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions). By default this function extracts the failed Record and sends an ErrorMessage to the provided MessageProducerSupport.setErrorChannel(org.springframework.messaging.MessageChannel). The failed message for this record may have an IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header when manual acknowledgment is configured for this message producer.
- Parameters:
resumeFunction - must not be null.
- Since:
- 5.5
-
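A custom resume function for setOnErrorResume could look like the sketch below. It is an assumption-laden illustration: the class and method names are hypothetical, and logging to System.err followed by an empty Mono is just one possible policy, used here in place of the default behavior of sending an ErrorMessage to the error channel.

```java
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer;
import reactor.core.publisher.Mono;

public final class ResumeFunctionSketch {

    private ResumeFunctionSketch() {
    }

    // Logs the failure and completes empty so that polling can continue.
    static Function<Throwable, Publisher<Void>> logAndContinue() {
        return ex -> {
            System.err.println("Redis stream polling failed: " + ex);
            return Mono.empty();
        };
    }

    // Applies the function to an already constructed producer (hypothetical helper).
    // Must not be combined with setStreamReceiverOptions on the same producer.
    static void apply(ReactiveRedisStreamMessageProducer producer) {
        producer.setOnErrorResume(logAndContinue());
    }
}
```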
setSerializer
public void setSerializer(RedisSerializationContext.SerializationPair<?> pair)
Configure a key, hash key and hash value serializer. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
pair - must not be null.
- Since:
- 5.5
-
setTargetType
public void setTargetType(Class<?> targetType)
Configure a hash target type. Changes the emitted Record type to ObjectRecord. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
targetType - must not be null.
- Since:
- 5.5
-
setObjectMapper
public void setObjectMapper(HashMapper<?, ?, ?> hashMapper)
Configure a hash mapper. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
hashMapper - must not be null.
- Since:
- 5.5
-
getComponentType
public String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType in interface NamedComponent
- Overrides:
getComponentType in class IntegrationObjectSupport
-
onInit
protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
- Overrides:
onInit in class MessageProducerSupport
-
doStart
protected void doStart()
Description copied from class: MessageProducerSupport
Take no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.
- Overrides:
doStart in class MessageProducerSupport
-