This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 7.0.2!
gRPC Support
Starting with version 7.1, Spring Integration provides inbound and outbound gateways to communicate via the gRPC protocol.
This dependency is required for the project:
Maven:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-grpc</artifactId>
    <version>7.1.0-M1</version>
</dependency>
Gradle:
implementation "org.springframework.integration:spring-integration-grpc:7.1.0-M1"
Spring Integration components for gRPC are not generated from Protocol Buffers, and they are not type-safe in the way that typical gRPC service and stub implementations are.
This is mostly due to the generic nature of the Spring Integration framework itself: the unit of work is a Message abstraction, and the payload type of that message is usually outside the scope of an integration component's internal logic.
Therefore, gRPC messages for service calls are sent and received as is, with no conversion assumptions.
For example, consider a gRPC service declared like this:
service TestHelloWorld {
    // Sends a greeting
    rpc SayHello(HelloRequest) returns (HelloReply) {}
    // Sends a greeting and something else
    rpc StreamSayHello(HelloRequest) returns (stream HelloReply) {}
    // Sends a greeting to everyone present
    rpc HelloToEveryOne(stream HelloRequest) returns (HelloReply) {}
    // Streams requests and replies
    rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {}
}
Here, a HelloRequest is the request message payload produced by the inbound gateway (server side), and it is also what the outbound gateway (client side) expects as its request payload.
Likewise, a HelloReply has to be the reply message payload on the inbound gateway, and it is what the outbound gateway receives as a reply.
The GrpcHeaders class contains convenient constants for header names used (and populated) in messages before and after gRPC gateways.
For example, the GrpcHeaders.METHOD_TYPE header contains an io.grpc.MethodDescriptor.MethodType enum value on the server side (inbound gateway) for easier downstream routing.
Another useful header is GrpcHeaders.SERVICE_METHOD, which indicates which gRPC service method was called on the server, or which gRPC service method to call from the client stub.
On the inbound gateway, the value of the GrpcHeaders.SERVICE_METHOD header is the gRPC service method name exactly as it is declared in the Protobuf definition (see the .proto example above) and as it is stored in the io.grpc.MethodDescriptor of the service definition.
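To make the header value concrete: gRPC identifies a method by a full name of the form `<service>/<method>`, while the routing examples in this chapter use only the method part, such as SayHello. The following plain-Java sketch mimics that extraction without any gRPC dependency. The class `ServiceMethodNames`, its method, and the `example` package prefix are hypothetical and are not part of Spring Integration or gRPC.

```java
final class ServiceMethodNames {

    private ServiceMethodNames() {
    }

    // Returns the part after the last '/' of a full gRPC method name.
    // Hypothetical helper for illustration only.
    static String bareMethodName(String fullMethodName) {
        int slash = fullMethodName.lastIndexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Not a full gRPC method name: " + fullMethodName);
        }
        return fullMethodName.substring(slash + 1);
    }

    public static void main(String[] args) {
        // "example" is an assumed proto package name
        System.out.println(bareMethodName("example.TestHelloWorld/SayHello"));
    }
}
```

The resulting string is the kind of value a downstream router can match on.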
Inbound Gateway for gRPC
The GrpcInboundGateway is a MessagingGatewaySupport implementation to receive gRPC requests, send messages to the downstream flow, and produce gRPC responses.
For initialization, an instance of this gateway requires only an abstract gRPC service class implementing BindableService. Such a class is usually generated from Protobuf and has an *ImplBase class name.
Only standard gRPC services are supported: the GrpcInboundGateway logic is based on the generated AsyncService contract.
Reactor-based and Kotlin-based generated services are not applicable in Spring Integration, since those types are not exposed by the gateway definition.
The gateway uses the mentioned AsyncService interface to create a proxy and intercept gRPC service methods.
The following example demonstrates how to configure a GrpcInboundGateway:
@Bean
GrpcInboundGateway helloWorldService() {
    return new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class);
}
The GrpcInboundGateway implements a BindableService and exposes a ServerServiceDefinition based on the mentioned proxy for an AsyncService contract of the gRPC service.
Therefore, an instance of this gateway has to be registered with a ServerBuilder, and there is no need for any other *ImplBase implementations in the application.
With Spring gRPC and its auto-discovery of BindableService implementations, the GrpcInboundGateway has to be declared as a top-level bean.
For this reason, a Java DSL definition like IntegrationFlow.from(new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class)) is not recommended: such an inline BindableService implementation is not visible to the Spring gRPC infrastructure.
The GrpcInboundGateway uses a sendAndReceiveMessageReactive() API to interact with the downstream flow and adapts a Mono reply to the gRPC StreamObserver.
As mentioned before, the request message payload is exactly a gRPC request message, and it expects a reply in the form of a gRPC response message.
The downstream logic can be type-safe and deal with gRPC messages much as if the *ImplBase class were implemented manually.
MethodDescriptor.MethodType.UNARY and MethodDescriptor.MethodType.BIDI_STREAMING are the same from the perspective of the downstream handling logic.
In other words, BIDI_STREAMING is handled as a loop over the request items, and the gateway immediately emits a response item for each of them into the response StreamObserver.
For different BIDI_STREAMING logic, a regular gRPC service implementation is recommended.
The MethodDescriptor.MethodType.CLIENT_STREAMING mode produces a message with a Flux as a payload of gRPC request items.
For the MethodDescriptor.MethodType.SERVER_STREAMING mode, a reply payload can be a single gRPC response message or a Flux of them.
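The difference between the per-item handling (UNARY, BIDI_STREAMING) and the whole-stream handling (CLIENT_STREAMING) described above can be modelled in plain Java. This is only a conceptual sketch: `InboundDispatchSketch` and its methods are hypothetical, a List stands in for the stream of gRPC request items, and the real gateway works with Flux, Mono and StreamObserver instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class InboundDispatchSketch {

    private InboundDispatchSketch() {
    }

    // UNARY and BIDI_STREAMING: the downstream handler sees one request item
    // at a time and produces one response item for it.
    static <I, O> List<O> perItem(List<I> requests, Function<I, O> handler) {
        List<O> responses = new ArrayList<>();
        for (I request : requests) {
            responses.add(handler.apply(request));
        }
        return responses;
    }

    // CLIENT_STREAMING: the downstream handler sees all request items as one
    // payload and produces a single response.
    static <I, O> O wholeStream(List<I> requests, Function<List<I>, O> handler) {
        return handler.apply(List.copyOf(requests));
    }

    public static void main(String[] args) {
        List<String> names = List.of("Alice", "Bob");
        // prints [Hello Alice, Hello Bob]
        System.out.println(perItem(names, name -> "Hello " + name));
        // prints Hello Alice, Bob
        System.out.println(wholeStream(names, all -> "Hello " + String.join(", ", all)));
    }
}
```

In this model a unary call is simply the per-item case with a single request item.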
The following example demonstrates an IntegrationFlow implementation for the mentioned TestHelloWorldGrpc.TestHelloWorldImplBase service:
@Bean
IntegrationFlow grpcIntegrationFlow(GrpcInboundGateway helloWorldService) {
    return IntegrationFlow.from(helloWorldService)
            .route(Message.class, message ->
                            message.getHeaders().get(GrpcHeaders.SERVICE_METHOD, String.class),
                    router -> router
                            .subFlowMapping("SayHello", flow -> flow
                                    .transform(this::requestReply))
                            .subFlowMapping("StreamSayHello", flow -> flow
                                    .transform(this::streamReply))
                            .subFlowMapping("HelloToEveryOne", flow -> flow
                                    .transformWith(transformSpec -> transformSpec
                                            .transformer(this::streamRequest)
                                            .async(true)))
                            .subFlowMapping("BidiStreamHello", flow -> flow
                                    .transform(this::requestReply))
            )
            .get();
}

private HelloReply requestReply(HelloRequest helloRequest) {
    return newHelloReply("Hello " + helloRequest.getName());
}

private Flux<HelloReply> streamReply(HelloRequest helloRequest) {
    return Flux.just(
            newHelloReply("Hello " + helloRequest.getName()),
            newHelloReply("Hello again!"));
}

private Mono<HelloReply> streamRequest(Flux<HelloRequest> request) {
    return request
            .map(HelloRequest::getName)
            .collectList()
            .map(names -> StringUtils.collectionToDelimitedString(names, ", "))
            .map("Hello "::concat)
            .map(TestConfig::newHelloReply);
}

private static HelloReply newHelloReply(String message) {
    return HelloReply.newBuilder().setMessage(message).build();
}
The routing is done on the GrpcHeaders.SERVICE_METHOD header populated by the GrpcInboundGateway.
All the downstream transformer business methods are type-safe in regard to gRPC messages for the TestHelloWorldGrpc.TestHelloWorldImplBase service.
Outbound Gateway for gRPC
The GrpcOutboundGateway is an AbstractReplyProducingMessageHandler implementation that acts as a gRPC stub: it sends gRPC requests to a remote gRPC server and receives the responses.
For initialization, the instance of this gateway requires a gRPC Channel and the gRPC service class (e.g., TestHelloWorldGrpc.class).
The gateway dynamically invokes gRPC methods obtained from the service’s ServiceDescriptor.
It supports the following gRPC communication patterns:
- Unary: Single request → a Mono is returned if async is true; otherwise, the response object itself
- Server streaming: Single request → Flux of multiple responses
- Client streaming: Multiple requests → Mono with a single response
- Bidirectional streaming: Multiple requests → Flux of multiple responses
The GrpcOutboundGateway is asynchronous by default.
This can be turned off by calling setAsync(false) in the gateway configuration.
See Asynchronous Service Activator for more information.
Method Name Configuration
The method name to invoke can be configured in three ways, with a header-based default when none of them applies:
- Auto-detection for services with a single method:
    @Bean
    public GrpcOutboundGateway grpcOutboundGateway(ManagedChannel channel) {
        // When TestSingleMethodGrpc has only one method, it will be auto-detected
        return new GrpcOutboundGateway(channel, TestSingleMethodGrpc.class);
    }
- Explicit method name using setMethodName():
    @Bean
    public GrpcOutboundGateway grpcOutboundGateway(ManagedChannel channel) {
        GrpcOutboundGateway gateway = new GrpcOutboundGateway(channel, TestHelloWorldGrpc.class);
        gateway.setMethodName("SayHello");
        return gateway;
    }
- Dynamic resolution via setMethodNameExpression():
    @Bean
    public GrpcOutboundGateway dynamicMethodGateway(ManagedChannel channel) {
        GrpcOutboundGateway gateway = new GrpcOutboundGateway(channel, TestHelloWorldGrpc.class);
        gateway.setMethodNameExpression(new SpelExpressionParser().parseExpression("payload.class.simpleName"));
        return gateway;
    }
- Default method resolution: if neither a method name nor a method name expression is configured and the service offers multiple methods, the gateway looks for the GrpcHeaders.SERVICE_METHOD header in the input message to determine which method to invoke. If the GrpcHeaders.SERVICE_METHOD header is missing, an IllegalStateException is thrown.
    @Bean
    public GrpcOutboundGateway dynamicMethodGateway(ManagedChannel channel) {
        // Looks for GrpcHeaders.SERVICE_METHOD header in the input message
        return new GrpcOutboundGateway(channel, TestHelloWorldGrpc.class);
    }
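The resolution rules listed above can be summarized as a small decision procedure. The following plain-Java sketch is an illustration only, not the gateway's actual implementation: the class `MethodNameResolutionSketch`, the header key `serviceMethod`, and the precedence of an explicit name over an expression are assumptions made for this example, and a Function over the headers stands in for a SpEL expression evaluated against the message.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class MethodNameResolutionSketch {

    // Hypothetical header key for this sketch; real code uses the
    // GrpcHeaders.SERVICE_METHOD constant instead.
    static final String SERVICE_METHOD_HEADER = "serviceMethod";

    private MethodNameResolutionSketch() {
    }

    static String resolve(String configuredName,
            Function<Map<String, Object>, String> nameExpression,
            List<String> serviceMethods,
            Map<String, Object> headers) {

        if (configuredName != null) {
            return configuredName; // explicit method name
        }
        if (nameExpression != null) {
            return nameExpression.apply(headers); // dynamic resolution
        }
        if (serviceMethods.size() == 1) {
            return serviceMethods.get(0); // single-method auto-detection
        }
        Object fromHeader = headers.get(SERVICE_METHOD_HEADER); // header fallback
        if (fromHeader == null) {
            throw new IllegalStateException("No method name configured and no service method header present");
        }
        return fromHeader.toString();
    }

    public static void main(String[] args) {
        List<String> methods = List.of("SayHello", "StreamSayHello");
        System.out.println(resolve("SayHello", null, methods, Map.of()));
        System.out.println(resolve(null, null, methods, Map.of(SERVICE_METHOD_HEADER, "StreamSayHello")));
    }
}
```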
Request Payload Handling
The GrpcOutboundGateway automatically detects the method type from the MethodDescriptor and handles the invocation appropriately:
- Unary methods accept a single gRPC request message and return a Mono<ResponseType> in async mode (the default). If async is set to false, the response object is set as the reply message payload as is.
- Server streaming methods accept a single gRPC request message and return a Flux<ResponseType>.
- Client streaming and bidirectional streaming methods accept flexible input types:
    - Flux<RequestType>
    - Mono<RequestType>
    - Stream<RequestType>
    - Collection<RequestType>
    - RequestType[]
    - A single RequestType object
Client streaming methods return a Mono<ResponseType>, while bidirectional streaming methods return a Flux<ResponseType> containing the responses.
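Accepting several input shapes for streaming requests amounts to normalizing the payload into one sequence of request items before sending. The sketch below shows that idea with JDK types only. It is an assumption-laden illustration, not the gateway's code: `RequestPayloadSketch` and `toRequestStream` are hypothetical names, and the Flux and Mono cases are omitted to keep the example free of Reactor dependencies.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;

final class RequestPayloadSketch {

    private RequestPayloadSketch() {
    }

    // Normalizes a Stream, Collection, object array, or single object
    // into a Stream of request items.
    static Stream<?> toRequestStream(Object payload) {
        if (payload instanceof Stream) {
            return (Stream<?>) payload;
        }
        if (payload instanceof Collection) {
            return ((Collection<?>) payload).stream();
        }
        if (payload instanceof Object[]) {
            return Arrays.stream((Object[]) payload);
        }
        // Anything else is treated as a single request item
        return Stream.of(payload);
    }

    public static void main(String[] args) {
        toRequestStream(List.of("Alice", "Bob")).forEach(System.out::println);
        toRequestStream(new String[] {"Carol", "Dave"}).forEach(System.out::println);
        toRequestStream("Eve").forEach(System.out::println);
    }
}
```

A reactive implementation would perform the same normalization into a Flux, so that all input shapes share one sending path.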