Class IntegrationReactiveUtils
java.lang.Object
org.springframework.integration.util.IntegrationReactiveUtils
Utilities for adapting integration components to/from reactive types.
- Since:
- 5.3
- Author:
- Artem Bilan
Field Summary
- static final Duration DEFAULT_DELAY_WHEN_EMPTY: A default delay before repeating an empty source Mono, as a 1-second Duration.
- static final String DELAY_WHEN_EMPTY_KEY: The subscriber context entry for Flux.delayElements(java.time.Duration) from the Mono.repeatWhenEmpty(java.util.function.Function).
- static final boolean isContextPropagationPresent: The indicator that the io.micrometer:context-propagation library is on the classpath.
Method Summary
- static reactor.util.context.ContextView captureReactorContext(): Capture a Reactor ContextView from the current thread local state according to the ContextSnapshotFactory logic.
- static <T> reactor.core.publisher.Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel): Adapt a provided MessageChannel into a Flux source. A FluxMessageChannel is returned as is because it is already a Publisher; a SubscribableChannel is subscribed with a MessageHandler that emits each message via Sinks.Many.tryEmitNext(Object) into the sink whose Flux is returned from this method; a PollableChannel is wrapped into a MessageSource lambda and reuses messageSourceToFlux(MessageSource).
- static <T> reactor.core.publisher.Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource): Wrap a provided MessageSource into a Flux for pulling messages on demand.
- static @Nullable AutoCloseable setThreadLocalsFromReactorContext(reactor.util.context.ContextView context): Populate thread local variables from the provided Reactor ContextView according to the ContextSnapshotFactory logic.
Field Details
DELAY_WHEN_EMPTY_KEY
The subscriber context entry for Flux.delayElements(java.time.Duration) from the Mono.repeatWhenEmpty(java.util.function.Function).
DEFAULT_DELAY_WHEN_EMPTY
A default delay before repeating an empty source Mono, as a 1-second Duration.
isContextPropagationPresent
public static final boolean isContextPropagationPresent
The indicator that the io.micrometer:context-propagation library is on the classpath.
- Since:
- 6.2.5
 
 
Method Details
captureReactorContext
public static reactor.util.context.ContextView captureReactorContext()
Capture a Reactor ContextView from the current thread local state according to the ContextSnapshotFactory logic. If the io.micrometer:context-propagation library is not on the classpath, Context.empty() is returned.
- Returns:
- the Reactor ContextView from the current thread local state, or Context.empty().
- Since:
- 6.2.5
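A minimal usage sketch, assuming Spring Integration 6.2.5 or later and Reactor are on the classpath. The names CaptureContextSketch and withCallerContext are hypothetical and not part of the library. It captures the caller thread's state when the pipeline is assembled and writes the result into the subscriber context of a Flux, so the captured entries remain visible to upstream operators.

```java
import java.util.List;

import org.springframework.integration.util.IntegrationReactiveUtils;

import reactor.core.publisher.Flux;
import reactor.util.context.ContextView;

public class CaptureContextSketch {

    // Hypothetical helper: snapshot the current thread's state and attach it to the Flux.
    public static <T> Flux<T> withCallerContext(Flux<T> flux) {
        // Returns Context.empty() when io.micrometer:context-propagation is absent.
        ContextView captured = IntegrationReactiveUtils.captureReactorContext();
        return flux.contextWrite(captured);
    }

    public static void main(String[] args) {
        List<Integer> items = withCallerContext(Flux.just(1, 2, 3)).collectList().block();
        System.out.println(items);
    }
}
```

The data flowing through the Flux is unchanged; only the subscriber context differs, and only if thread local accessors are registered with the context-propagation library.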
 
setThreadLocalsFromReactorContext
public static @Nullable AutoCloseable setThreadLocalsFromReactorContext(reactor.util.context.ContextView context)
Populate thread local variables from the provided Reactor ContextView according to the ContextSnapshotFactory logic.
- Parameters:
- context - the Reactor ContextView to populate from.
- Returns:
- the ContextSnapshot.Scope as an AutoCloseable, so that the target classpath is not polluted; it can be cast if necessary. Returns null if the io.micrometer:context-propagation library is not on the classpath.
- Since:
- 6.2.5
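Because the return value may be null, callers need a null-tolerant way to close the scope. The sketch below, assuming Spring Integration 6.2.5 or later and Reactor on the classpath, relies on the Java rule that try-with-resources skips close() for a null resource. RestoreContextSketch, runWithContext and demo are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.integration.util.IntegrationReactiveUtils;

import reactor.util.context.Context;
import reactor.util.context.ContextView;

public class RestoreContextSketch {

    // Hypothetical helper: run the work with thread locals populated from the context.
    public static void runWithContext(ContextView context, Runnable work) {
        // The resource may be null (no context-propagation library); close() is then skipped.
        try (AutoCloseable scope = IntegrationReactiveUtils.setThreadLocalsFromReactorContext(context)) {
            work.run();
        }
        catch (RuntimeException ex) {
            // Failures from the work itself are propagated unchanged.
            throw ex;
        }
        catch (Exception ex) {
            // AutoCloseable.close() declares a checked Exception.
            throw new IllegalStateException("Failed to close the context scope", ex);
        }
    }

    public static boolean demo() {
        AtomicBoolean ran = new AtomicBoolean();
        runWithContext(Context.empty(), () -> ran.set(true));
        return ran.get();
    }
}
```

The scope is closed on the same thread that opened it, which is what restores the previous thread local values.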
 
messageSourceToFlux
public static <T> reactor.core.publisher.Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource)
Wrap a provided MessageSource into a Flux for pulling messages on demand. When MessageSource.receive() returns null, the source Mono goes to the Mono.repeatWhenEmpty(java.util.function.Function<reactor.core.publisher.Flux<java.lang.Long>, ? extends org.reactivestreams.Publisher<?>>) state and performs a delay based on the DELAY_WHEN_EMPTY_KEY Duration entry in the subscriber context, or falls back to a 1-second duration. If a produced message has an IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header, it is ack'ed in the Mono.doOnSuccess(java.util.function.Consumer<? super T>) and nack'ed in the Mono.doOnError(java.util.function.Consumer<? super java.lang.Throwable>).
- Type Parameters:
- T - the expected payload type.
- Parameters:
- messageSource - the MessageSource to adapt.
- Returns:
- a Flux which pulls messages from the MessageSource on demand.
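The following sketch shows one way to override the empty-source delay through the subscriber context. It assumes Spring Integration and Reactor are on the classpath; the class MessageSourceToFluxSketch, the method pullTwo and the in-memory backlog queue are hypothetical stand-ins for a real source.

```java
import java.time.Duration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.springframework.integration.core.MessageSource;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import reactor.core.publisher.Flux;
import reactor.util.context.Context;

public class MessageSourceToFluxSketch {

    public static List<String> pullTwo() {
        // Hypothetical in-memory source: returns null once the backlog is drained.
        Queue<String> backlog = new ConcurrentLinkedQueue<>(List.of("one", "two"));
        MessageSource<String> source = () -> {
            String next = backlog.poll();
            if (next == null) {
                return null;
            }
            return new GenericMessage<>(next);
        };

        Flux<Message<String>> flux = IntegrationReactiveUtils.messageSourceToFlux(source)
                // Retry an empty receive() after 100 ms instead of the 1-second default.
                .contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY,
                        Duration.ofMillis(100)));

        // Take two messages, then cancel; block with a timeout as a safety net.
        return flux.map(Message::getPayload)
                .take(2)
                .collectList()
                .block(Duration.ofSeconds(5));
    }
}
```

The contextWrite call has to sit downstream of the returned Flux, since the subscriber context propagates from the subscriber up to the source.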
 
messageChannelToFlux
public static <T> reactor.core.publisher.Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel)
Adapt a provided MessageChannel into a Flux source:
- a FluxMessageChannel is returned as is because it is already a Publisher;
- a SubscribableChannel is subscribed with a MessageHandler that emits each message via Sinks.Many.tryEmitNext(Object) into the sink whose Flux is returned from this method;
- a PollableChannel is wrapped into a MessageSource lambda and reuses messageSourceToFlux(MessageSource).
- Type Parameters:
- T - the expected payload type.
- Parameters:
- messageChannel - the MessageChannel to adapt.
- Returns:
- a Flux which uses a provided MessageChannel as a source for events to publish.
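A brief sketch of the PollableChannel case, assuming Spring Integration and Reactor are on the classpath. It uses a standalone QueueChannel outside of any application context; MessageChannelToFluxSketch and drainTwo are hypothetical names.

```java
import java.time.Duration;
import java.util.List;

import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import reactor.core.publisher.Flux;

public class MessageChannelToFluxSketch {

    public static List<String> drainTwo() {
        // QueueChannel is a PollableChannel, so it is polled through messageSourceToFlux.
        QueueChannel channel = new QueueChannel();
        channel.send(new GenericMessage<>("alpha"));
        channel.send(new GenericMessage<>("beta"));

        Flux<Message<String>> flux = IntegrationReactiveUtils.messageChannelToFlux(channel);

        // Messages come out in the order they were queued.
        return flux.map(Message::getPayload)
                .take(2)
                .collectList()
                .block(Duration.ofSeconds(5));
    }
}
```

The type parameter T is not checked against the actual payloads, so the caller is responsible for choosing a type that matches what the channel carries.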
 
 