
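Spring Boot + gRPC: exploring how to return streamed data

Background:

This follows on from the previous post, https://www.jianshu.com/p/c7d390efba29, which used bidirectional streaming to get around gRPC's maximum message size limit and avoid the errors that limit causes. That version clearly never passed the gRPC server's return value back to the page, and this post solves that problem.

Look at the official demo at http://doc.oschina.net/grpc?t=60134 and, near the bottom of the page, you will find: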

public void routeChat() throws Exception {
    info("*** RoutChat");
    final SettableFuture<Void> finishFuture = SettableFuture.create();
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            finishFuture.setException(t);
          }

          @Override
          public void onCompleted() {
            finishFuture.set(null);
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 1),
              newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
      requestObserver.onCompleted();

      finishFuture.get();
      info("Finished RouteChat");
    } catch (Exception t) {
      requestObserver.onError(t);
      logger.log(Level.WARNING, "RouteChat Failed", t);
      throw t;
    }
}

Of course, the official demo only shows how to tell when the transfer has finished and does nothing with the responses beyond logging them, so let's adapt it.

Code

GrpcServer

@GrpcService(HelloServiceGrpc.class)
public class HelloService extends HelloServiceGrpc.HelloServiceImplBase {

    private static final Logger logger = LoggerFactory.getLogger(HelloService.class);

    @Override
    public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloResponse> responseObserver) {
        return new StreamObserver<HelloRequest>() {
            @Override
            public void onNext(HelloRequest value) {
                String name = value.getName().toStringUtf8();
                logger.info("received name :" + name);
            }

            @Override
            public void onError(Throwable t) {
                logger.warn("Encountered error in sayHello", t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(HelloResponse.newBuilder().setMessage("welcome to gRPC").build());
                // To illustrate the point, send a second response here
                responseObserver.onNext(HelloResponse.newBuilder().setMessage(" second message ").build());
                responseObserver.onCompleted();
            }
        };
    }
}

Client Controller

@RestController
public class HelloController {

    @Autowired
    private HelloService service;

    @GetMapping("/hello")
    public String sayHello(String name) {
        return service.sendMessage(name);
    }
}

Client GrpcConfig

@Component
public class GrpcConfig {

    @GrpcClient(value = "local-grpc-server")
    private Channel channel;

    @Bean("helloServiceStub")
    public HelloServiceGrpc.HelloServiceStub getHelloServiceStub() {
        return HelloServiceGrpc.newStub(channel);
    }
}

Client HelloService

@Service(value = "helloService")
public class HelloService {

    @Autowired
    private HelloServiceGrpc.HelloServiceStub helloServiceStub;

    public String sendMessage(String name) {
        // StringBuffer is synchronized, which matters because onNext is not called on this thread
        StringBuffer stringBuffer = new StringBuffer();
        final SettableFuture<Void> finalFuture = SettableFuture.create();
        StreamObserver<HelloRequest> helloRequestStreamObservers = helloServiceStub.sayHello(
                new StreamObserver<HelloResponse>() {
                    @Override
                    public void onNext(HelloResponse value) {
                        System.out.println("onNext : " + value.getMessage());
                        // Append every returned message to a single string
                        stringBuffer.append(value.getMessage());
                    }

                    @Override
                    public void onError(Throwable t) {
                        finalFuture.setException(t);
                    }

                    @Override
                    public void onCompleted() {
                        finalFuture.set(null);
                    }
                });

        for (int i = 0; i < 10; i++) {
            helloRequestStreamObservers.onNext(
                    HelloRequest.newBuilder()
                            .setName(ByteString.copyFrom(name, Charset.forName("UTF-8")))
                            .build()
            );
        }
        helloRequestStreamObservers.onCompleted();

        try {
            // Block until the server completes the response stream (or the call fails)
            finalFuture.get();
            System.out.println("is done");
            return stringBuffer.toString();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}

For the source code of com.google.common.util.concurrent.SettableFuture, see this post for details: https://blog.csdn.net/lb7758zx/article/details/74322074
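
The waiting pattern in sendMessage can be isolated from gRPC and Spring. The following is only a minimal sketch under stated assumptions, not the code from this project: the Observer interface and fakeServerCall are hypothetical stand-ins for io.grpc.stub.StreamObserver and the remote stub call, and the JDK's CompletableFuture takes the place of Guava's SettableFuture (complete and completeExceptionally play the roles of set and setException). It shows how callbacks arriving on another thread are turned into a blocking return value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureBridgeSketch {

    /** Hypothetical stand-in for io.grpc.stub.StreamObserver, so the sketch runs without gRPC. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical stand-in for the remote call: it replies from a different thread. */
    static void fakeServerCall(Observer<String> responseObserver) {
        new Thread(() -> {
            responseObserver.onNext("welcome to gRPC");
            responseObserver.onNext(" second message ");
            responseObserver.onCompleted();
        }).start();
    }

    /** Collects every streamed reply and blocks until the stream completes. */
    public static String collect() throws InterruptedException, ExecutionException {
        StringBuffer buffer = new StringBuffer();
        CompletableFuture<Void> done = new CompletableFuture<>();

        fakeServerCall(new Observer<String>() {
            @Override
            public void onNext(String value) {
                buffer.append(value);
            }

            @Override
            public void onError(Throwable t) {
                done.completeExceptionally(t);
            }

            @Override
            public void onCompleted() {
                done.complete(null);
            }
        });

        // The calling thread waits here until onCompleted or onError has fired
        done.get();
        return buffer.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collect());
    }
}
```

Because the future is only completed in onCompleted, every onNext call has already appended its message by the time get() returns, which is why the buffer can be read safely afterwards.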

[Figure: result of the API request]

[Figure: order of execution in the console]

Second approach: write the output directly through HttpServletResponse

Code:

@RestController
public class HelloController {

    @Autowired
    private HelloService service;

    @GetMapping("/hello")
    public void sayHello(String name, HttpServletResponse response) {
        service.sendMessage(name, response);
    }
}

@Service(value = "helloService")
public class HelloService {

    @Autowired
    private HelloServiceGrpc.HelloServiceStub helloServiceStub;

    public void sendMessage(String name, HttpServletResponse response) {
        try {
            final PrintWriter printWriter = response.getWriter();
            final SettableFuture<Void> finalFuture = SettableFuture.create();
            StreamObserver<HelloRequest> helloRequestStreamObservers = helloServiceStub.sayHello(
                    new StreamObserver<HelloResponse>() {
                        @Override
                        public void onNext(HelloResponse value) {
                            System.out.println("onNext : " + value.getMessage());
                            // Write each message to the HTTP response as soon as it arrives
                            printWriter.write(value.getMessage());
                        }

                        @Override
                        public void onError(Throwable t) {
                            finalFuture.setException(t);
                        }

                        @Override
                        public void onCompleted() {
                            finalFuture.set(null);
                        }
                    });

            for (int i = 0; i < 10; i++) {
                helloRequestStreamObservers.onNext(
                        HelloRequest.newBuilder()
                                .setName(ByteString.copyFrom(name, Charset.forName("UTF-8")))
                                .build()
                );
            }
            helloRequestStreamObservers.onCompleted();

            // Block and wait for the completion notification
            finalFuture.get();
            printWriter.close();
        } catch (InterruptedException | ExecutionException | IOException e) {
            e.printStackTrace();
        }
    }
}
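
Both versions of sendMessage wait with finalFuture.get(), which blocks the request thread indefinitely if the server never completes the stream. One possible refinement, offered only as a sketch, is to bound the wait and report what happened instead of just printing a stack trace. The class name, the await helper and its status strings are illustrative assumptions, and CompletableFuture stands in for SettableFuture here; since SettableFuture also implements java.util.concurrent.Future, the same get(timeout, unit) call is available on it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {

    /**
     * Waits for the stream-completion future, but gives up after timeoutMillis.
     * Returns a short status string (illustrative only) for each outcome.
     */
    public static String await(CompletableFuture<Void> done, long timeoutMillis) {
        try {
            done.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            // The stream did not finish within the allowed time
            return "timed out";
        } catch (ExecutionException e) {
            // The future was failed, as onError does in the observer
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see it
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = new CompletableFuture<>();
        ok.complete(null);
        System.out.println(await(ok, 100));

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("stream broke"));
        System.out.println(await(failed, 100));

        CompletableFuture<Void> never = new CompletableFuture<>();
        System.out.println(await(never, 100));
    }
}
```

In a controller, the three outcomes could be mapped to different HTTP responses; which timeout value is appropriate depends on how long the server side is expected to take.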

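The result is essentially the same as with the first approach. Pick whichever suits your needs; discussion is welcome.
This is an original post; please credit the source when reposting!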

Reposted from: https://www.jianshu.com/p/af75feac5783
