Introdução
Em artigos anterior, demonstrei o uso básico do Proto.Actor, um framework para o Modelo de Atores em .NET. Neste artigo, vamos construir um exemplo mais complexo: um servidor TCP usando três atores para gerenciar conexões, recepção de bytes e processamento de dados.
Visão Geral do Projeto
Atores Principais
-
WaitForTcpConnectionActor:
- Escuta por novas conexões TCP.
- Cria uma instância de
ReceiveBytesActor
para cada conexão.
-
ReceiveBytesActor:
- Recebe bytes do socket.
- Cria uma instância de
ProcessActor
para desserializar, logar os dados e reiniciá-lo até 3 vezes em caso de falha.
-
ProcessActor:
- Desserializa os bytes recebidos em um objeto
Sample
e o imprime no console.
- Desserializa os bytes recebidos em um objeto
Requisitos
- .NET 8+
- Pacotes NuGet:
Iniciando o Sistema de Atores
Configuração do sistema de atores para criar o WaitForTcpConnectionActor
e encerrá-lo ao pressionar Ctrl+C
:
using Proto;
using TcpServer;
var system = new ActorSystem();
var cancellationTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, _) =>
{
cancellationTokenSource.Cancel();
};
system.Root.Spawn(Props.FromProducer(() => new WaitForTcpConnectionActor(9091)));
while (!cancellationTokenSource.IsCancellationRequested)
{
await Task.Delay(1_000);
}
await system.ShutdownAsync();
Ator de Espera por Conexão TCP
O Modelo de Atores funciona com os atores se comunicando por meio de mensagens. Vamos usar mensagens fornecido como Started e Terminated, além de nossas proprias messagens como a WaitForNextConnection.
Conforme mostrado no artigo anterior, um ator pode ser definido da seguinte forma:
public class WaitForTcpConnectionActor(int port) : IActor
{
public async Task ReceiveAsync(IContext context)
{
}
}
Iniciando o Listener TCP
O primeiro passo é iniciar o servidor TCP usando a mensagem Started
:
public class WaitForTcpConnectionActor(int port) : IActor
{
private TcpListener? _listener;
public async Task ReceiveAsync(IContext context)
{
if (context.Message is Started)
{
Console.WriteLine("Escutando na porta 9091");
_listener = TcpListener.Create(port);
_listener.Start();
}
}
}
Aguardando Conexões
O Modelo de Atores funciona com os atores se comunicando por meio de mensagens. Vamos usar mensagens integradas como Started
e Terminated, além de uma mensagem personalizada chamada WaitForNextConnection
.
Conforme mostrado no artigo anterior, um ator pode ser definido da seguinte forma:
public class WaitForTcpConnectionActor(int port) : IActor
{
public async Task ReceiveAsync(IContext context)
{
}
}
Iniciando o TCP Listener
O primeiro passo é iniciar nosso servidor TCP. Para isso, usamos a mensagem Started
:
public class WaitForTcpConnectionActor(int port) : IActor
{
private TcpListener? _listener;
public async Task ReceiveAsync(IContext context)
{
if(context.Message is Started)
{
Console.WriteLine("Listening on port 9091");
_listener = TcpListener.Create(port);
_listener.Start();
}
}
}
Em seguida, aguarde uma conexão, enviando uma mensagem ao próprio ator:
public class WaitForTcpConnectionActor(int port) : IActor
{
private TcpListener? _listener;
public async Task ReceiveAsync(IContext context)
{
if(context.Message is Started)
{
_listener = TcpListener.Create(port);
_listener.Start();
context.Send(context.Self, new WaitForNextConnection());
}
}
}
Aguardando uma Conexão TCP
Agora que estamos escutando conexões, precisamos aceitá-las e criar novos atores para processar cada uma:
public class WaitForTcpConnectionActor(int port) : IActor
{
...
public async Task ReceiveAsync(IContext context)
{
if(context.Message is Started)
{
...
}
else if(context.Message is WaitForNextConnection)
{
var socket = await _listener!.AcceptSocketAsync(cancellationToken);
var actor = context.Spawn(Props.FromProducer(() => new ReceiveBytesActor()))
.WithChildSupervisorStrategy(new OneForOneStrategy(
(_, exception) =>
{
Console.WriteLine("Error: {0}", exception);
return SupervisorDirective.Restart;
},
3,
TimeSpan.FromSeconds(1)));;
context.Send(actor, new SocketAccepted(socket));
context.Send(context.Self, new WaitForNextConnection());
}
}
}
Supervisão de Atores
Configuramos uma estratégia OneForOneStrategy para supervisionar instâncias de ReceiveBytesActor
:
Se um ator filho falhar, ele será reiniciado até 3 vezes em 1 segundo.
Isso garante que erros transitórios (ex: mensagens malformadas) não derrubem todo o sistema.
Notificando Conclusão
Quando o processamento é concluído, o ator pai recebe a mensagem ProcessCompleted
. Isso sinaliza ao pai para parar explicitamente o ator filho, garantindo liberação adequada de recursos e evitando vazamentos de memória.
public class WaitForTcpConnectionActor(int port) : IActor
{
...
public async Task ReceiveAsync(IContext context)
{
if(context.Message is Started)
{
...
}
else if(context.Message is { Message: Terminated, Sender: not null }))
{
_listener?.Dispose();
}
else if(context.Message is ProcessCompleted)
{
await context.StopAsync(Sender);
}
else if(context.Message is WaitForNextConnection)
{
...
}
}
}
Liberação de Recursos
Quando uma conexão é processada:
- A mensagem ProcessCompleted indica conclusão.
- O ator pai para o filho e aciona a liberação de recursos.
Desligamento Elegante
Ao desligar o sistema de atores, devemos liberar corretamente o TcpListener
para evitar vazamentos de recursos:
public class WaitForTcpConnectionActor(int port) : IActor
{
...
public async Task ReceiveAsync(IContext context)
{
if(context.Message is Started)
{
...
}
else if(context.Message is { Message: Terminated, Sender: not null }))
{
_listener?.Dispose();
}
else if(context.Message is ProcessCompleted)
{
....
}
else if(context.Message is WaitForNextConnection)
{
...
}
}
}
Recebendo Bytes
O próximo passo é receber bytes de um socket.
Tratando SocketAccepted
Quando uma nova conexão é aceita, o ator armazena o socket e lê os bytes disponíveis:
public class ReceiveBytesActor : IActor
{
private Socket? _socket;
private byte[]? _buffer;
public async Task ReceiveAsync(IContext context)
{
if(context.Message is SocketAccepted socket)
{
_socket = socket;
_buffer = new byte[_socket.Available];
await _socket.ReceiveAsync(_buffer);
var props = Props.FromProducer(() => new ProcessActor());
var actor = context.SpawnNamed(props, "json-serializer");
context.Send(actor, new SocketReceived(_buffer!));
}
}
}
Notificando Conclusão
Após o processamento, o ator para o ProcessActor
filho e notifica seu pai para liberar recursos:
public class ReceiveBytesActor : IActor
{
public async Task ReceiveAsync(IContext context)
{
if(context.Message is SocketAccepted socket)
{
// Lógica de recepção de bytes
}
else if(context.Message is ProcessCompleted)
{
await context.StopAsync(Sender);
context.Send(context.Parent!, new ProcessCompleted());
}
}
}
Encerramento Elegante do Socket
Quando o ator é encerrado, ele descarta o socket e para todos os atores filhos para evitar vazamentos de memória:
public class ReceiveBytesActor : IActor
{
public async Task ReceiveAsync(IContext context)
{
if(context.Message is Terminated)
{
_buffer = null;
_socket?.Dispose();
await context.Children.StopMany(context);
}
else if(context.Message is SocketAccepted socket)
{
// Lógica de recepção de bytes
}
else if(context.Message is ProcessCompleted)
{
// Notificação de conclusão
}
}
}
Reenviando o Buffer Recebido
Se o ProcessActor
falhar e reiniciar, o ReceiveBytesActor
reenvia o buffer armazenado para reprocessamento:
public class ReceiveBytesActor : IActor
{
public async Task ReceiveAsync(IContext context)
{
if(context.Message is Terminated)
{
// Liberação de recursos
}
else if(context.Message is SocketAccepted socket)
{
// Lógica de recepção de bytes
}
else if(context.Message is ProcessCompleted)
{
// Notificação de conclusão
}
else if(context.Message is ResendBufferReceived)
{
context.Send(Sender, new ResendBufferReceived(_buffer!));
}
}
}
Processamento de Dados
O último ator desserializa e registra os dados recebidos.
Mensagem BufferReceived
A mensagem BufferReceived
contém os bytes brutos do socket. Este ator desserializa os dados em um objeto Sample
e imprime no console. Após o processamento, ele notifica o ator pai (ReceiveBytesActor
) por meio da mensagem ProcessCompleted
para liberar recursos:
public class ProcessActor : IActor
{
public Task ReceiveAsync(IContext context)
{
if (context.Message is BufferReceived socketReceived)
{
var json = JsonSerializer.Deserialize<Sample>(socketReceived.Data)!;
Console.WriteLine("Recebida com ID: {0} e nome: {1}", json.Id, json.Name);
context.Send(context.Parent!, new ProcessCompleted(context.Self));
}
return Task.CompletedTask;
}
}
Reinicialização (Restarting
)
Quando um ator é reiniciado, o Proto.Actor
envia a mensagem Restarting
para o próprio ator. Isso permite que o ator notifique seu ator pai para retransmitir a mensagem original (ou estado), garantindo que o ator reiniciado possa reprocessá-la:
public class ProcessActor : IActor
{
public Task ReceiveAsync(IContext context)
{
if (context.Message is Restarting)
{
context.Send(context.Parent!, new ResendBufferReceived());
}
else if (context.Message is BufferReceived socketReceived)
{
...
}
return Task.CompletedTask;
}
}
Cliente TCP
Implementação de um cliente TCP simples que envia entrada do usuário (convertida em JSON):
using System.Net.Sockets;
using System.Text.Json;
using TcpServer.Client;
var id = 0;
while (true)
{
Console.Write("Digite um nome (q para sair/f para não serializar): ");
var name = Console.ReadLine();
if (string.IsNullOrWhiteSpace(name))
{
continue;
}
if (name == "q")
{
break;
}
try
{
var connection = new TcpClient();
await connection.ConnectAsync("localhost", 9091);
var stream = connection.GetStream();
if (name == "f")
{
await stream.WriteAsync(new[] { (byte)'f' });
}
else
{
await JsonSerializer.SerializeAsync(stream, new Sample
{
Id = id++,
Name = name
});
}
connection.Close();
}
catch (Exception e)
{
Console.WriteLine("Erro: {0}", e.Message);
}
}
Conclusão
O Proto.Actor é uma ferramenta poderosa para construir sistemas tolerantes a falhas, demonstrando como o Modelo de Atores simplifica concorrência, gerenciamento de recursos e recuperação de erros. Neste exemplo, exploramos:
-
Estratégias de Supervisão: Uso de
OneForOneStrategy
para reiniciar atores falhos até 3 vezes. -
Gerenciamento do Ciclo de Vida dos Atores: Tratamento de mensagens como
Started
,Terminated
eRestarting
. -
Resiliência de Mensagens: Retentativas de operações falhas via
ResendSocketAccepted
eProcessCompleted
.
Considerações para Produção
-
Leitura de Bytes:
- O método
_socket.Available
é útil para demonstrações, mas pode ser inconsistente em produção. UseMemoryStream
para lidar com dados de tamanho variável.
- O método
-
Tratamento de Erros:
- Em sistemas reais, encapsule operações de socket em blocos
try-catch
e implemente logs detalhados.
- Em sistemas reais, encapsule operações de socket em blocos
-
Liberação de Recursos:
- Use timeouts ou verificações de heartbeat para evitar conexões órfãs.
Top comments (0)