Efficient Large Data Transfer with gRPC Streaming
This project demonstrates an efficient method for transferring large files or data payloads between microservices using gRPC server streaming in Go. It serves as a gRPC-based alternative to traditional HTTP Range request solutions (like the one detailed in https://dev.to/dialaeke/streaming-large-files-between-microservices-a-go-implementation-4n62). It directly addresses the challenge posed by Sumit Mukhija in this tweet:
"Your microservice needs to transfer large amounts of data (e.g., files, images) between services. How do you design the communication to avoid performance bottlenecks and manage large payloads efficiently?"
The core challenge, often faced in microservice architectures, is transferring significant amounts of data (files, images, datasets) without causing performance bottlenecks, excessive memory consumption, or network congestion. This implementation leverages gRPC's strengths to provide a robust and efficient solution.
See project repo here
Why gRPC for Large File Transfers?
While HTTP Range requests offer a viable solution, gRPC provides several advantages in this context:
- Native Streaming: gRPC has first-class support for server-side streaming, making it natural to send data in chunks without complex application-level logic for chunk management.
- Efficiency (HTTP/2): gRPC typically runs over HTTP/2, benefiting from features like multiplexing and header compression, which can improve network efficiency compared to HTTP/1.1.
- Type Safety & Schema Definition: Using Protocol Buffers (
.proto
files) enforces a clear contract between client and server, reducing integration errors. - Performance: The binary nature of Protocol Buffers and the efficiency of HTTP/2 often lead to better performance than text-based protocols like JSON over HTTP/1.1.
This project specifically showcases:
- Memory Efficiency: Avoids loading large files into memory on either client or server using streaming and efficient buffer management.
- Resumable Downloads: Client tracks progress and requests the next chunk from the correct offset.
- Performance: Optimizes I/O using
io.CopyN
and pooledbytes.Buffer
.
Key Features (gRPC Implementation)
- gRPC Server Streaming: The server streams file chunks to the client using a single RPC call (
StreamFile
). - Protocol Buffers: Defines the service contract (
TransferService
) and message formats (StreamFileRequest
,StreamFileResponse
, etc.) inproto/transfer.proto
. - Chunked Transfer: Data is inherently sent in chunks managed by the server loop. Chunk size is configurable but capped on the server.
- Resumable Downloads: The client requests the download starting from the last received byte offset (
Start
field inStreamFileRequest
). - Memory Efficiency: Uses a
sync.Pool
ofbytes.Buffer
on the server combined withio.CopyN
to efficiently read and buffer data without excessive allocations. The client usesio.Copy
for efficient disk writes. - Type Safety: Leverages Go types generated from the
.proto
definition. - Selective gzip compression: The server intelligently compresses individual chunks based on file type, only when beneficial and supported by the client.
How it works
The client sends the first RPC call to get the file size. This is useful to implement resumability and to ensure we do not waste bandwidth on already downloaded files:
// os.O_APPEND allows us to append to the end of the file without
// manually seeking to it
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
exit("os.OpenFile", err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
exit("file.Stat", err)
}
fileSizeCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
client := transferpb.NewTransferClient(conn)
res, err := client.GetFileSize(fileSizeCtx, &transferpb.GetFileSizeRequest{FileName: fileName})
if err != nil {
exit("client.GetFileSize()", err)
}
if info.Size() >= res.Size {
slog.Info("file already downloaded")
return
}
Now, we know we haven't downloaded the file before, we can make the second RPC call to stream the data from the server. We set up the stream and inform the server where we want to resume the download from, our preferred chunk size, the file name, and if we support compression.
stream, err := client.StreamFile(ctx, &transferpb.StreamFileRequest{
Start: info.Size(),
ChunkSize: ChunkSize,
FileName: fileName,
CanDecompress: decompress,
})
if err != nil {
exit("client.StreamFile", err)
}
Then we start streaming. We use io.Copy to write to the file in order to take advantage of its internal buffer. This helps us reduce the number of syscalls we make while writing to the file. This allows us to optimize CPU usage.
for {
r, streamErr = stream.Recv()
if streamErr != nil {
break
}
// if file is compressed, we decompress it first before reading
if r.Compressed {
gr, err := gzip.NewReader(bytes.NewReader(r.Chunk))
if err != nil {
slog.Error("gzip decompression failed", "offset", bytesDownloaded, "err", err)
exit("gzip.NewReader", err)
}
defer gr.Close()
chunkReader = gr
} else {
chunkReader = bytes.NewReader(r.Chunk)
}
_, streamErr = io.Copy(file, chunkReader)
if streamErr != nil {
break
}
}
Server
Client sends us an RPC call and we respond with the size of the file. This response is used by the client for different purposes but mainly to know if this is a partially downloaded file so we can resume the download. It can theoretically be used to choose the optimal chunk size to optimize the overhead of network requests.
func (t *TransferService) GetFileSize(ctx context.Context, req *transferpb.GetFileSizeRequest) (*transferpb.GetFileSizeResponse, error) {
if strings.Contains(req.FileName, "..") {
return nil, status.Error(codes.InvalidArgument, "Invalid file name")
}
dir, err := os.Getwd()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
fileName := path.Join(dir, ContentFolderName, req.FileName)
file, err := os.Open(fileName)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &transferpb.GetFileSizeResponse{Size: info.Size()}, nil
}
The client can now send us a subsequent RPC call to ask us to stream the file to them. The client will tell us the name of the file, where to resume the stream from, their preferred chunk size, and if they support compression. To keep the system memory efficient, we do not always use the client's preferred chunk size. We have a max chunk size that we allow.
To reduce memory consumption and reduce GC pressure, we use a sync pool for our buffers. The pool allows us to reuse allocated buffers. The buffers are the temporary data structures we use to extract chunks of the file before sending it to the stream. These buffers are key to ensuring we never read all of the file into memory at any time.
Define pool
var bufPool = sync.Pool{
New: func() any {
return &bytes.Buffer{}
},
}
Each time a client request comes in, we go to our pool to "get" a buffer. After streaming we put the buffer back in the pool so other requests can reuse them. I have read that this is not a silver lining so use these pools with caution. The buffer we get has a default size. This is by design. We want to be precise in our allocations, we do not want to over-allocate, or allocate multiple times. We are going to allocate precisely what we need depending on the chunk size.
buf, _ := bufPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufPool.Put(buf)
}()
chunkSize := min(req.ChunkSize, MaxChunkSize)
buf.Grow(int(chunkSize))
Now, we begin to read the file in chunks. Each chunk we read we send to the client. If the file benefits from compression, we compress it before sending:
for {
if shouldCompress {
w.Reset(buf) // reset gzipWriter
n, err := copyData(w)
if err != nil && n <= 0 {
if err == io.EOF {
return nil
}
return status.Error(codes.Internal, err.Error())
}
err = w.Close() // flush contents to buf
if err != nil {
return status.Error(codes.Internal, err.Error())
}
err = sendResponse(true)
if err != nil {
if err == io.EOF {
return nil
}
return status.Error(codes.Internal, err.Error())
}
} else {
n, err := copyData(buf)
if err != nil && n <= 0 {
if err == io.EOF {
return nil
}
return status.Error(codes.Internal, err.Error())
}
err = sendResponse(false)
if err != nil {
if err == io.EOF {
return nil
}
return status.Error(codes.Internal, err.Error())
}
}
buf.Reset()
}
The key to all these working is the copyData function. This is where we efficiently read data from the file into our buffer for transmission. Because io.CopyN has an internal buffer, we leverage it to efficiently read from the file system. In each iteration, we efficiently read the specified chunk size amount of data into the our buffer and send it to the client. This way we again reduce the number of syscalls due to file reads while never reading the entire file into memory.
copyData := func(w io.Writer) (int64, error) {
n, err := io.CopyN(w, file, int64(chunkSize))
if err != nil && n <= 0 {
if err == io.EOF {
return n, io.EOF
}
return n, status.Error(codes.Internal, err.Error())
}
return n, nil
}
Memory Efficiency: Real-World Profiling
To validate the efficiency of this approach, we profiled the server's memory usage during the transfer of a 2GB+ file. The results were outstanding:
Showing nodes accounting for 512.05kB, 100% of 512.05kB total
flat flat% sum% cum cum%
512.05kB 100% 100% 512.05kB 100% google.golang.org/grpc/internal/transport.(*writeQuota).get (inline)
0 0% 100% 512.05kB 100% ... (other stack frames)
What does this mean?
Extremely Low Memory Usage:
Even when transferring a file over 2GB in size, the server's peak memory usage attributable to the transfer was only about 512KB. This confirms that the implementation streams data in small chunks and never loads the entire file into memory.No Memory Leaks:
The profile shows no evidence of memory leaks or excessive buffering. All allocations are related to gRPC's internal flow control, which is expected and minimal.Scalability:
Because memory usage does not scale with file size, this service can handle many concurrent large transfers without running into memory exhaustion.
Conclusion
This project demonstrates how gRPC server streaming, combined with efficient buffer management (sync.Pool
, bytes.Buffer
, io.CopyN
), provides a powerful and performant alternative to HTTP-based methods for large file transfers between services. It offers type safety, potential performance benefits from HTTP/2, and a natural way to handle streaming data.
Top comments (0)