Skip to content

[API Proposal]: Support parsing server-sent events (SSE) #98105

Closed
@stephentoub

Description

@stephentoub

Background and motivation

SSE is becoming more and more popular, especially with prominent services like OpenAI relying on it for streaming responses. The format is very simple, but it still takes some amount of code to properly handle parsing the SSE format. We should have a built-in helper in either System.Net or System.Formats that take care of it for the developer (optionally then other higher-level helpers could be layered on top).

API Proposal

namespace System.Formats.Sse;

public readonly struct SseItem<T>
{
    public SseItem(T data, string eventType);
    public T Data { get; }
    public string EventType { get; }

    public const string EventTypeDefault = "message"; // defined as the default by SSE spec
}

public static class SseParser
{
    public static SseEnumerable<string> Parse(Stream sseStream); // equivalent to the other overload using Encoding.UTF8.GetString
    public static SseEnumerable<T> Parse<T>(Stream sseStream, SseItemParser<T> itemParser);
}

public delegate T SseItemParser<out T>(string eventType, ReadOnlySpan<byte> data);

public sealed class SseEnumerable<T> : IAsyncEnumerable<SseItem<T>>, IEnumerable<SseItem<T>>
{
    public IEnumerator<SseItem<T>> GetEnumerator();
    public IAsyncEnumerator<SseItem<T>> GetAsyncEnumerator(CancellationToken cancellationToken = default);

    public string LastEventId { get; }
    public TimeSpan ReconnectionInterval { get; }
}
  • Ownership of the Stream is not transferred to the enumerable. Parse{Async} will not dispose of the stream, nor will the returned enumerable, and it is up to the consumer to dispose of the stream when all use of it is done.
  • The enumerable will only be usable for a single enumeration. Calling Get{Async}Enumerable multiple times will fail. We could make it work in the future if really important, but for that to make sense you'd need to be working with a Stream that was seekable, and that's rare for these scenarios, which are typically networking-based.
  • ReconnectionInterval defaults to Timeout.InfiniteTimeSpan. The SSE specification states that if no retry is specified in the stream, the interval may be chosen by the client. By using InfiniteTimeSpan, this conveys to a client consuming this parser that they may choose their own value.
  • LastEventId defaults to empty string, per the SSE spec.

Open Issues

  • Single type that implements both IAsyncEnumerable and IEnumerable?
  • Single method that returns that single type? Naming?
  • ReconnectionInterval / LastEventId on each event or on the enumerable?
  • Assembly: netstandard2.0 support is needed for a variety of consumers, including the Azure SDK and Semantic Kernel. Proposal is to ship this as a new System.Formats.Sse nuget package that includes a netstandard2.0 asset. If there's an existing package it'd make sense to include this in, we could do that instead.

API Usage

HttpClient client = ...;
using Stream responseStream = await client.GetStreamAsync(...);
await foreach (SseItem<string> item in SseParser.Parse(responseStream, (_, bytes) => Encoding.Utf8.GetString(bytes)))
{
    Console.WriteLine(item.Data);
}
HttpClient client = ...;
using Stream responseStream = await client.GetStreamAsync(...);
await foreach (SseItem<T> item in SseParser.Parse(responseStream, (_, bytes) => JsonSerializer.Deserialize<T>(bytes)))
{
    Console.WriteLine(item.Data);
}

Alternative Designs

  • Always returning a byte[] instead of a callback that parses a span into a T
  • Also exposing a Serialize/Format method to put SseItem<T> instances onto a Stream, which ASP.NET could then use. This can come later when needed by ASP.NET.

Risks

  • We should have equivalent support in ASP.NET for serializing out an SSE stream, and it should use whatever shared SseItem<T> type is in the shared libraries, so we need to ensure it's designed accordingly.

Metadata

Metadata

Assignees

Labels

api-approvedAPI was approved in API review, it can be implementedarea-System.NetblockingMarks issues that we want to fast track in order to unblock other important workin-prThere is an active PR which will close this issue when it is merged

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions