狠狠色丁香婷婷综合尤物/久久精品综合一区二区三区/中国有色金属学报/国产日韩欧美在线观看 - 国产一区二区三区四区五区tv

LOGO OA教程 ERP教程 模切知識(shí)交流 PMS教程 CRM教程 開(kāi)發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

基于HTTP2/3的流模式消息交換如何實(shí)現(xiàn)?

freeflydom
2024年2月24日 15:32 本文熱度 1381

我想很多人已經(jīng)體驗(yàn)過(guò)GRPC提供的三種流式消息交換(Client Stream、Server Stream和Duplex Stream)模式,在.NET Core上構(gòu)建的GRPC應(yīng)用本質(zhì)上是采用HTTP2/HTTP3協(xié)議的ASP.NET Core應(yīng)用,我們當(dāng)然也可以在一個(gè)普通的ASP.NET Core應(yīng)用實(shí)現(xiàn)這些流模式。不僅如此,HttpClient也提供了響應(yīng)的支持,這篇文章通過(guò)一個(gè)簡(jiǎn)單的實(shí)例提供了相應(yīng)的實(shí)現(xiàn),源代碼從這里下載。

一、雙向流的效果
二、[服務(wù)端]流式請(qǐng)求/響應(yīng)的讀寫(xiě)
三、[客戶(hù)端]流式響應(yīng)/請(qǐng)求的讀寫(xiě)

一、雙向流的效果

在提供具體實(shí)現(xiàn)之前,我們不妨先來(lái)演示一下最終的效果。我們通過(guò)下面這段代碼構(gòu)建了一個(gè)簡(jiǎn)單的ASP.NET Core應(yīng)用,如代碼片段所示,在調(diào)用WebApplication的靜態(tài)方法CreateBuilder將WebApplicationBuilder創(chuàng)建出來(lái)后,我們調(diào)用其擴(kuò)展方法UseKestrel將默認(rèn)終結(jié)點(diǎn)的監(jiān)聽(tīng)協(xié)議設(shè)置為Http1AndHttp2AndHttp3,這樣我們的應(yīng)用將提供針對(duì)不同HTTP協(xié)議的全面支持。

var url = "http://localhost:9999";
var builder = WebApplication.CreateBuilder(args);
builder.WebHost
    .UseKestrel(kestrel=> kestrel.ConfigureEndpointDefaults(listen=>listen.Protocols = HttpProtocols.Http1AndHttp2AndHttp3))
    .UseUrls(url);
var app = builder.Build();
app.MapPost("/", httpContext=> HandleRequestAsync(httpContext, async (request, writer) => {
    Console.WriteLine($"[Server]Receive request message: {request}");
    await writer.WriteStringAsync(request);
}));
await app.StartAsync();
await SendStreamRequestAsync(url, ["foo", "bar", "baz", "qux"], reply => {
    Console.WriteLine($"[Client]Receive reply message: {reply}\n");
    return Task.CompletedTask;
});

我們針對(duì)根路徑(/)注冊(cè)了一個(gè)HTTP方法為POST的路由終結(jié)點(diǎn),終結(jié)點(diǎn)處理器調(diào)用HanleRequestAsync來(lái)處理請(qǐng)求。這個(gè)方法提供一個(gè)Func<string, PipeWriter, Task>類(lèi)型的參數(shù)作為處理器,該委托的第一個(gè)參數(shù)表示接收到的單條請(qǐng)求消息,PipeWriter用來(lái)寫(xiě)入響應(yīng)內(nèi)容。在這里我們將接收到的消息進(jìn)行簡(jiǎn)單格式化后將其輸出到控制臺(tái)上,隨之將其作為響應(yīng)內(nèi)容進(jìn)行回寫(xiě)。

在應(yīng)用啟動(dòng)之后,我們調(diào)用SendStreamRequestAsync方法以流的方式發(fā)送請(qǐng)求,并處理接收到的響應(yīng)內(nèi)容。該方法的第一個(gè)參數(shù)為請(qǐng)求發(fā)送的目標(biāo)URL,第二個(gè)參數(shù)是一個(gè)字符串?dāng)?shù)組,我們將以流的方式逐個(gè)發(fā)送每個(gè)字符串。最后的參數(shù)是一個(gè)Func<string,Task>類(lèi)型的委托,用來(lái)處理接收到的響應(yīng)內(nèi)容(字符串),在這里我們依然是將格式化的響應(yīng)內(nèi)容直接打印在控制臺(tái)上。

程序啟動(dòng)后控制臺(tái)上將出現(xiàn)如上圖所示的輸出,客戶(hù)端/服務(wù)端接收內(nèi)容的交錯(cuò)輸出體現(xiàn)了我們希望的“雙向流式”消息交換模式。我們將在后續(xù)介紹HanleRequestAsync和SendStreamRequestAsync方法的實(shí)現(xiàn)邏輯。

二、[服務(wù)端]流式請(qǐng)求/響應(yīng)的讀寫(xiě)

HanleRequestAsync方法定義如下。如代碼片段所示,我們利用請(qǐng)求的BodyReader和響應(yīng)的BodyWriter來(lái)對(duì)請(qǐng)求和響應(yīng)內(nèi)容進(jìn)行讀寫(xiě),它們的類(lèi)型分別是PipeReader和PipeWriter。在一個(gè)循環(huán)中,在利用BodyReader將請(qǐng)求緩沖區(qū)內(nèi)容讀取出來(lái)后,我們將得到的ReadOnlySequence<byte>對(duì)象作為參數(shù)調(diào)用輔助方法TryReadMessage讀取單條請(qǐng)求消息,并調(diào)用handler參數(shù)表示的處理器進(jìn)行處理。當(dāng)請(qǐng)求內(nèi)容接收完畢后,循環(huán)終止。

static async Task HandleRequestAsync(HttpContext httpContext, Func<string, PipeWriter, Task> handler)
{
    var reader = httpContext.Request.BodyReader;
    var writer = httpContext.Response.BodyWriter;
    while (true)
    {
        var result = await reader.ReadAsync();
        var buffer = result.Buffer;
        while (TryReadMessage(ref buffer, out var message))
        {
            await handler(message, writer);
        }
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted)
        {
            break;
        }
    }
}

由于客戶(hù)端發(fā)送的單條字符串消息長(zhǎng)度不限,為了精準(zhǔn)地將其讀出來(lái),我們需要在輸出編碼后的消息內(nèi)容前添加4個(gè)字節(jié)的整數(shù)來(lái)表示消息的長(zhǎng)度。所以在如下所示的TryReadMessage方法中,我們會(huì)先將字節(jié)長(zhǎng)度讀取出來(lái),再據(jù)此將消息自身內(nèi)容讀取出來(lái),最終通過(guò)解碼得到消息字符串。

static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, [NotNullWhen(true)]out string? message)
{
    var reader = new SequenceReader<byte>(buffer);
    if (!reader.TryReadLittleEndian(out int length))
    {
        message = default;
        return false;
    }
    message = Encoding.UTF8.GetString(buffer.Slice(4, length));
    buffer = buffer.Slice(length + 4);
    return true;
}

響應(yīng)消息的寫(xiě)入是通過(guò)如下針對(duì)PipeWriter的WriteStringAsync擴(kuò)展方法實(shí)現(xiàn)的,這里的PipeWriter就是響應(yīng)的BodyWriter,針對(duì)“Length + Payload“的消息寫(xiě)入也體現(xiàn)在這里。

public static class Extensions
{
    public static ValueTask<FlushResult> WriteStringAsync(this PipeWriter writer, string content)
    {
        var length = Encoding.UTF8.GetByteCount(content);
        var span = writer.GetSpan(4 + length);
        BitConverter.TryWriteBytes(span, length);
        Encoding.UTF8.GetBytes(content, span.Slice(4));
        writer.Advance(4 + length);
        return writer.FlushAsync();
    }
}

三、[客戶(hù)端]流式響應(yīng)/請(qǐng)求的讀寫(xiě)

客戶(hù)端利用HttpClient發(fā)送請(qǐng)求。針對(duì)HttpClient的請(qǐng)求通過(guò)一個(gè)HttpRequestMessage對(duì)象表示,其主體內(nèi)容體現(xiàn)為一個(gè)HttpContent。流式請(qǐng)求的發(fā)送是通過(guò)如下這個(gè)StreamContent類(lèi)型實(shí)現(xiàn)的,它派生于HttpContent。我們重寫(xiě)了SerializeToStreamAsync方法,利用自定義的StreamContentWriter將內(nèi)容寫(xiě)入請(qǐng)求輸出流。

public class StreamContent(StreamContentWriter writer) : HttpContent
{
    private readonly StreamContentWriter _writer = writer;
    protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) 
        => _writer.SetOutputStream(stream).WaitAsync();
    protected override bool TryComputeLength(out long length) => (length = -1) != -1;
}
public class StreamContentWriter
{
    private readonly TaskCompletionSource<Stream> _streamSetSource = new();
    private readonly TaskCompletionSource _streamEndSource = new();
    public StreamContentWriter SetOutputStream(Stream outputStream)
    {
        _streamSetSource.SetResult(outputStream);
        return this;
    }
    public async Task WriteAsync(string content)
    {
        var stream = await _streamSetSource.Task;
        await PipeWriter.Create(stream).WriteStringAsync(content);
    }
    public void Complete() => _streamEndSource.SetResult();
    public Task WaitAsync() => _streamEndSource.Task;
}

StreamContentWriter提供了四個(gè)方法,SetOutputStream方法用來(lái)設(shè)置請(qǐng)求輸出流,上面重寫(xiě)的SerializeToStreamAsync調(diào)用了此方法。單條字符串消息的寫(xiě)入實(shí)現(xiàn)在WriteAsync方法中,它最終調(diào)用的依然是上面提供的WriteStringAsync擴(kuò)展方法。整個(gè)流式請(qǐng)求的過(guò)程通過(guò)一個(gè)TaskCompletionSource對(duì)象提供的Task來(lái)表示,當(dāng)客戶(hù)端完成所有輸出后,會(huì)調(diào)用Complete方法,該方法進(jìn)一步調(diào)用這個(gè)TaskCompletionSource對(duì)象的SetResult方法。由于WaitAsync方法返回TaskCompletionSource對(duì)象提供的Task,SerializeToStreamAsync方法會(huì)調(diào)用此方法等待”客戶(hù)端輸出流“的終結(jié)。

如下的代碼片段體現(xiàn)了SendStreamRequestAsync方法的實(shí)現(xiàn)。在這里我們創(chuàng)建了一個(gè)表示流式請(qǐng)求的HttpRequestMessage對(duì)象,我們將協(xié)議版本設(shè)置為HTTP2,作為主體內(nèi)容的HttpContent正式根據(jù)StreamContentWriter對(duì)象創(chuàng)建的StreamContent對(duì)象。

 static async Task SendStreamRequestAsync(string url,string[] lines, Func<string, Task> handler)
{
    using var httpClient = new HttpClient();
    var writer = new StreamContentWriter();
    var request = new HttpRequestMessage(HttpMethod.Post, url)
    {
        Version = HttpVersion.Version20,
        VersionPolicy = HttpVersionPolicy.RequestVersionExact,
        Content = new StreamingWeb.StreamContent(writer)
    };
    var task = httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
    _ = Task.Run(async () =>
    {
        var response = await task;
        var reader = PipeReader.Create(await response.Content.ReadAsStreamAsync());
        while (true)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;
            while (TryReadMessage(ref buffer, out var message))
            {
                await handler(message);
            }
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted)
            {
                break;
            }
        }
    });
    foreach (string line in lines)
    {
        await writer.WriteAsync($"{line} ({DateTimeOffset.UtcNow})");
        await Task.Delay(1000);
    }
    writer.Complete();
}

我們將這個(gè)HttpRequestMessage作為請(qǐng)求利用HttpClient發(fā)送出去,實(shí)際上發(fā)送的內(nèi)容最終是通過(guò)調(diào)用StreamContentWriter對(duì)象的WriteAsync方法輸出的,我們每隔1秒發(fā)送一條消息。HttpClient將請(qǐng)求發(fā)出去之后會(huì)得到一個(gè)通過(guò)HttpResponseMessage對(duì)象表示的響應(yīng),在一個(gè)異步執(zhí)行的Task中,我們根據(jù)響應(yīng)流創(chuàng)建一個(gè)PipeReader對(duì)象,并在一個(gè)循環(huán)中調(diào)用上面定義的TryReadMessage方法逐條讀取接收到的單條消息進(jìn)行處理。


本文作者:Artech,轉(zhuǎn)自https://www.cnblogs.com/artech/p/18021662/streaming_messaging


該文章在 2024/2/24 15:32:33 編輯過(guò)
關(guān)鍵字查詢(xún)
相關(guān)文章
正在查詢(xún)...
點(diǎn)晴ERP是一款針對(duì)中小制造業(yè)的專(zhuān)業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國(guó)內(nèi)大量中小企業(yè)的青睞。
點(diǎn)晴PMS碼頭管理系統(tǒng)主要針對(duì)港口碼頭集裝箱與散貨日常運(yùn)作、調(diào)度、堆場(chǎng)、車(chē)隊(duì)、財(cái)務(wù)費(fèi)用、相關(guān)報(bào)表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點(diǎn),圍繞調(diào)度、堆場(chǎng)作業(yè)而開(kāi)發(fā)的。集技術(shù)的先進(jìn)性、管理的有效性于一體,是物流碼頭及其他港口類(lèi)企業(yè)的高效ERP管理信息系統(tǒng)。
點(diǎn)晴WMS倉(cāng)儲(chǔ)管理系統(tǒng)提供了貨物產(chǎn)品管理,銷(xiāo)售管理,采購(gòu)管理,倉(cāng)儲(chǔ)管理,倉(cāng)庫(kù)管理,保質(zhì)期管理,貨位管理,庫(kù)位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號(hào)管理軟件。
點(diǎn)晴免費(fèi)OA是一款軟件和通用服務(wù)都免費(fèi),不限功能、不限時(shí)間、不限用戶(hù)的免費(fèi)OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved