WebSocket的替代方案:什么是gRPC流及gRPC流的实现

WebSocket 的替代方案:本文将介绍 gRPC 流及其不同类型,并将其与 WebSockets 进行比较。

WebSocket的替代方案:什么是gRPC流及gRPC流的实现
gRPC 中的流式传输,来源 knoldus.com

什么是 WebSockets?

我们都知道 HTTP/HTTPS,它是客户端和服务器之间的单向请求-响应协议。HTTP 协议中的连接生命周期就是请求和响应的跨度。

WebSockets 的概念与此类似,它是客户端-服务器架构中的双向流。WebSocket 协议中的连接生命周期可以无限长,直到某些外部干扰终止连接。

恢复性应用程序接口是 HTTP 协议的一个例子,而实时聊天系统则是 WebSocket 协议的一个例子。

WebSockets 的用途

WebSockets 有多种用途,包括实时网络应用程序、聊天应用程序和游戏应用程序。

在需要通过网络连续传输数据流的应用中,我们可以使用 WebSockets。在只想一次性获取数据的情况下,我们可以使用 HTTP。

什么是 gRPC 流?

GRPC 提供三种不同的流模式:客户端流、服务器流和全双工流(包括客户端和服务器)。

在客户端流模式下,服务器将接收来自客户端的一系列信息,并在没有更多信息时做出响应。

使用服务器流式传输时,客户端发出请求,服务器回应一串信息,客户端读取每一条信息,直到没有信息为止。

全双工使客户端和服务器都能互相发送一系列信息,直到没有信息为止。

双向流适用于客户端和服务器之间需要持续共享消息的情况。

为什么使用 gRPC 流?

为什么要使用 gRPC,答案很简单。为了更容易理解,我们可以将其与 WebSockets 进行比较。
gRPC 使用 HTTP/2.0,而 WebSockets 使用 HTTP/1.1。使用 gRPC 可以提供更高的安全系统,因为它们提供内置加密,而 WebSockets 则需要访问控制。此外,gRPC 使用二进制数据格式进行通信,而 WebSockets 使用 JSON、MTTQ 等数据格式。

因此,gRPC 提供了比 WebSockets 更安全、更快速的通信渠道。

此外,gRPC 支持多路复用,使客户端可以在单个连接上发送和接收多个信息,减少了每次创建 HTTP/2 连接的开销,从而形成了一个可重复使用的连接。

如何实现 gRPC 流?

客户端流

首先,让我们了解一下什么是客户端流?

  • 它是客户端和服务器之间的通信,客户端向服务器发送信息流。
  • 信息流结束后(由客户端提示),服务器响应一次。

原始文件

我们正在使用客户端流更新驱动程序的位置,并添加了一些虚拟变量。

syntax = 'proto3';

package locationTrackingApp;

service Location {
    rpc updateLocation (stream LoactionReq) returns (LoactionRes);
}

message LoactionRepq {
    string driverId = 1;
    string longitude = 2;
    string latitudes = 3;
}

message LoactionRes {
}

实施

  • 我们将首先使用 NodeJs 实现服务器的解决方案。
  • 注意:我已经添加了 SSL 证书,以确保客户端和服务器之间的通信安全。不过,如果你愿意,也可以忽略这一点。

服务器

// grpc-js is the grpc library for node js, 
// where proto-loader is a library to compiler proto on runtime
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
const fs = require("fs");
// load the certificates 
function getServerCredentials() {
  const serverCert = fs.readFileSync("./Path_To_Cert/server-cert.pem");
  const serverKey = fs.readFileSync("./Path_To_Cert/server-key.pem");

  const serverCredentials = grpc.ServerCredentials.createSsl(
    null,
    [
      {
        cert_chain: serverCert,
        private_key: serverKey,
      },
    ],
    false
  );
  return serverCredentials;
}
function main() {
  const server = new grpc.Server();
  const packageDefinition = protoLoader.loadSync("./Path_To_Proto/locationStream.proto", {});
  // we get the package name from the loaded proto file and register our 
  // service defination to it
  const locationTrackingApp =
    grpc.loadPackageDefinition(packageDefinition).locationTrackingApp;
  // Add the service
  server.addService(locationTrackingApp.Location.service, {
    updateLocation: updateLocation,
  });

  const credentials = getServerCredentials();
  // PS: to remove credentials use grpc.ServerCredentials.createInsecure()
  server.bindAsync("0.0.0.0:50051", credentials, (err, port) => {
    if (err) {
      console.error(err)
      return
    }
    server.start();
    console.log(`Server running at http://0.0.0.0:${port}`);
  });
}

var driverLocations = {};

function updateLocation(call, callback) {

  call.on('data', (req) => {
    if (!driverLocations[req.driverId]) {
      driverLocations = {
        ...driverLocations,
        [req.driverId]: [{
          longitude: req.longitude,
          latitudes: req.latitudes
        }]
      }
    } else {
      driverLocations[req.driverId].push({
        longitude: req.longitude,
        latitudes: req.latitudes
      })
    }
  })

  call.on('end', () => {
    console.log("Stream Completed")
    console.log(driverLocations)
    callback(null, {});
  });
}

客户端流提供两个事件:

  • 数据:客户端写入信息的事件。
  • 结束:来自客户端的流结束事件。

我将分享两个客户端代码示例:Node 和 JAVA(以防您在 Android 环境中工作)。

NodeJS 客户端

const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
const fs = require("fs");

function getChannelCredentials() {
  const rootCert = fs.readFileSync("./Path_To_Cert/ca-cert.pem");

  const channelCredentials = grpc.ChannelCredentials.createSsl(rootCert);

  return channelCredentials;
}

function main() {
  const packageDefinition = protoLoader.loadSync("./proto/locationStream.proto", {});

  const locationTrackingApp =
    grpc.loadPackageDefinition(packageDefinition).locationTrackingApp;
  const client = new locationTrackingApp.Location(
    "localhost:50051",
    getChannelCredentials()
  );

  // here we establish a link with our server
  const stream = client.updateLocation((error, res) => {
    if (error) {
      console.error(error)
      return;
    }
    console.log(res)
  });
  
  // with the same link / stream valiable we will write as many messages we want
  for (let i = 0; i < 10; i++) {
    // this is the "data" event
    stream.write({
      driverId: "driver",
      longitude: "2." + i,
      latitudes: "3.1"
    });
  }
  // this send the "end" event
  stream.end();
}

main();

Java客户端

 // loading the certificates and building a secure channel with the server.
  final SslContext sslCerts = loadTLSCredentials();
  final ManagedChannel channel = NettyChannelBuilder
          .forTarget("localhost:50051")
          .sslContext(sslCerts)
          .build();
  final LocationGrpc.LocationStub stub = LocationGrpc.newStub(channel);
  // are this is a stream we will use StreamObserver to 
  // observe response from server "responseObserver"
  StreamObserver<LoactionRes> responseObserver = new StreamObserver<>() {
      @Override
      // function to handle response from server
      public void onNext(LoactionRes res) {
          System.out.println(res);
      }

      @Override
      // handle error if any received from server
      public void onError(Throwable throwable) {
          throwable.printStackTrace();
          finishLatch.countDown();

      }

      @Override
      public void onCompleted() {
          System.out.println("Finished UpdateLocation");
          finishLatch.countDown();

      }
  };

  // now we will create a stream with the secure channel 
  // we will use the responseObserver to send message stream
  StreamObserver<LoactionReq> responseObserver= stub.updateLocation(responseObserver);
  System.out.println("requestObserver stream started");

  for(int i=0;i<10;i++){
      // this emmit the "data" event
      requestObserver.onNext(LoactionReq.
              newBuilder()
              .setDriverId("driver_java_1_tester")
              .setLatitudes("lat"+i)
              .setLongitude("long")
              .build());
      // java requires a minute wait as 
      // processing is done in asynchronous manner
      Thread.sleep(5);
  }
  // this emits the "end" event
  requestObserver.onCompleted();

// How to load certificates
public static SslContext loadTLSCredentials() throws SSLException {
        File serverCACertFile = new File("Path_To_Certs/ca-cert.pem");
        return GrpcSslContexts.forClient()
                .trustManager(serverCACertFile)
                .build();
    }

在 Android 环境中,我们的证书和通道创建更新为:

Resources res = getResources();            
CertificateFactory cf = CertificateFactory.getInstance("X.509");
InputStream mainstream = res.openRawResource(R.raw.ca_cert);
Certificate ca = cf.generateCertificate(mainstream);
KeyStore kStore = KeyStore.getInstance(KeyStore.getDefaultType());
kStore.load(null, null);
kStore.setCertificateEntry("ca", ca);
TrustManagerFactory tmf = TrustManagerFactory
            .getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(kStore);
TrustManager[]  trustManagers = tmf.getTrustManagers();
if (trustManagers.length != 1 || !(trustManagers[0] instanceof X509TrustManager)) {
    throw new IllegalStateException("Unexpected default trust managers:" + Arrays.toString(trustManagers));
}
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, tmf.getTrustManagers(), null);
SSLSocketFactory sslSocketFactory = context.getSocketFactory();
final ManagedChannel channel = OkHttpChannelBuilder
            .forTarget("IP:PORT | domain ")
            .useTransportSecurity()
            .overrideAuthority("IP:PORT | domain")
            .sslSocketFactory(sslSocketFactory)
            .build();

在 Android 中,我们将证书添加为原始资源,并将其加载和设置为证书,我们使用证书初始化存根(gRPC 实例)。详细内容查看:如何在 Android 中使用 TLS 搭建 gRPC 客户端

版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。

(0)

相关推荐

发表回复

登录后才能评论