Introduction
In a previous article, I demonstrated basic usage of Proto.Actor, an actor model framework for .NET. This article builds a more complex example: a TCP socket server that uses three actors to handle connections, byte reception, and data processing.
Project Overview
Core Actors
- WaitForTcpConnectionActor:
  - Listens for new TCP connections.
  - Spawns ReceiveBytesActor for each connection.
- ReceiveBytesActor:
  - Receives bytes from the socket.
  - Spawns ProcessActor to deserialize and log the data, and restarts it up to 3 times on failure.
- ProcessActor:
  - Deserializes incoming bytes into a Sample object and prints it to the console.
Requirements
- .NET 8+
- NuGet Packages :
Starting the Actor System
Set up the actor system to spawn the WaitForTcpConnectionActor and stop it when pressing Ctrl+C:
using Proto;
using TcpServer;

var system = new ActorSystem();
var cancellationTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    // Keep the process alive so the actor system can shut down cleanly.
    e.Cancel = true;
    cancellationTokenSource.Cancel();
};
system.Root.Spawn(Props.FromProducer(() => new WaitForTcpConnectionActor(9091)));
while (!cancellationTokenSource.IsCancellationRequested)
{
    await Task.Delay(1_000);
}
await system.ShutdownAsync();
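The loop above wakes up once per second just to check the token. As a possible alternative, the wait can be tied directly to the cancellation token. The sketch below is only an illustration: ShutdownSignal and WaitAsync are hypothetical names that are not part of Proto.Actor or of the original project.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class ShutdownSignal
{
    // Returns a task that completes once the token is cancelled.
    // If the token is already cancelled, the task is completed immediately.
    public static Task WaitAsync(CancellationToken token)
    {
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        token.Register(() => tcs.TrySetResult());
        return tcs.Task;
    }
}
```

With this helper, the while loop could be replaced by a single `await ShutdownSignal.WaitAsync(cancellationTokenSource.Token);` before calling ShutdownAsync.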
Wait For TCP Connection Actor
The actor model works by having actors communicate via messages. We’ll use built-in lifecycle messages such as Started, as well as a custom message, WaitForNextConnection.
As shown in the previous article, an actor can be defined as:
public class WaitForTcpConnectionActor(int port) : IActor
{
    public async Task ReceiveAsync(IContext context)
    {
    }
}
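The custom messages exchanged by the three actors are never defined in the article. One plausible way to declare them is as small immutable records, sketched below. The property names (Socket, Data, Child) are assumptions made for illustration, apart from Data, which ProcessActor reads later on. The TcpServer namespace is taken from the using directive in the startup code.

```csharp
using System.Net.Sockets;
using Proto;

namespace TcpServer;

// Sent by WaitForTcpConnectionActor to itself to accept the next client.
public record WaitForNextConnection;

// Carries the accepted socket to a newly spawned ReceiveBytesActor.
public record SocketAccepted(Socket Socket);

// Carries the raw bytes read from the socket to ProcessActor.
public record BufferReceived(byte[] Data);

// Sent by a restarting ProcessActor to ask its parent for the buffer again.
public record ResendBufferReceived;

// Sent by a child to its parent when it is done; Child is the PID to stop.
public record ProcessCompleted(PID Child);
```

Records give value equality and a readable ToString for free, which is convenient when logging messages during debugging.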
Starting TCP Listener
The first step is to start our TCP server. For this, we use the Started message:
public class WaitForTcpConnectionActor(int port) : IActor
{
    private TcpListener? _listener;

    public async Task ReceiveAsync(IContext context)
    {
        if (context.Message is Started)
        {
            // Log the port that was passed in rather than a hard-coded value.
            Console.WriteLine("Listening on port {0}", port);
            _listener = TcpListener.Create(port);
            _listener.Start();
        }
    }
}
Next, the actor needs to wait for a connection. To trigger that, it sends a WaitForNextConnection message to itself:
public class WaitForTcpConnectionActor(int port) : IActor
{
    private TcpListener? _listener;

    public async Task ReceiveAsync(IContext context)
    {
        if (context.Message is Started)
        {
            _listener = TcpListener.Create(port);
            _listener.Start();
            context.Send(context.Self, new WaitForNextConnection());
        }
    }
}
Waiting for a TCP connection
Now that we’re listening for connections, we need to accept them and spawn a new actor to process each one:
public class WaitForTcpConnectionActor(int port) : IActor
{
    ...
    public async Task ReceiveAsync(IContext context)
    {
        if (context.Message is Started)
        {
            ...
        }
        else if (context.Message is WaitForNextConnection)
        {
            var socket = await _listener!.AcceptSocketAsync(cancellationToken);
            // The strategy is part of the Props, so it is configured before spawning.
            var props = Props.FromProducer(() => new ReceiveBytesActor())
                .WithChildSupervisorStrategy(new OneForOneStrategy(
                    (_, exception) =>
                    {
                        Console.WriteLine("Error: {0}", exception);
                        return SupervisorDirective.Restart;
                    },
                    3,
                    TimeSpan.FromSeconds(1)));
            var actor = context.Spawn(props);
            context.Send(actor, new SocketAccepted(socket));
            // Queue the next accept so the actor keeps listening.
            context.Send(context.Self, new WaitForNextConnection());
        }
    }
}
Actor Supervision
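We configure a OneForOneStrategy through WithChildSupervisorStrategy on the ReceiveBytesActor props. A child supervisor strategy applies to the children of the actor that carries it, so it governs the ProcessActor spawned by each ReceiveBytesActor:
- If the child actor fails, it is restarted, up to 3 times within 1 second.
- This ensures that a failure (e.g., a malformed message) doesn’t crash the entire system.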
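To make the "up to 3 times within 1 second" rule concrete, here is a simplified model of such a restart window. It is not Proto.Actor's implementation: RestartWindow and ShouldRestart are hypothetical names, and the sketch assumes that a failure beyond the limit inside the window leads to the child being stopped instead of restarted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model of a "max N restarts within a time window" policy.
public sealed class RestartWindow(int maxRetries, TimeSpan window)
{
    private readonly List<DateTime> _failures = new();

    // Records a failure at 'now' and returns true while the number of
    // failures inside the window is still within the allowed retries.
    public bool ShouldRestart(DateTime now)
    {
        _failures.Add(now);
        var recentFailures = _failures.Count(t => now - t < window);
        return recentFailures <= maxRetries;
    }
}
```

With a limit of 3 and a window of 1 second, three quick failures each lead to a restart, while a fourth failure inside the same second does not. Failures that are spaced further apart than the window never exhaust the limit.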
Notifying Completion
When processing completes, the parent actor receives a ProcessCompleted message from the child actor. This signals the parent to stop the child actor explicitly, ensuring proper resource cleanup and avoiding memory leaks.
public class WaitForTcpConnectionActor(int port) : IActor
{
    ...
    public async Task ReceiveAsync(IContext context)
    {
        if (context.Message is Started)
        {
            ...
        }
        else if (context.Message is Stopping)
        {
            // Stopping is delivered to the actor itself when it is being stopped;
            // Terminated would instead report that a watched child has ended.
            _listener?.Dispose();
        }
        else if (context.Message is ProcessCompleted completed)
        {
            // Assumes ProcessCompleted carries the finished child's PID in a property named Child.
            await context.StopAsync(completed.Child);
        }
        else if (context.Message is WaitForNextConnection)
        {
            ...
        }
    }
}
Resource Cleanup
When a connection is processed:
- The ProcessCompleted message signals completion.
- The parent actor stops the child, which triggers the child’s cleanup as it terminates.
Graceful Shutdown
When the actor system shuts down, we must properly dispose of the TCP listener to avoid resource leaks.
public class WaitForTcpConnectionActor(int port) : IActor
{
    ...
    public async Task ReceiveAsync(IContext context)
    {
        if (context.Message is Started)
        {
            ...
        }
        else if (context.Message is Stopping)
        {
            // Stopping is delivered to the actor itself when it is being stopped;
            // Terminated would instead report that a watched child has ended.
            _listener?.Dispose();
        }
        else if (context.Message is ProcessCompleted)
        {
            ...
        }
        else if (context.Message is WaitForNextConnection)
        {
            ...
        }
    }
}
Receiving bytes
The next step is receiving bytes from a socket.
Handling SocketAccepted
When a new connection is accepted, the actor stores the socket and reads available bytes.
public class ReceiveBytesActor : IActor
{
    private Socket? _socket;
    private byte[]? _buffer;

    public async Task ReceiveAsync(IContext context)
    {
        if (context.Message is SocketAccepted accepted)
        {
            // Assumes SocketAccepted exposes the accepted socket through a Socket property.
            _socket = accepted.Socket;
            _buffer = new byte[_socket.Available];
            await _socket.ReceiveAsync(_buffer);
            var props = Props.FromProducer(() => new ProcessActor());
            var actor = context.SpawnNamed(props, "json-serializer");
            // ProcessActor handles BufferReceived, so that is the message to send.
            context.Send(actor, new BufferReceived(_buffer));
        }
    }
}
Notifying Completion
After processing, the actor stops the child ProcessActor and notifies its parent to release resources.
public class ReceiveBytesActor : IActor
{
    ...
    public async Task ReceiveAsync(IContext context)
    {
        if (context.Message is SocketAccepted socket)
        {
            ...
        }
        else if (context.Message is ProcessCompleted completed)
        {
            // Assumes ProcessCompleted carries the finished child's PID in a property named Child.
            await context.StopAsync(completed.Child);
            // Pass our own PID along so the parent knows which actor to stop.
            context.Send(context.Parent!, new ProcessCompleted(context.Self));
        }
    }
}
Closing the Socket Gracefully
When the actor stops, it disposes of the socket and stops all child actors to prevent leaks.
public class ReceiveBytesActor : IActor
{
    ...
    public async Task ReceiveAsync(IContext context)
    {
        if (context.Message is Stopping)
        {
            // Stopping is delivered to the actor itself when it is being stopped;
            // Terminated would instead report that a watched child has ended.
            _buffer = null;
            _socket?.Dispose();
            await context.Children.StopMany(context);
        }
        else if (context.Message is SocketAccepted socket)
        {
            ...
        }
        else if (context.Message is ProcessCompleted)
        {
            ...
        }
    }
}
Resend Buffer Received
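If the ProcessActor fails and restarts, it asks its parent for the data again with a ResendBufferReceived message, and the ReceiveBytesActor resends the buffered data so it can be reprocessed:
public class ReceiveBytesActor : IActor
{
    ...
    public async Task ReceiveAsync(IContext context)
    {
        if (context.Message is Terminated)
        {
            ...
        }
        else if (context.Message is SocketAccepted socket)
        {
            ...
        }
        else if (context.Message is ProcessCompleted)
        {
            ...
        }
        else if (context.Message is ResendBufferReceived)
        {
            // The request arrives via Send, which carries no sender, so address the
            // child directly; this actor only ever spawns the one ProcessActor.
            foreach (var child in context.Children)
            {
                context.Send(child, new BufferReceived(_buffer!));
            }
        }
    }
}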
Process Actor
The final actor deserializes and logs the data.
BufferReceived Message
The BufferReceived message contains the raw bytes from the socket. This actor deserializes the data into a Sample object and prints it to the console. After processing, it notifies the parent actor (ReceiveBytesActor) via a ProcessCompleted message to clean up resources.
public class ProcessActor : IActor
{
    public Task ReceiveAsync(IContext context)
    {
        if (context.Message is BufferReceived socketReceived)
        {
            var json = JsonSerializer.Deserialize<Sample>(socketReceived.Data)!;
            Console.WriteLine("Received sample with id: {0} and name: {1}", json.Id, json.Name);
            context.Send(context.Parent!, new ProcessCompleted(context.Self));
        }
        return Task.CompletedTask;
    }
}
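The Sample type is not shown in the article. The sketch below assumes a minimal shape based on the Id and Name properties used above, and adds a hypothetical SampleParser helper to show what happens with a payload that is not valid JSON, such as the single byte 'f' that the client can send.

```csharp
using System.Text.Json;

// Assumed shape of Sample, inferred from the Id and Name properties used by ProcessActor.
public class Sample
{
    public int Id { get; set; }
    public string Name { get; set; } = "";
}

public static class SampleParser
{
    // Returns the parsed Sample, or null when the bytes are not valid JSON for a Sample.
    public static Sample? TryParse(byte[] data)
    {
        try
        {
            return JsonSerializer.Deserialize<Sample>(data);
        }
        catch (JsonException)
        {
            return null;
        }
    }
}
```

Inside ProcessActor the JsonException is deliberately left unhandled, so the failure reaches the supervisor and triggers a restart. A helper like this is mainly useful for checking payloads outside the actor, for example in a unit test.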
Restarting
When an actor is restarted, Proto.Actor sends the Restarting message to the actor itself. This gives the actor an opportunity to notify its parent to retransmit the original message (or state) so the restarted actor can reprocess it.
public class ProcessActor : IActor
{
    public Task ReceiveAsync(IContext context)
    {
        if (context.Message is Restarting)
        {
            context.Send(context.Parent!, new ResendBufferReceived());
        }
        else if (context.Message is BufferReceived socketReceived)
        {
            ...
        }
        return Task.CompletedTask;
    }
}
TCP Client
Finally, let’s implement a simple TCP client that sends user input (converted to JSON) to the server:
using System.Net.Sockets;
using System.Text.Json;
using TcpServer.Client;

var id = 0;
while (true)
{
    Console.Write("Type a name (q to quit, f to send non-JSON): ");
    var name = Console.ReadLine();
    if (string.IsNullOrWhiteSpace(name))
    {
        continue;
    }
    if (name == "q")
    {
        break;
    }
    try
    {
        var connection = new TcpClient();
        await connection.ConnectAsync("localhost", 9091);
        var stream = connection.GetStream();
        if (name == "f")
        {
            // A single byte that is not valid JSON, used to trigger a failure on the server.
            await stream.WriteAsync(new[] { (byte)'f' });
        }
        else
        {
            await JsonSerializer.SerializeAsync(stream, new Sample
            {
                Id = id++,
                Name = name,
            });
        }
        connection.Close();
    }
    catch (Exception e)
    {
        Console.WriteLine("Error: {0}", e);
    }
}
Conclusion
The Proto.Actor library is a powerful tool for building fault-tolerant systems, demonstrating how the actor model simplifies concurrency, resource management, and error recovery. By implementing a TCP socket server with three actors (WaitForTcpConnectionActor, ReceiveBytesActor, and ProcessActor), we’ve explored key concepts such as:
- Supervision Strategies: Using OneForOneStrategy to restart failed actors up to 3 times, ensuring transient errors don’t crash the system.
- Actor Lifecycle Management: Handling lifecycle messages for start, restart, and stop to manage state and resources safely.
- Message Resilience: Retrying failed operations via ResendBufferReceived and ProcessCompleted to maintain data integrity.
Learning vs. Production Trade-offs
This example intentionally simplifies complex scenarios for educational purposes. For instance:
- Using _socket.Available: While convenient for demos, it’s unreliable in production due to potential partial reads. A better approach involves dynamic buffering (e.g., MemoryStream) to handle variable-length data.
- Error Handling: The current implementation lacks robust exception handling for edge cases (e.g., malformed input). In production, wrap socket operations in try-catch blocks and log errors systematically.
- Resource Cleanup: The ProcessCompleted message ensures sockets and actors are disposed of properly, but in real systems, consider adding timeouts or heartbeat checks to avoid orphaned connections.
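The dynamic buffering mentioned in the first point could look roughly like the sketch below, which reads from a stream in chunks until the peer closes the connection and collects everything in a MemoryStream. StreamReading and ReadToEndAsync are hypothetical names, and the approach assumes one message per connection, as in the client above, which closes the connection after sending.

```csharp
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class StreamReading
{
    // Reads until the stream reports end-of-stream (a read of 0 bytes)
    // and returns everything that was received.
    public static async Task<byte[]> ReadToEndAsync(Stream stream, CancellationToken token = default)
    {
        var chunk = new byte[4096];
        using var collected = new MemoryStream();
        int read;
        while ((read = await stream.ReadAsync(chunk, token)) > 0)
        {
            collected.Write(chunk, 0, read);
        }
        return collected.ToArray();
    }
}
```

In ReceiveBytesActor, the accepted socket could be wrapped in a NetworkStream (for example `new NetworkStream(socket, ownsSocket: false)`) and passed to this helper instead of sizing the buffer from _socket.Available. A protocol that keeps connections open would need explicit framing, such as a length prefix, instead of relying on the connection being closed.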