DEV Community

Caio Pizzol
Caio Pizzol

Posted on

Como Processar 60+ milhões de CNPJs com Python: Arquitetura e Decisões Técnicas

Spoiler: Não é só fazer pd.read_csv() e torcer que vai dar certo

Se você já tentou processar os dados públicos de CNPJ da Receita Federal, sabe que não é trivial. São 60+ milhões de empresas em 37 arquivos ZIP totalizando ~7GB comprimidos que expandem para ~21GB. Neste post, compartilho as decisões arquiteturais do cnpj-data-pipeline.

O Problema Real

A Receita Federal disponibiliza dados mensalmente com desafios únicos:

  • 37 arquivos fragmentados: 10 Empresas + 10 Estabelecimentos + 10 Sócios + 7 tabelas de referência
  • Dependências entre tabelas: Estabelecimentos referenciam Empresas, Municípios e Motivos
  • Encoding ISO-8859-1: Precisa converter para UTF-8
  • Memória limitada: Arquivos de até 2GB impossíveis de carregar inteiros
  • Formato inconsistente: Datas como "00000000", decimais com vírgula

Arquitetura: Patterns que Fazem Sentido

1. Factory Pattern para Extensibilidade

# src/database/factory.py
ADAPTERS: Dict[str, Type[DatabaseAdapter]] = {
    "postgresql": PostgreSQLAdapter,
    # Preparado para: MySQL, BigQuery, SQLite
}

def create_database_adapter(config) -> DatabaseAdapter:
    backend = config.database_backend.value
    adapter_class = ADAPTERS.get(backend)
    if not adapter_class:
        raise ValueError(f"Unsupported database backend: {backend}")
    return adapter_class(config)
Enter fullscreen mode Exit fullscreen mode

Por quê? Adicionar um novo banco é questão de implementar a interface DatabaseAdapter sem tocar no resto do código.

2. Strategy Pattern para Downloads

Com 47 arquivos, downloads sequenciais são lentos. Implementei duas estratégias:

# src/download_strategies/parallel.py
class ParallelDownloadStrategy(DownloadStrategy):
    def download_files(self, directory: str, files: List[str]) -> Iterator[Path]:
        # Separa referências de dados
        reference_files, data_files = self._categorize_files(files)

        # Referências primeiro (sequencial por segurança)
        for csv in self._download_sequential(reference_files):
            yield csv

        # Dados em paralelo
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {
                executor.submit(self.download_single_file, dir, f): f 
                for f in data_files
            }
            for future in as_completed(futures):
                yield future.result()
Enter fullscreen mode Exit fullscreen mode

3. Ordenação Inteligente por Dependências

A ordem de processamento importa quando há foreign keys:

# src/downloader.py
def organize_files_by_dependencies(self, files: List[str]) -> Tuple[List[str], Dict]:
    # Tabelas de referência primeiro (sem FKs)
    REFERENCE_TABLES = {
        "Cnaes.zip", "Motivos.zip", "Municipios.zip", 
        "Naturezas.zip", "Paises.zip", "Qualificacoes.zip"
    }

    # Ordem respeitando dependências
    ORDERED_PATTERNS = [
        "Empresas",          # Depende de: naturezas_juridicas
        "Estabelecimentos",  # Depende de: empresas, municipios, motivos
        "Socios",           # Depende de: empresas
        "Simples",          # Depende de: empresas
    ]

    # Organiza arquivos por categoria
    reference_files = [f for f in files if f in REFERENCE_TABLES]
    data_files = {pattern: [] for pattern in ORDERED_PATTERNS}

    for filename in files:
        for pattern in ORDERED_PATTERNS:
            if filename.startswith(pattern):
                data_files[pattern].append(filename)
                break

    # Monta ordem final
    ordered_files = reference_files[:]
    for pattern in ORDERED_PATTERNS:
        ordered_files.extend(sorted(data_files[pattern]))

    return ordered_files, categorization_info
Enter fullscreen mode Exit fullscreen mode

Esta estratégia evita erros de foreign key constraint ao garantir que tabelas pai sejam carregadas antes das filhas.

Otimização de Memória

Detecção Automática de Recursos

# src/config.py
def _detect_strategy(self) -> ProcessingStrategy:
    memory_gb = psutil.virtual_memory().total / (1024**3)

    if memory_gb < 8:
        return ProcessingStrategy.MEMORY_CONSTRAINED
    elif memory_gb < 32:
        return ProcessingStrategy.HIGH_MEMORY
    else:
        return ProcessingStrategy.DISTRIBUTED

@property
def optimal_chunk_size(self) -> int:
    memory_gb = psutil.virtual_memory().total / (1024**3)

    # Chunks adaptados à memória disponível
    if self.processing_strategy == ProcessingStrategy.MEMORY_CONSTRAINED:
        return 100_000 if memory_gb < 4 else 500_000
    elif self.processing_strategy == ProcessingStrategy.HIGH_MEMORY:
        return 2_000_000
    else:
        return 5_000_000
Enter fullscreen mode Exit fullscreen mode

Processamento em Streaming com Polars

# src/processor.py
def _process_chunked(self, file_path: Path, db, table_name: str):
    chunk_size = self.config.optimal_chunk_size
    offset = 0

    while True:
        chunk_df = pl.read_csv(
            file_path,
            separator=";",
            encoding="utf8",
            has_header=False,
            skip_rows=offset,
            n_rows=chunk_size,
        )

        if len(chunk_df) == 0:
            break

        # Transformações: conversão de tipos, limpeza de datas
        chunk_df = self._apply_transformations(chunk_df, file_type)

        # Bulk insert otimizado
        db.bulk_upsert(chunk_df, table_name)

        offset += len(chunk_df)
        del chunk_df
        gc.collect()  # Força liberação de memória
Enter fullscreen mode Exit fullscreen mode

Tratamento de Dados Governamentais

Problemas Encontrados e Soluções

# Datas inválidas: "00000000" → NULL
df = df.with_columns(
    pl.when(pl.col(date_col) == "0")
    .then(None)
    .otherwise(pl.col(date_col))
)

# Decimais com vírgula: "1234,56" → 1234.56
df = df.with_columns(
    pl.col(numeric_col).str.replace(",", ".").cast(pl.Float64)
)

# Encoding corrompido: detecção e conversão automática
def _convert_file_encoding_chunked(self, input_file: Path):
    chunk_size = self.config.encoding_chunk_size  # 50MB chunks
    with open(input_file, "r", encoding="ISO-8859-1") as infile:
        with open(output_file, "w", encoding="UTF-8") as outfile:
            while chunk := infile.read(chunk_size):
                outfile.write(chunk)
Enter fullscreen mode Exit fullscreen mode

Retry com Backoff Exponencial

def retry_db_connection(max_retries=3, base_delay=1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except (psycopg2.OperationalError, psycopg2.DatabaseError) as e:
                    if attempt == max_retries:
                        raise
                    delay = base_delay * (2**attempt)
                    time.sleep(delay)
        return wrapper
    return decorator
Enter fullscreen mode Exit fullscreen mode

Resultados em Produção

Configuração Tempo de Processamento
VPS 4GB RAM ~6 horas
Server 16GB RAM ~2 horas
Server 64GB+ RAM ~1 hora
  • Volume processado: 7GB → 21GB → PostgreSQL (~15GB indexado)
  • Registros: 63M empresas + 66M estabelecimentos + 26M sócios
  • Confiabilidade: Processamento mensal automatizado via cron

Lições Aprendidas

  1. Ordem importa: Processar arquivos respeitando dependências evita horas debugando constraint violations

  2. Memória > CPU: Melhor processar 100k registros por vez consistentemente do que tentar 10M e travar

  3. Polars > Pandas: Para este volume, Polars usa 3x menos memória e é 2x mais rápido

  4. Patterns justificados: Factory e Strategy não são overengineering quando você sabe que vai expandir

Próximos Passos

  1. Suporte MySQL e SQLite (em desenvolvimento)
  2. Filtros de processamento (processar apenas um estado/CNAE específico)
  3. Exportação para Parquet (para análises em Pandas/Spark)
  4. Interface natural language (cnpj.chat)

Conclusão

Processar dados em escala não é sobre força bruta - é sobre arquitetura inteligente e trade-offs conscientes. O código está disponível em github.com/cnpj-chat/cnpj-data-pipeline.

Se você está lidando com dados públicos brasileiros ou grandes volumes de CSV, espero que essas técnicas sejam úteis. E se quiser contribuir, PRs são sempre bem-vindos!


Tem alguma pergunta ou sugestão? Me encontre no GitHub ou LinkedIn. Se este post foi útil, considere dar uma estrela no projeto.

Top comments (0)