Class ZeroMqChannel
- All Implemented Interfaces:
- Aware,- BeanFactoryAware,- BeanNameAware,- DisposableBean,- InitializingBean,- ApplicationContextAware,- ExpressionCapable,- IntegrationPattern,- NamedComponent,- IntegrationManagement,- TrackableComponent,- MessageChannel,- SubscribableChannel,- InterceptableChannel
public class ZeroMqChannel extends AbstractMessageChannel implements SubscribableChannel
SubscribableChannel implementation over ZeroMQ sockets.
 It can work in two messaging models:
 - push-pull, where sent messages are distributed to subscribers in a round-robin manner
 according a respective ZeroMQ SocketType.PUSH and SocketType.PULL socket types logic;
 - pub-sub, where sent messages are distributed to all subscribers.
 
 This message channel can work in local mode, when a pair of ZeroMQ sockets of SocketType.PAIR type
 are connected between publisher (send operation) and subscriber using inter-thread transport binding.
 
 In distributed mode this channel has to be connected to an externally managed ZeroMQ proxy.
 The setConnectUrl(String) has to be as a standard ZeroMQ connect string, but with an extra port
 over the colon - representing a frontend and backend sockets pair on ZeroMQ proxy.
 For example: tcp://localhost:6001:6002.
 Another option is to provide a reference to the ZeroMqProxy instance managed in the same application:
 frontend and backend ports are evaluated from this proxy and the respective connection string is built from them.
 
This way sending and receiving operations on this channel are similar to interaction over a messaging broker.
 An internal logic of this message channel implementation is based on the project Reactor using its
 Mono, Flux and Scheduler API for better thread model and flow control to avoid
 concurrency primitives for multi-publisher(subscriber) communication within the same application.
- Since:
- 5.4
- Author:
- Artem Bilan
- 
Nested Class SummaryNested classes/interfaces inherited from class org.springframework.integration.channel.AbstractMessageChannelAbstractMessageChannel.ChannelInterceptorListNested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagementIntegrationManagement.ManagementOverrides
- 
Field SummaryFields Modifier and Type Field Description static DurationDEFAULT_CONSUME_DELAYFields inherited from class org.springframework.integration.channel.AbstractMessageChannelinterceptors, metersFields inherited from class org.springframework.integration.context.IntegrationObjectSupportEXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagementMETER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
- 
Constructor SummaryConstructors Constructor Description ZeroMqChannel(org.zeromq.ZContext context)Create a channel instance based on the providedZContextwith push/pull communication model.ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)Create a channel instance based on the providedZContextand provided communication model.
- 
Method SummaryModifier and Type Method Description voiddestroy()protected booleandoSend(Message<?> message, long timeout)Subclasses must implement this method.protected voidonInit()Subclasses may implement this for initialization logic.voidsetConnectUrl(String connectUrl)Configure a connection to the ZeroMQ proxy with the pair of ports over colon for proxy frontend and backend sockets.voidsetConsumeDelay(Duration consumeDelay)Specify aDurationto delay consumption when no data received.voidsetMessageMapper(BytesMessageMapper messageMapper)Provide aBytesMessageMapperto convert to/from messages when send or receive happens on the sockets.voidsetSendSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer)TheConsumercallback to configure a publishing socket.voidsetSubscribeSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)TheConsumercallback to configure a consuming socket.voidsetZeroMqProxy(ZeroMqProxy zeroMqProxy)Specify a reference to aZeroMqProxyinstance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started.booleansubscribe(MessageHandler handler)booleanunsubscribe(MessageHandler handler)Methods inherited from class org.springframework.integration.channel.AbstractMessageChanneladdInterceptor, addInterceptor, getComponentType, getFullChannelName, getIChannelInterceptorList, getIntegrationPatternType, getInterceptors, getMetricsCaptor, getOverrides, isLoggingEnabled, registerMetricsCaptor, removeInterceptor, removeInterceptor, send, send, setDatatypes, setInterceptors, setLoggingEnabled, setMessageConverter, setShouldTrackMethods inherited from class org.springframework.integration.context.IntegrationObjectSupportafterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringMethods inherited from class java.lang.Objectclone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagementgetManagedName, getManagedType, getThisAs, setManagedName, setManagedTypeMethods inherited from interface org.springframework.integration.support.context.NamedComponentgetBeanName, getComponentName
- 
Field Details
- 
Constructor Details- 
ZeroMqChannelpublic ZeroMqChannel(org.zeromq.ZContext context)Create a channel instance based on the providedZContextwith push/pull communication model.- Parameters:
- context- the- ZContextto use.
 
- 
ZeroMqChannelpublic ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)Create a channel instance based on the providedZContextand provided communication model.- Parameters:
- context- the- ZContextto use.
- pubSub- the communication model: push/pull or pub/sub.
 
 
- 
- 
Method Details- 
setConnectUrlConfigure a connection to the ZeroMQ proxy with the pair of ports over colon for proxy frontend and backend sockets. Mutually exclusive with thesetZeroMqProxy(ZeroMqProxy).- Parameters:
- connectUrl- the connection string in format- PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT, e.g.- tcp://localhost:6001:6002
 
- 
setZeroMqProxySpecify a reference to aZeroMqProxyinstance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started. Mutually exclusive with thesetConnectUrl(String).- Parameters:
- zeroMqProxy- the- ZeroMqProxyinstance to use
 
- 
setConsumeDelaySpecify aDurationto delay consumption when no data received.- Parameters:
- consumeDelay- the- Durationto delay consumption when empty; defaults to- DEFAULT_CONSUME_DELAY.
 
- 
setMessageMapperProvide aBytesMessageMapperto convert to/from messages when send or receive happens on the sockets.- Parameters:
- messageMapper- the- BytesMessageMapperto use; defaults to- EmbeddedJsonHeadersMessageMapper.
 
- 
setSendSocketConfigurerTheConsumercallback to configure a publishing socket. The send socket is connected to the frontend socket of ZeroMQ proxy (if any).- Parameters:
- sendSocketConfigurer- the- Consumerto use.
 
- 
setSubscribeSocketConfigurerpublic void setSubscribeSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)TheConsumercallback to configure a consuming socket. The subscribe socket is connected to the backend socket of ZeroMQ proxy (if any).- Parameters:
- subscribeSocketConfigurer- the- Consumerto use.
 
- 
onInitprotected void onInit()Description copied from class:IntegrationObjectSupportSubclasses may implement this for initialization logic.- Overrides:
- onInitin class- AbstractMessageChannel
 
- 
doSendDescription copied from class:AbstractMessageChannelSubclasses must implement this method. A non-negative timeout indicates how long to wait if the channel is at capacity (if the value is 0, it must return immediately with or without success). A negative timeout value indicates that the method should block until either the message is accepted or the blocking thread is interrupted.- Specified by:
- doSendin class- AbstractMessageChannel
- Parameters:
- message- The message.
- timeout- The timeout.
- Returns:
- true if the send was successful.
 
- 
subscribe- Specified by:
- subscribein interface- SubscribableChannel
 
- 
unsubscribe- Specified by:
- unsubscribein interface- SubscribableChannel
 
- 
destroypublic void destroy()- Specified by:
- destroyin interface- DisposableBean
- Specified by:
- destroyin interface- IntegrationManagement
- Overrides:
- destroyin class- AbstractMessageChannel
 
 
-