背景:
接上篇 https://www.jianshu.com/p/c7d390efba29 中使用双向流 解决了grpc传输最大上限问题,避免了因为传输上限导致的错误,但是很显然没有将grpcServer的返回值传回页面,这篇文章我们来解决这个问题。
查看官方demo http://doc.oschina.net/grpc?t=60134页尾,找到
public void routeChat() throws Exception { info("*** RoutChat"); final SettableFuture<Void> finishFuture = SettableFuture.create(); StreamObserver<RouteNote> requestObserver = asyncStub.routeChat(new StreamObserver<RouteNote>() { @Override public void onNext(RouteNote note) { info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation() .getLatitude(), note.getLocation().getLongitude()); } @Override public void onError(Throwable t) { finishFuture.setException(t); } @Override public void onCompleted() { finishFuture.set(null); } }); try { RouteNote[] requests = {newNote("First message", 0, 0), newNote("Second message", 0, 1), newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)}; for (RouteNote request : requests) { info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation() .getLatitude(), request.getLocation().getLongitude()); requestObserver.onNext(request); } requestObserver.onCompleted(); finishFuture.get(); info("Finished RouteChat"); } catch (Exception t) { requestObserver.onError(t); logger.log(Level.WARNING, "RouteChat Failed", t); throw t; } }
当然官方也只是解决了如何判定传输完了,那么我们改造一下
Code
GrpcServer
@GrpcService(HelloServiceGrpc.class)public class HelloService extends HelloServiceGrpc.HelloServiceImplBase { private static final Logger logger = LoggerFactory.getLogger(HelloService.class); @Override public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloResponse> responseObserver) { return new StreamObserver<HelloRequest>() { @Override public void onNext(HelloRequest value) { String name = value.getName().toStringUtf8(); logger.info("received name :" + name); } @Override public void onError(Throwable t) { logger.warn("Encountered error in recordRoute", t); } @Override public void onCompleted() { responseObserver.onNext(HelloResponse.newBuilder().setMessage("welcome to gRPC").build()); //此处为了说明问题,返回两次Response responseObserver.onNext(HelloResponse.newBuilder().setMessage(" second message ").build()); responseObserver.onCompleted(); } }; }}
客户端Controller
@RestControllerpublic class HelloController { @Autowired private HelloService service; @GetMapping("/hello") public String sayHello(String name) { return service.sendMessage(name); }}
客户端GrpcConfig
@Componentpublic class GrpcConfig { @GrpcClient(value = "local-grpc-server") private Channel channel; @Bean("helloServiceStub") public HelloServiceGrpc.HelloServiceStub getHelloServiceStub() { return HelloServiceGrpc.newStub(channel); }}
客户端HelloService
@Service(value = "helloService")public class HelloService{ @Autowired private HelloServiceGrpc.HelloServiceStub helloServiceStub; public String sendMessage(String name) { StringBuffer stringBuffer = new StringBuffer(); final SettableFuture<Void> finalFuture =SettableFuture.create(); StreamObserver<HelloRequest> helloRequestStreamObservers = helloServiceStub.sayHello( new StreamObserver<HelloResponse>() { @Override public void onNext(HelloResponse value) { System.out.println("onNext : " + value.getMessage()); //将返回的内容全部追加到一个字符串中 stringBuffer.append(value.getMessage()); } @Override public void onError(Throwable t) { finalFuture.setException(t); } @Override public void onCompleted() { finalFuture.set(null); } }); for (int i = 0; i < 10; i++) { helloRequestStreamObservers.onNext( HelloRequest.newBuilder() .setName(ByteString.copyFrom(name,Charset.forName("UTF-8"))) .build() ); } helloRequestStreamObservers.onCompleted(); try { finalFuture.get(); while(finalFuture.isDone()){ System.out.println("is done"); return stringBuffer.toString(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; }}
关于com.google.common.util.concurrent.SettableFuture源码,详细看这个帖子:https://blog.csdn.net/lb7758zx/article/details/74322074

接口请求结果

控制台执行顺序
第二种方式:通过HttpServletResponse直接写出
代码:
@RestControllerpublic class HelloController { @Autowired private HelloService service; @GetMapping("/hello") public void sayHello(String name, HttpServletResponse response) { service.sendMessage(name, response); }}
@Service(value = "helloService")public class HelloService{ @Autowired private HelloServiceGrpc.HelloServiceStub helloServiceStub; public void sendMessage(String name, HttpServletResponse response) { try { final PrintWriter printWriter = response.getWriter(); final SettableFuture<Void> finalFuture =SettableFuture.create(); StreamObserver<HelloRequest> helloRequestStreamObservers = helloServiceStub.sayHello( new StreamObserver<HelloResponse>() { @Override public void onNext(HelloResponse value) { System.out.println("onNext : " + value.getMessage()); if(null != printWriter){ printWriter.write(value.getMessage()); } } @Override public void onError(Throwable t) { finalFuture.setException(t); } @Override public void onCompleted() { finalFuture.set(null); } }); for (int i = 0; i < 10; i++) { helloRequestStreamObservers.onNext( HelloRequest.newBuilder() .setName(ByteString.copyFrom(name,Charset.forName("UTF-8"))) .build() ); } helloRequestStreamObservers.onCompleted(); //阻塞,等待通知 finalFuture.get(); if(finalFuture.isDone()){ printWriter.close(); } } catch (InterruptedException | ExecutionException | IOException e) { e.printStackTrace(); } }}
效果和上面基本一致,大家根据需要,自行参考,欢迎多多交流。原创帖,转载注明出处!
文章转载于:https://www.jianshu.com/p/af75feac5783
原著是一个有趣的人,若有侵权,请通知删除
还没有人抢沙发呢~