vlambda博客
学习文章列表

基于 gRPC 和 .NET Core 的服务器流

什么是 gRPC

gRPC 是一个现代开源的高性能 RPC 框架,可以在任何环境下运行。它可以有效地连接数据中心内和跨数据中心的服务,并对负载平衡、跟踪、健康检查和认证提供可插拔的支持。gRPC 最初是由谷歌创建的,该公司使用一个名为 Stubby 的单一通用 RPC 基础设施来连接其数据中心内和跨数据中心运行的大量微服务,使用已经超过十年。2015 年 3 月,谷歌决定建立 Stubby 的下一个版本,并将其开源,结果就是现在的 gRPC,被许多企业或组织使用。

https://grpc.io/

gRPC 服务器流

服务器流式(Server Streaming)RPC,客户端向服务器发送请求,并获得一个流来读取一连串的消息。客户端从返回的流中读取信息,直到没有消息为止。gRPC 保证在单个 RPC 调用中的信息是有序的。

rpc GetLocationData (GetLocationRequest) returns (stream GetLocationResponse);

协议缓冲区(Protobuf)

gRPC 使用协议缓冲区(protocol buffers)作为接口定义语言(IDL)来定义客户端和服务器之间的契约。在下面的 proto 文件中,定义了一个 RPC 方法 GetLocations,它接收 GetLocationsRequest 消息类型并返回 GetLocationsResponse 消息类型。响应消息类型前面的 stream 关键字表示响应是流类型,而不是单个响应。

syntax = "proto3";

option csharp_namespace = "GPRCStreaming";

package location_data;

service LocationData {
rpc GetLocations (GetLocationsRequest) returns (stream GetLocationsResponse);
}

message GetLocationsRequest {
int32 dataLimit = 1;
}

message GetLocationsResponse {
int32 latitudeE7 = 1;
int32 longitudeE7 = 2;
}

创建 gRPC 服务

我们可以使用 dotnet new grpc -n threemillion 命令轻松创建一个 .NET gRPC 服务。更多关于在 ASP.NET Core 中创建 gRPC 服务器和客户端的信息可在微软文档中找到。

Create a gRPC client and server in ASP.NET Core
https://docs.microsoft.com/en-us/aspnet/core/tutorials/grpc/grpc-start?view=aspnetcore-5.0&tabs=visual-studio-code

在添加了 proto 文件并生成了 gRPC 服务资源文件后,接下来我添加了 LocationService 类。在下面的代码片段中,我有一个 LocationService 类,它继承了从 Location.proto 文件中生成的 LocationDataBase 类型。客户端可以通过 Startup.cs 文件中 Configure 方法中的 endpoints.MapGrpcService<LocationService>() 来访问 LocationService。当服务器收到 GetLocations 请求时,它首先通过 GetLocationData 方法调用读取 Data 文件夹中 LocationHistory.json 文件中的所有数据(未包含在源代码库)。该方法返回 RootLocation 类型,其中包含 List<Location> 类型的 Location 属性。Location 类由两个内部属性 LongitudeLatitude 组成。接下来,我循环浏览每个位置,然后将它们写入 responseStream 中,返回给客户端。服务器将消息写入流中,直到客户在 GetLocationRequest 对象中指定的 dataLimit

using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using System.IO;
using System;
using System.Linq;

namespace GPRCStreaming
{
public class LocationService : LocationData.LocationDataBase
{
private readonly FileReader _fileReader;
private readonly ILogger<LocationService> _logger;

public LocationService(FileReader fileReader, ILogger<LocationService> logger)
{
_fileReader = fileReader;
_logger = logger;
}

public override async Task GetLocations(
GetLocationsRequest request,
IServerStreamWriter<GetLocationsResponse> responseStream,
ServerCallContext context)
{
try
{
_logger.LogInformation("Incoming request for GetLocationData");

var locationData = await GetLocationData();
var locationDataCount = locationData.Locations.Count;

var dataLimit = request.DataLimit > locationDataCount ? locationDataCount : request.DataLimit;

for (var i = 0; i <= dataLimit - 1; i++)
{
var item = locationData.Locations[i];

await responseStream.WriteAsync(new GetLocationsResponse
{
LatitudeE7 = item.LatitudeE7,
LongitudeE7 = item.LongitudeE7
});
}
}
catch (Exception exception)
{
_logger.LogError(exception, "Error occurred");
throw;
}
}

private async Task<RootLocation> GetLocationData()
{
var currentDirectory = Directory.GetCurrentDirectory();
var filePath = $"{currentDirectory}/Data/Location_History.json";

var locationData = await _fileReader.ReadAllLinesAsync(filePath);

return locationData;
}
}
}

现在,让我们运行该服务并发送一个请求。我将使用一个叫 grpcurl 的命令行工具,它可以让你与 gRPC 服务器交互。它基本上是针对 gRPC 服务器的 curl

https://github.com/fullstorydev/grpcurl

通过 grpcurl 与 gRPC 端点(endpoint)交互只有在 gRPC 反射服务被启用时才可用。这允许服务可以被查询,以发现服务器上的 gRPC 服务。扩展方法 MapGrpcReflectionService 需要引入 Microsoft.AspNetCore.Builder 的命名空间:

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<LocationService>();

if (env.IsDevelopment())
{
endpoints.MapGrpcReflectionService();
}

endpoints.MapGet("/", async context =>
{
await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
});
});
}
grpcurl -plaintext -d '{"dataLimit": "100000"}' localhost:80 location_data.LocationData/GetLocations

一旦服务器收到请求,它就会读取文件,然后在位置列表中循环,直到达到 dataLimit 计数,并将位置数据返回给客户端。

接下来,让我们创建一个 Blazor 客户端来调用 gRPC 服务。我们可以使用 IServiceCollection 接口上的 AddGrpcClient 扩展方法设置一个 gRPC 客户端:

public void ConfigureServices(IServiceCollection services)
{
services.AddRazorPages();
services.AddServerSideBlazor();
services.AddSingleton<WeatherForecastService>();

services.AddGrpcClient<LocationData.LocationDataClient>(client =>
{
client.Address = new Uri("http://localhost:80");
});
}

我使用 Virtualize Blazor 组件来渲染这些位置。Virtualize 组件不是一次性渲染列表中的每个项目,只有当前可见的项目才会被渲染。

ASP.NET Core Blazor component virtualization
https://docs.microsoft.com/en-us/aspnet/core/blazor/components/virtualization?view=aspnetcore-5.0

相关代码:

@page "/locationdata"

@using Grpc.Core
@using GPRCStreaming
@using threemillion.Data
@using System.Diagnostics
@using Microsoft.AspNetCore.Components.Web.Virtualization

@inject IJSRuntime JSRuntime;
@inject System.Net.Http.IHttpClientFactory _clientFactory
@inject GPRCStreaming.LocationData.LocationDataClient _locationDataClient

<table class="tableAction">
<tbody>
<tr>
<td>
<div class="data-input">
<label for="dataLimit">No of records to fetch</label>
<input id="dataLimit" type="number" @bind="_dataLimit" />
<button @onclick="FetchData" class="btn-submit">Call gRPC</button>
</div>
</td>
<td>
<p class="info">
Total records: <span class="count">@_locations.Count</span>
</p>
<p class="info">
Time taken: <span class="time">@_stopWatch.ElapsedMilliseconds</span> milliseconds
</p>
</td>
</tr>
</tbody>
</table>

<div class="tableFixHead">
<table class="table">
<thead>
<tr>
<th>Longitude</th>
<th>Latitude</th>
</tr>
</thead>
<tbody>
<Virtualize Items="@_locations" Context="locations">
<tr>
<td>@locations.LongitudeE7</td>
<td>@locations.LatitudeE7</td>
</tr>
</Virtualize>
</tbody>
</table>
</div>

@code {
private int _dataLimit = 1000;

private List<Location> _locations = new List<Location>();

private Stopwatch _stopWatch = new Stopwatch();

protected override async Task OnInitializedAsync()
{
await FetchData();
}

private async Task FetchData()
{
ResetState();

_stopWatch.Start();

using (var call = _locationDataClient.GetLocations(new GetLocationsRequest { DataLimit = _dataLimit }))
{
await foreach (var response in call.ResponseStream.ReadAllAsync())
{
_locations.Add(new Location { LongitudeE7 = response.LongitudeE7, LatitudeE7 = response.LatitudeE7 });

StateHasChanged();
}
}

_stopWatch.Stop();
}

private void ResetState()
{
_locations.Clear();

_stopWatch.Reset();

StateHasChanged();
}
}

通过在本地运行的流调用,从 gRPC 服务器接收 2,876,679 个单独的响应大约需要 8 秒钟。让我们也在 Mapbox 中加载数据:

@page "/mapbox"

@using Grpc.Core
@using GPRCStreaming
@using System.Diagnostics

@inject IJSRuntime JSRuntime;
@inject System.Net.Http.IHttpClientFactory _clientFactory
@inject GPRCStreaming.LocationData.LocationDataClient LocationDataClient

<table class="tableAction">
<tbody>
<tr>
<td>
<div class="data-input">
<label for="dataLimit">No of records to fetch</label>
<input id="dataLimit" type="number" @bind="_dataLimit" />
<button @onclick="LoadMap" class="btn-submit">Load data</button>
</div>
</td>
<td>
<p class="info">
Total records: <span class="count">@_locations.Count</span>
</p>
<p class="info">
Time taken: <span class="time">@_stopWatch.ElapsedMilliseconds</span> milliseconds
</p>
</td>
</tr>
</tbody>
</table>

<div id='map' style="width: 100%; height: 90vh;"></div>

@code {
private int _dataLimit = 100;

private List<object> _locations = new List<object>();

private Stopwatch _stopWatch = new Stopwatch();

protected override async Task OnAfterRenderAsync(bool firstRender)
{
if (!firstRender)
{
return;
}

await JSRuntime.InvokeVoidAsync("mapBoxFunctions.initMapBox");
}

private async Task LoadMap()
{
ResetState();

_stopWatch.Start();

using (var call = LocationDataClient.GetLocations(new GetLocationsRequest { DataLimit = _dataLimit }))
{

await foreach (var response in call.ResponseStream.ReadAllAsync())
{
var pow = Math.Pow(10, 7);
var longitude = response.LongitudeE7 / pow;
var latitude = response.LatitudeE7 / pow;

_locations.Add(new
{
type = "Feature",
geometry = new
{
type = "Point",
coordinates = new double[] { longitude, latitude }
}
});

StateHasChanged();
}

_stopWatch.Stop();

await JSRuntime.InvokeVoidAsync("mapBoxFunctions.addClusterData", _locations);
}
}

private void ResetState()
{
JSRuntime.InvokeVoidAsync("mapBoxFunctions.clearClusterData");

_locations.Clear();

_stopWatch.Reset();

StateHasChanged();
}
}

喜欢就来个三连,让更多人因你而受益