A gRPC stream client can send multiple messages to a server over a single, long-lived connection, and the server can process these messages asynchronously.
Here’s a Go example of a client sending a stream of requests:
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "your_module_path/proto" // Assuming you have a proto file named 'your_service.proto'
)

const (
	address = "localhost:50051"
)

func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := pb.NewYourServiceClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Create a stream client
	stream, err := c.ProcessStream(ctx)
	if err != nil {
		log.Fatalf("could not create stream: %v", err)
	}

	// Send multiple messages
	for i := 0; i < 5; i++ {
		req := &pb.StreamRequest{
			Message: fmt.Sprintf("Message %d", i),
		}
		if err := stream.Send(req); err != nil {
			log.Fatalf("failed to send message: %v", err)
		}
		log.Printf("Sent: %s", req.Message)
		time.Sleep(500 * time.Millisecond) // Simulate some delay between sends
	}

	// Close the stream to signal completion of sending
	if err := stream.CloseSend(); err != nil {
		log.Fatalf("failed to close send stream: %v", err)
	}

	// Receive responses from the server
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break // Server finished sending responses
		}
		if err != nil {
			log.Fatalf("failed to receive response: %v", err)
		}
		log.Printf("Received: %s", resp.Message)
	}

	log.Println("Stream processing finished.")
}
This client establishes a connection, then initiates a bidirectional stream. It sends five StreamRequest messages, waits a bit between each, signals the end of its sending with CloseSend(), and then enters a loop to receive all the responses from the server until the server closes its end of the stream (io.EOF).
The core problem this solves is efficient, continuous communication between a client and server where multiple discrete requests need to be sent over a single, persistent connection without the overhead of establishing a new connection for each request. This is particularly useful for scenarios like real-time data feeds, chat applications, or large batch processing where individual messages are small but the total volume is high.
Internally, gRPC uses HTTP/2 as its transport layer, which allows multiple logical streams to be multiplexed over a single TCP connection. When you call c.ProcessStream(ctx), gRPC sets up a new HTTP/2 stream on the underlying connection. The stream.Send() calls write DATA frames to this HTTP/2 stream, and stream.Recv() reads them. CloseSend() half-closes the stream from the client side: the client sets the END_STREAM flag on a final DATA frame, which tells the server that no more requests are coming while leaving the server free to keep sending responses. A RST_STREAM frame, by contrast, aborts the stream, for example when the RPC is cancelled.
The pb.YourServiceClient and pb.StreamRequest types are generated from your Protocol Buffers definition file (e.g., your_service.proto). A typical definition for this would look something like:
syntax = "proto3";

package pb;

service YourService {
  rpc ProcessStream (stream StreamRequest) returns (stream StreamResponse) {}
}

message StreamRequest {
  string message = 1;
}

message StreamResponse {
  string message = 1;
}
The server-side implementation would mirror this, typically using a for loop to receive messages from the stream until io.EOF is encountered, processing each message, and then sending back responses.
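The receive loop described above can be sketched without a running server. This is a minimal illustration, not the generated code: serverStream is a hypothetical interface that only mirrors the Recv/Send shape of a generated bidirectional server stream, the StreamRequest and StreamResponse structs stand in for the pb types, and fakeStream is an in-memory stand-in used purely to exercise the loop.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// StreamRequest and StreamResponse stand in for the generated pb types.
type StreamRequest struct{ Message string }
type StreamResponse struct{ Message string }

// serverStream is a hypothetical interface with the two methods the
// handler needs; it is an assumption made for this sketch.
type serverStream interface {
	Recv() (*StreamRequest, error)
	Send(*StreamResponse) error
}

// processStream reads until the client half-closes the stream (io.EOF),
// sending one response per request.
func processStream(stream serverStream) error {
	for {
		req, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil // client is done sending; returning ends the RPC
		}
		if err != nil {
			return err
		}
		resp := &StreamResponse{Message: "Processed: " + req.Message}
		if err := stream.Send(resp); err != nil {
			return err
		}
	}
}

// fakeStream is an in-memory stand-in that replays queued requests and
// records responses, so the loop can run without a network.
type fakeStream struct {
	in  []*StreamRequest
	out []*StreamResponse
}

func (f *fakeStream) Recv() (*StreamRequest, error) {
	if len(f.in) == 0 {
		return nil, io.EOF
	}
	req := f.in[0]
	f.in = f.in[1:]
	return req, nil
}

func (f *fakeStream) Send(resp *StreamResponse) error {
	f.out = append(f.out, resp)
	return nil
}

func main() {
	fs := &fakeStream{in: []*StreamRequest{{Message: "Message 0"}, {Message: "Message 1"}}}
	if err := processStream(fs); err != nil {
		fmt.Println("handler error:", err)
		return
	}
	for _, r := range fs.out {
		fmt.Println(r.Message)
	}
}
```

In a real service the handler would receive the generated stream type instead of serverStream, but the control flow (loop on Recv, return on io.EOF, propagate other errors) would look the same.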
The key levers you control are:
- context.Context: This governs the lifecycle of the RPC. Cancellation of the context will abort the stream on both client and server. Deadlines within the context can enforce time limits.
- stream.Send(): The mechanism for pushing messages from the client to the server.
- stream.Recv(): The mechanism for pulling messages from the server to the client.
- stream.CloseSend(): Explicitly signals to the server that the client has no more messages to send. This is crucial for bidirectional streams where the server needs to know when to stop reading.
- Error Handling: Specifically checking for io.EOF on Recv() is how you detect the server has finished sending its stream of responses.
A common point of confusion is when to call CloseSend(). For a client sending a stream of requests, you must call CloseSend() to indicate you are done sending. If you don’t, the server will never receive io.EOF from its Recv() call and will keep waiting until the RPC's deadline expires or it is cancelled, tying up resources in the meantime. The server, by contrast, does not call CloseSend(): it ends its side of the stream by returning from the handler, which the client sees as io.EOF from stream.Recv() (or as a status error if the handler returned one).
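The ordering of Send, CloseSend, and Recv can be sketched with an in-memory stand-in for the stream. Everything here is hypothetical scaffolding: pipeStream imitates the Send/Recv/CloseSend shape using channels, and echoServer plays the role of a handler that replies to each message and then returns. The sketch also receives in a separate goroutine while sending, a common pattern for bidirectional streams, since a client that only starts reading after all sends can stall once enough unread responses build up.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// pipeStream is a hypothetical in-memory stand-in for a bidirectional stream.
type pipeStream struct {
	toServer chan string
	toClient chan string
}

func (p *pipeStream) Send(m string) error { p.toServer <- m; return nil }

// CloseSend signals that the client will send nothing further.
func (p *pipeStream) CloseSend() error { close(p.toServer); return nil }

// Recv returns io.EOF once the server side has finished.
func (p *pipeStream) Recv() (string, error) {
	m, ok := <-p.toClient
	if !ok {
		return "", io.EOF
	}
	return m, nil
}

// echoServer replies to every message and finishes only after the client
// half-closes; closing toClient models the handler returning.
func echoServer(p *pipeStream) {
	for m := range p.toServer {
		p.toClient <- "echo: " + m
	}
	close(p.toClient)
}

// exchange sends msgs while a goroutine concurrently collects responses.
func exchange(msgs []string) ([]string, error) {
	p := &pipeStream{toServer: make(chan string), toClient: make(chan string)}
	go echoServer(p)

	var (
		wg      sync.WaitGroup
		got     []string
		recvErr error
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			m, err := p.Recv()
			if err == io.EOF {
				return // server finished
			}
			if err != nil {
				recvErr = err
				return
			}
			got = append(got, m)
		}
	}()

	for _, m := range msgs {
		if err := p.Send(m); err != nil {
			return nil, err
		}
	}
	// Without this call the server loop never ends and wg.Wait blocks forever.
	if err := p.CloseSend(); err != nil {
		return nil, err
	}
	wg.Wait()
	return got, recvErr
}

func main() {
	got, err := exchange([]string{"Message 0", "Message 1", "Message 2"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range got {
		fmt.Println(m)
	}
}
```

Removing the CloseSend() call from exchange makes the program hang, which is the in-memory analogue of a server waiting indefinitely on Recv().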
The next concept you’ll likely encounter is handling server-side streaming RPCs, where the client sends a single request and the server returns a stream of responses.