ASP.NET Core 3.0 gRPC 双向流

2019 年 10 月 13 日 DotNet


(给DotNet加星标,提升.Net技能


转自:晓晨Master
cnblogs.com/stulzq/p/11590088.html

一、前言


在前文《ASP.NET Core 3.0 使用gRPC》中有提到gRPC支持双向流调用,支持实时推送消息这也是gRPC的一大特点且gRPC在对双向流的控制支持上也是非常强大的。


二、什么是 gRPC 流


gRPC 有四种服务类型,分别是:简单 RPC(Unary RPC)、服务端流式 RPC (Server streaming RPC)、客户端流式 RPC (Client streaming RPC)、双向流式 RPC(Bi-directional streaming RPC)。它们主要有以下特点:



三、为什么 gRPC 支持流


gRPC 通信是基于 HTTP/2 实现的,它的双向流映射到 HTTP/2 流。HTTP/2 具有流的概念,流是为了实现HTTP/2的多路复用。流是服务器和客户端在HTTP/2连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程;一个业务处理单元,在一个流内进行处理完毕,这个流生命周期完结。


特点如下:


  • 一个HTTP/2连接可同时保持多个打开的流,任一端点交换帧


  • 流可被客户端或服务器单独或共享创建和使用


  • 流可被任一端关闭


  • 在流内发送和接收数据都要按照顺序


  • 流的标识符自然数表示,1~2^31-1区间,有创建流的终端分配


  • 流与流之间逻辑上是并行、独立存在


摘自 HTTP/2笔记之流和多路复用 by 聂永


四、gRPC中使用双向流调用


我们在前文中编写的RPC属于简单RPC,没有包含流调用,下面直接讲一下双向流,根据第二小节列举的四种服务类型,如果我们掌握了简单RPC和双向流RPC,那么服务端流式 RPC和客户端流式 RPC自然也就没有问题了。


这里我们继续使用前文的代码,要实现的目标是一次给多个猫洗澡。


1、首先在 LuCat.proto 定义两个rpc,一个 Count 用于统计猫的数量,一个 双向流 RPC BathTheCat 用于给猫洗澡


syntax = "proto3";
option csharp_namespace = "AspNetCoregRpcService";
import "google/protobuf/empty.proto";
package LuCat; //定义包名

//定义服务
service LuCat{
//定义给猫洗澡双向流rpc
rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp);
//定义统计猫数量简单rpc
rpc Count(google.protobuf.Empty) returns (CountCatResult);
}

message SuckingCatResult{
string message=1;
}
message BathTheCatReq{
int32 id=1;
}
message BathTheCatResp{
string message=1;
}
message CountCatResult{
int32 Count=1;
}


2、添加服务的实现


这里安利下Resharper,非常方便



private readonly ILogger<LuCatService> _logger;
private static readonly List<string> Cats=new List<string>(){"英短银渐层","英短金渐层","美短","蓝猫","狸花猫","橘猫"};
private static readonly Random Rand=new Random(DateTime.Now.Millisecond);
public LuCatService(ILogger<LuCatService> logger)
{
_logger = logger;
}

public override async Task BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context)
{
var bathQueue=new Queue<int>();
while (await requestStream.MoveNext())
{
//将要洗澡的猫加入队列
bathQueue.Enqueue(requestStream.Current.Id);
_logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue.");
}
//遍历队列开始洗澡
while (bathQueue.TryDequeue(out var catId))
{
await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" });
await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果
}
}

public override Task<CountCatResult> Count(Empty request, ServerCallContext context)
{
return Task.FromResult(new CountCatResult()
{
Count = Cats.Count
});
}


BathTheCat 方法会接收多个客户端发来的CatId,然后将他们加入队列中,等客户端发送完成后开始依次洗澡并返回给客户端。


3、客户端实现


随机发送10个猫Id给服务端,然后接收10个洗澡结果。


var channel = GrpcChannel.ForAddress("https://localhost:5001");
var catClient = new LuCat.LuCatClient(channel);
//获取猫总数
var catCount = await catClient.CountAsync(new Empty());
Console.WriteLine($"一共{catCount.Count}只猫。");
var rand = new Random(DateTime.Now.Millisecond);

var bathCat = catClient.BathTheCat();
//定义接收吸猫响应逻辑
var bathCatRespTask = Task.Run(async() =>
{
await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
{
Console.WriteLine(resp.Message);
}
});
//随机给10个猫洗澡
for (int i = 0; i < 10; i++)
{
await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)});
}
//发送完毕
await bathCat.RequestStream.CompleteAsync();
Console.WriteLine("客户端已发送完10个需要洗澡的猫id");
Console.WriteLine("接收洗澡结果:");
//开始接收响应
await bathCatRespTask;
Console.WriteLine("洗澡完毕");


4、运行



可以看到双向流调用成功,客户端发送了10个猫洗澡请求对象,服务端返回了10个猫洗澡结果对象。且是实时推送的,这就是 gRPC 的双向流调用。


这里大家可以自行改进来演变成客户端流式或者服务端流式调用。客户端发送一个猫Id列表,然后服务端返回每个猫洗澡结果,这就是服务端流式调用。客户端依次发送猫Id,然后服务端一次性返回所有猫的洗澡结果(给所有猫洗澡看做是一个业务,返回这个业务的结果),就是客户端流式调用。这里我就不再演示了。


五、流控制


gRPC 的流式调用支持对流进行主动取消的控制,进而可以衍生出流超时限制等控制。


在流式调用,可以传一个 CancellationToken 参数,它就是我们用来对流进行取消控制的:



改造一下我们在第四小节的代码:


1、客户端


var cts = new CancellationTokenSource();
//指定在2.5s后进行取消操作
cts.CancelAfter(TimeSpan.FromSeconds(2.5));
var bathCat = catClient.BathTheCat(cancellationToken: cts.Token);
//定义接收吸猫响应逻辑
var bathCatRespTask = Task.Run(async() =>
{
try
{
await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
{
Console.WriteLine(resp.Message);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}
});


2、服务端


//遍历队列开始洗澡
while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId))
{
_logger.LogInformation($"Cat {catId} Dequeue.");
await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" });
await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果
}


3、运行



设置的是双向流式调用2.5s后取消流,从客户端调用结果看到,并没有收到全部10个猫的洗澡返回结果,流就已经被取消了,这就是 gRPC 的流控制。


六、结束


这里流式调用可以实现实时推送,服务端到客户端或者客户端到服务端实时推送消息,但是这个和传统意义上的长连接主动推送、广播消息不一样,不管你是服务端流式、客户端流式还是双向流式,必须要由客户端进行发起,通过客户端请求来建立流通信。


本文所用代码:https://github.com/stulzq/BlogDemos/tree/master/AspNetCoreGrpcWithStream


推荐阅读

(点击标题可跳转阅读)

GRPC与.NET Core

.NET World—gPRC概览

C# 8.0 两个有趣的新特性以及gRPC


看完本文有收获?请转发分享给更多人

关注「DotNet」加星标,提升.Net技能 

好文章,我在看❤️

登录查看更多
0

相关内容

【2020新书】使用高级C# 提升你的编程技能,412页pdf
专知会员服务
56+阅读 · 2020年6月26日
模型优化基础,Sayak Paul,67页ppt
专知会员服务
75+阅读 · 2020年6月8日
【北京大学】面向5G的命名数据网络物联网研究综述
专知会员服务
36+阅读 · 2020年4月26日
自动结构变分推理,Automatic structured variational inference
专知会员服务
38+阅读 · 2020年2月10日
【新书】Java企业微服务,Enterprise Java Microservices,272页pdf
TensorFlow Lite指南实战《TensorFlow Lite A primer》,附48页PPT
专知会员服务
69+阅读 · 2020年1月17日
【干货】大数据入门指南:Hadoop、Hive、Spark、 Storm等
专知会员服务
95+阅读 · 2019年12月4日
利用 AutoML 的功能构建和部署 TensorFlow.js 模型
TensorFlow
6+阅读 · 2019年12月16日
浅谈 Kubernetes 在生产环境中的架构
DevOps时代
11+阅读 · 2019年5月8日
React Native 分包哪家强?看这文就够了!
程序人生
13+阅读 · 2019年1月16日
一天精通无人中级篇:遥控器协议 S-BUS
无人机
51+阅读 · 2018年12月20日
.NET Core 环境下构建强大且易用的规则引擎
浅谈浏览器 http 的缓存机制
前端大全
6+阅读 · 2018年1月21日
Spark的误解-不仅Spark是内存计算,Hadoop也是内存计算
VrR-VG: Refocusing Visually-Relevant Relationships
Arxiv
6+阅读 · 2019年8月26日
QuAC : Question Answering in Context
Arxiv
4+阅读 · 2018年8月21日
Bidirectional Attention for SQL Generation
Arxiv
4+阅读 · 2018年6月21日
VIP会员
相关资讯
利用 AutoML 的功能构建和部署 TensorFlow.js 模型
TensorFlow
6+阅读 · 2019年12月16日
浅谈 Kubernetes 在生产环境中的架构
DevOps时代
11+阅读 · 2019年5月8日
React Native 分包哪家强?看这文就够了!
程序人生
13+阅读 · 2019年1月16日
一天精通无人中级篇:遥控器协议 S-BUS
无人机
51+阅读 · 2018年12月20日
.NET Core 环境下构建强大且易用的规则引擎
浅谈浏览器 http 的缓存机制
前端大全
6+阅读 · 2018年1月21日
Spark的误解-不仅Spark是内存计算,Hadoop也是内存计算
Top
微信扫码咨询专知VIP会员