Skip to content

vishalm/AURA

Repository files navigation

AURA — Adaptive User Response Agent

Real-time, AI-powered personalization engine that adapts to every user's unique aura. Dynamically segments users and delivers hyper-personalized website experiences.

License: MIT Python 3.10+ Node 18+

AURA = Adaptive User Response Agent

Table of Contents


Overview

AURA (Adaptive User Response Agent) is a real-time personalization platform that perceives each user's unique digital aura and adapts content accordingly. The system:

  • Perceives user behavior through event streams
  • Segments users into dynamic micro-segments based on intent and behavior
  • Decides which content to show using contextual multi-armed bandits
  • Learns continuously through reinforcement learning

Key Features

  • 🎯 Real-time micro-segmentation with intent inference
  • 🤖 Contextual bandit decisioning (Thompson Sampling / LinUCB)
  • 📊 Built-in experimentation framework (A/B + MAB)
  • 🔒 Privacy-first with GDPR/CCPA/DPDP compliance
  • ⚡ <100ms p95 latency at 500+ RPS
  • 📈 10-20% conversion rate uplift

Supported Surfaces

  • Homepage hero modules
  • Blog/article index ordering
  • Product listing pages
  • Promotional banners

Architecture

System Overview

flowchart TB
    subgraph Client["Client Layer"]
        SDK[Web/App SDK]
        Tracker[Event Tracker]
    end
    
    subgraph Edge["Edge Layer"]
        Collector[Event Collector]
        CDN[CDN/Cache]
    end
    
    subgraph Data["Data Layer"]
        CDP[CDP/Analytics]
        Stream[Event Stream]
        Warehouse[Data Warehouse]
        FeatureStore[Feature Store]
        VectorDB[Vector Database]
    end
    
    subgraph AI["AI Agent Core"]
        Perception[Perception Service]
        Segmentation[Segmentation Engine]
        Decisioning[Decisioning API]
        ContentIndex[Content Index]
    end
    
    subgraph Exp["Experimentation"]
        FlagService[Feature Flags]
        Analytics[Analytics Engine]
        Learning[Learning Loop]
    end
    
    SDK --> Tracker --> Collector
    Collector --> CDP --> Stream
    Stream --> Warehouse --> FeatureStore
    Stream --> Perception
    FeatureStore --> Segmentation
    ContentIndex --> VectorDB
    Segmentation --> Decisioning
    VectorDB --> Decisioning
    Decisioning --> CDN
    CDN --> SDK
    Decisioning --> FlagService
    FlagService --> Analytics
    Analytics --> Learning
    Learning -.->|Model Updates| Segmentation
    Learning -.->|Model Updates| Decisioning
Loading

Technology Stack

Layer Technology Purpose
Client JavaScript SDK Event tracking, consent management
Edge Cloudflare Workers / Lambda@Edge Request routing, caching
API FastAPI (Python 3.10+) Decisioning & ranking service
Data Pipeline Apache Kafka / Google Pub/Sub Event streaming
Storage BigQuery / Snowflake Data warehouse
Features Feast / Tecton Feature store
Vectors Pinecone / pgvector Embedding search
Search Elasticsearch / OpenSearch Content indexing
ML scikit-learn, PyTorch Segmentation & bandits
Experiments GrowthBook / Optimizely A/B testing framework
Monitoring Datadog / Prometheus + Grafana Observability

Prerequisites

System Requirements

  • Python: 3.10 or higher
  • Node.js: 18.x or higher
  • Docker: 24.x or higher
  • Kubernetes: 1.28+ (production)

External Services

  • Data warehouse (BigQuery, Snowflake, or Redshift)
  • Feature store (Feast or Tecton)
  • Vector database (Pinecone, Weaviate, or pgvector)
  • Search engine (Elasticsearch or OpenSearch)
  • Message queue (Kafka, Pub/Sub, or Kinesis)
  • CMS with API access (Contentful, Strapi, or Sanity)

Access Requirements

  • Cloud provider account (AWS, GCP, or Azure)
  • API keys for third-party services
  • Database credentials
  • CDN configuration access

Quick Start

1. Clone & Install

# Clone repository
git clone https://github.com/your-org/ai-content-agent.git
cd ai-content-agent

# Install dependencies
make install

# Copy environment template
cp .env.example .env

2. Configure Services

Edit .env with your credentials:

# Data Services
WAREHOUSE_URL=bigquery://project/dataset
FEATURE_STORE_URL=feast://localhost:6565
VECTOR_DB_URL=pinecone://api-key@index-name

# Event Streaming
KAFKA_BROKERS=localhost:9092
KAFKA_TOPIC=content-events

# CMS
CMS_API_URL=https://api.contentful.com
CMS_API_KEY=your_api_key

3. Start Local Stack

# Start infrastructure (Kafka, Redis, Postgres)
docker-compose up -d

# Initialize databases
make db-migrate

# Start services
make run-local

4. Verify Installation

# Health check
curl http://localhost:8000/health

# Test ranking endpoint
curl -X POST http://localhost:8000/v1/rank \
  -H "Content-Type: application/json" \
  -d '{
    "surface_id": "homepage.hero",
    "user_ctx": {
      "anon_id": "test-user-123",
      "device": "mobile",
      "consent": ["analytics", "personalization"]
    }
  }'

Installation

Development Environment

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install Python dependencies
pip install -r requirements.txt
pip install -r requirements-dev.txt

# Install Node dependencies (for SDK)
cd sdk && npm install && cd ..

# Install pre-commit hooks
pre-commit install

Docker Development

# Build images
docker-compose build

# Start all services
docker-compose up

# Run tests
docker-compose run --rm api pytest

Production Setup

# Build production image
docker build -t ai-content-agent:latest .

# Deploy to Kubernetes
kubectl apply -f k8s/

# Verify deployment
kubectl get pods -n personalization

Configuration

Environment Variables

Core Services

# API Service
API_PORT=8000
API_WORKERS=4
API_LOG_LEVEL=info

# Feature Store
FEAST_REPO_PATH=/app/feature_repo
FEAST_ONLINE_STORE=redis://localhost:6379

# Vector DB
PINECONE_API_KEY=your_key
PINECONE_ENVIRONMENT=us-west1-gcp
PINECONE_INDEX=content-embeddings

Data Pipeline

# Event Collection
EVENT_BATCH_SIZE=1000
EVENT_FLUSH_INTERVAL=5

# Warehouse
BQ_PROJECT_ID=your-project
BQ_DATASET=personalization
BQ_CREDENTIALS_PATH=/secrets/bq-key.json

ML Configuration

# Segmentation
SEGMENT_MODEL_PATH=/models/segments/v1
SEGMENT_CONFIDENCE_THRESHOLD=0.7
SEGMENT_REFRESH_INTERVAL=3600

# Decisioning
BANDIT_ALGORITHM=thompson_sampling
EXPLORATION_RATE=0.08
DIVERSITY_CONSTRAINT=max_1_per_topic
FRESHNESS_HALF_LIFE=14

Privacy & Compliance

# Consent Management
CMP_PROVIDER=onetrust
CONSENT_REQUIRED=true
PII_SCRUBBING=enabled

# Data Retention
EVENT_RETENTION_DAYS=395
FEATURE_RETENTION_DAYS=90
MODEL_RETENTION_VERSIONS=5

Content Schema

The system expects content in the CMS with these fields:

content:
  id: string (PK)
  title: string
  url: string
  summary: text
  body: text
  
  # NLP-derived (auto-generated)
  topics: array<string>
  personas: array<string>  # beginner, intermediate, advanced
  industries: array<string>
  sentiment: float  # -1 to 1
  complexity: float  # Flesch-Kincaid grade
  
  # Metadata
  published_at: timestamp
  updated_at: timestamp
  freshness_score: float
  quality_score: float
  
  # Embeddings
  embedding: vector<float>[384]
  
  # Constraints
  jurisdictions: array<string>  # US, EU, UK, etc.
  policy_flags: array<string>  # legal, medical, age_restricted
  device_suitability: array<string>  # mobile, desktop, tablet

Policy Configuration

Create config/policies.yaml:

decisioning:
  exploration:
    global_budget: 0.08
    per_user_cap: 0.15
    throttle_high_traffic: true
  
  diversity:
    max_per_topic_cluster: 1
    min_topic_distance: 0.3
    enforce_in_top_n: 5
  
  freshness:
    decay_function: exponential
    half_life_days: 14
    minimum_score: 0.3
  
  eligibility:
    check_jurisdiction: true
    check_consent: true
    check_policy_flags: true
    check_device: true

segmentation:
  cold_start:
    use_rules: true
    use_popularity: true
    min_events: 3
  
  confidence:
    assignment_threshold: 0.7
    holdout_below: 0.5
  
  fairness:
    k_anonymity_threshold: 50
    exclude_protected_attributes: true

API Reference

Ranking API

POST /v1/rank

Get personalized content ranking for a surface.

Request:

{
  "surface_id": "homepage.hero",
  "user_ctx": {
    "anon_id": "anon_abc123",
    "session_id": "sess_xyz789",
    "device": "mobile",
    "referrer": "https://google.com",
    "page_type": "index",
    "consent": ["analytics", "personalization"],
    "geo": {
      "country": "US",
      "region": "CA"
    }
  },
  "candidates": ["c123", "c456", "c789"],  // optional
  "limit": 5,
  "context": {
    "campaign": "summer_sale",
    "time_of_day": "evening"
  }
}

Response:

{
  "request_id": "req_a1b2c3",
  "surface": "homepage.hero",
  "items": [
    {
      "content_id": "c123",
      "score": 0.81,
      "reason": "beginner+mobile+intent:learn",
      "metadata": {
        "title": "Getting Started Guide",
        "url": "/guides/getting-started",
        "topics": ["tutorial", "basics"]
      }
    },
    {
      "content_id": "c987",
      "score": 0.72,
      "reason": "case-study+finance+high-confidence",
      "metadata": {
        "title": "Finance Industry Case Study",
        "url": "/case-studies/finance"
      }
    }
  ],
  "segment": {
    "id": "seg_learn_mobile_new",
    "confidence": 0.85,
    "intent": "learn"
  },
  "experiment": {
    "variant": "treatment_b",
    "holdout": false
  },
  "policy_version": "2025-10-01",
  "cache_ttl": 300
}

Status Codes:

  • 200 - Success
  • 400 - Invalid request
  • 403 - Insufficient consent
  • 429 - Rate limit exceeded
  • 500 - Internal error

POST /v1/feedback

Submit reward signal for a ranking request.

Request:

{
  "request_id": "req_a1b2c3",
  "item_id": "c123",
  "reward": {
    "type": "click",
    "value": 1.0,
    "timestamp": "2025-10-09T14:30:00Z"
  },
  "context": {
    "position": 1,
    "dwell_time": 45.2,
    "scroll_depth": 0.8
  }
}

Response:

{
  "status": "accepted",
  "feedback_id": "fb_xyz789"
}

Segmentation API

GET /v1/segments/:user_id

Get segment assignment for a user.

Response:

{
  "user_id": "anon_abc123",
  "segments": [
    {
      "id": "seg_learn_mobile_new",
      "name": "Learning-focused Mobile Users (New)",
      "confidence": 0.85,
      "features": {
        "intent": "learn",
        "device_class": "mobile",
        "recency": "new",
        "price_sensitivity": "medium"
      },
      "assigned_at": "2025-10-09T14:25:00Z"
    }
  ]
}

Content API

GET /v1/content/search

Search content by semantic similarity.

Request:

{
  "query": "how to optimize conversion rates",
  "filters": {
    "topics": ["marketing", "analytics"],
    "personas": ["intermediate"],
    "jurisdictions": ["US", "UK"]
  },
  "limit": 10
}

Response:

{
  "results": [
    {
      "content_id": "c456",
      "title": "Conversion Rate Optimization Guide",
      "url": "/guides/cro",
      "similarity": 0.89,
      "topics": ["marketing", "analytics", "optimization"],
      "freshness_score": 0.95
    }
  ]
}

Admin API

POST /v1/admin/refresh-content

Trigger content index refresh.

POST /v1/admin/retrain-segments

Trigger segment model retraining.

GET /v1/admin/health

Detailed health check with component status.


Development

Project Structure

ai-content-agent/
├── src/
│   ├── api/                    # FastAPI application
│   │   ├── routes/             # API endpoints
│   │   ├── middleware/         # Auth, logging, etc.
│   │   └── schemas/            # Pydantic models
│   ├── agents/                 # AI Agent core
│   │   ├── perception/         # Event processing
│   │   ├── segmentation/       # User segmentation
│   │   └── decisioning/        # Ranking & bandits
│   ├── content/                # Content intelligence
│   │   ├── indexer/            # CMS sync & indexing
│   │   ├── nlp/                # NLP tagging pipeline
│   │   └── embeddings/         # Vector generation
│   ├── data/                   # Data layer
│   │   ├── warehouse/          # Warehouse connectors
│   │   ├── features/           # Feature engineering
│   │   └── stream/             # Event streaming
│   ├── ml/                     # ML models
│   │   ├── bandits/            # Bandit algorithms
│   │   ├── clustering/         # Segmentation models
│   │   └── training/           # Training pipelines
│   └── utils/                  # Shared utilities
├── sdk/                        # JavaScript SDK
│   ├── src/
│   │   ├── tracker.js          # Event tracking
│   │   ├── consent.js          # Consent management
│   │   └── client.js           # API client
│   └── examples/               # Integration examples
├── pipelines/                  # Data pipelines
│   ├── content_indexing/       # Content ETL
│   ├── feature_engineering/    # Feature computation
│   └── model_training/         # ML training jobs
├── tests/
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── k8s/                        # Kubernetes manifests
├── terraform/                  # Infrastructure as code
└── docs/                       # Documentation

Local Development Workflow

# Start services
make dev

# Run tests in watch mode
make test-watch

# Lint and format
make lint
make format

# Type checking
make typecheck

# Generate API docs
make docs

Adding a New Surface

  1. Define the surface in config/surfaces.yaml:
surfaces:
  product_listing:
    type: ranking
    max_items: 20
    diversity: true
    freshness_boost: 0.2
    eligible_content_types: [product, category]
  1. Add ranking logic in src/agents/decisioning/rankers/:
class ProductListingRanker(BaseRanker):
    def rank(self, context: RankingContext) -> List[RankedItem]:
        # Implement surface-specific logic
        pass
  1. Register route in src/api/routes/ranking.py

  2. Add tests in tests/unit/rankers/test_product_listing.py

  3. Update documentation in docs/surfaces/

Creating Custom Features

# src/data/features/custom_features.py
from feast import FeatureView, Field
from feast.types import Float32, String

user_engagement_features = FeatureView(
    name="user_engagement",
    entities=["user"],
    schema=[
        Field(name="avg_session_duration", dtype=Float32),
        Field(name="pages_per_session", dtype=Float32),
        Field(name="last_visit_intent", dtype=String),
    ],
    online=True,
    source=user_engagement_source,
    ttl=timedelta(days=7),
)

Register in feature store:

feast apply

Testing

Unit Tests

# Run all unit tests
pytest tests/unit/

# Run specific test file
pytest tests/unit/agents/test_decisioning.py

# Run with coverage
pytest --cov=src tests/unit/

Integration Tests

# Start test infrastructure
docker-compose -f docker-compose.test.yml up -d

# Run integration tests
pytest tests/integration/

# Cleanup
docker-compose -f docker-compose.test.yml down

End-to-End Tests

# Deploy to test environment
make deploy-test

# Run E2E suite
pytest tests/e2e/ --env=test

# Verify metrics
make verify-metrics

Load Testing

# Run load test (requires k6)
k6 run tests/load/ranking_api.js

# Sustained load test
k6 run --vus 100 --duration 30m tests/load/sustained.js

A/B Test Validation

# Run A/A test
python scripts/run_aa_test.py --surface homepage.hero --duration 7d

# Validate metrics
python scripts/validate_experiment.py --experiment exp_123

Deployment

Kubernetes Deployment

# Deploy to staging
kubectl apply -f k8s/overlays/staging/

# Deploy to production (requires approval)
kubectl apply -f k8s/overlays/production/

# Check rollout status
kubectl rollout status deployment/ai-content-agent -n personalization

Canary Deployment

# k8s/canary.yaml
apiVersion: flagger.app/v1beta1
kind: Canary
metadata:
  name: ai-content-agent
spec:
  targetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-content-agent
  progressDeadlineSeconds: 600
  service:
    port: 8000
  analysis:
    interval: 1m
    threshold: 10
    maxWeight: 50
    stepWeight: 5
    metrics:
      - name: request-success-rate
        thresholdRange:
          min: 99
      - name: request-duration
        thresholdRange:
          max: 500

Deploy canary:

kubectl apply -f k8s/canary.yaml

Model Deployment

# Package model
python scripts/package_model.py --model-path models/segmentation/v2

# Upload to registry
python scripts/upload_model.py --model-id seg_v2

# Deploy model
python scripts/deploy_model.py --model-id seg_v2 --canary-pct 10

# Monitor and promote
python scripts/promote_model.py --model-id seg_v2

Database Migrations

# Create migration
alembic revision --autogenerate -m "add segment confidence column"

# Apply migration
alembic upgrade head

# Rollback
alembic downgrade -1

Monitoring

Key Metrics

API Metrics

  • Latency: p50, p95, p99 response times
  • Throughput: Requests per second
  • Error Rate: 4xx and 5xx errors
  • Cache Hit Rate: CDN and application cache

Business Metrics

  • Conversion Rate: Per surface and overall
  • Click-Through Rate: On personalized content
  • Revenue Per Session: Total and by segment
  • Segment Coverage: % of sessions with segment assignment

ML Metrics

  • Model Latency: Inference time per model
  • Prediction Quality: Confidence distributions
  • Exploration Rate: % of exploratory decisions
  • Regret: Cumulative regret curves

Dashboards

Access monitoring dashboards:

  • Operations: https://monitoring.company.com/d/api-overview
  • Business KPIs: https://monitoring.company.com/d/business-metrics
  • ML Performance: https://monitoring.company.com/d/ml-metrics
  • Experiments: https://experiments.company.com

Alerts

Critical alerts are configured for:

  • API latency p95 > 100ms for 5 minutes
  • Error rate > 1% for 5 minutes
  • Segment assignment rate < 80% for 10 minutes
  • Model serving failures > 0.5% for 5 minutes
  • Experiment metric degradation > 5% for surface

Logging

import logging
from src.utils.logging import get_logger

logger = get_logger(__name__)

# Structured logging
logger.info("ranking_request", extra={
    "surface_id": "homepage.hero",
    "user_id": user_id,
    "segment_id": segment_id,
    "latency_ms": latency,
})

Logs are aggregated in your log management system (DataDog, CloudWatch, etc.).

Tracing

Distributed tracing is enabled via OpenTelemetry:

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("rank_content"):
    # Ranking logic
    pass

Troubleshooting

Common Issues

High Latency

Symptom: p95 latency > 200ms

Diagnosis:

# Check feature store latency
python scripts/diagnose_latency.py --component feature_store

# Check vector DB performance
python scripts/diagnose_latency.py --component vector_db

Solutions:

  • Increase feature store cache TTL
  • Add vector DB read replicas
  • Optimize candidate generation query

Low Segment Coverage

Symptom: < 80% of sessions have segment assignments

Diagnosis:

# Check segment confidence distribution
python scripts/analyze_segments.py --metric confidence

# Review cold-start rules
python scripts/analyze_segments.py --metric cold_start

Solutions:

  • Lower confidence threshold (0.7 → 0.6)
  • Improve cold-start rule coverage
  • Retrain segmentation model

Content Staleness

Symptom: Freshness scores declining

Diagnosis:

# Check indexer status
curl http://localhost:8000/v1/admin/indexer/status

# Verify CMS webhook delivery
python scripts/check_webhooks.py

Solutions:

  • Trigger manual content refresh
  • Fix webhook delivery issues
  • Adjust freshness decay parameters

Experiment Metric Degradation

Symptom: Negative treatment effect

Diagnosis:

# Analyze experiment results
python scripts/analyze_experiment.py --exp-id exp_123

# Check for Simpson's paradox
python scripts/analyze_experiment.py --exp-id exp_123 --stratify device,source

Solutions:

  • Roll back to control variant
  • Investigate segment-level effects
  • Review policy constraints

Debug Mode

Enable verbose logging:

export LOG_LEVEL=debug
export DEBUG_MODE=true
python -m src.api.main

Performance Profiling

# Profile API endpoint
python -m cProfile -o profile.stats scripts/profile_api.py

# Analyze with snakeviz
snakeviz profile.stats

Contributing

Development Guidelines

  1. Code Style: Follow PEP 8 (Python) and Airbnb (JavaScript)
  2. Type Hints: Required for all Python functions
  3. Documentation: Docstrings required for public APIs
  4. Testing: Minimum 80% code coverage for new code
  5. Commits: Use conventional commits format

Pull Request Process

  1. Create feature branch: git checkout -b feature/your-feature
  2. Make changes with tests
  3. Run linters: make lint
  4. Run tests: make test
  5. Push and create PR
  6. Request review from 2+ maintainers
  7. Squash and merge after approval

Code Review Checklist

  • Tests pass and coverage maintained
  • Documentation updated
  • No performance regressions
  • Privacy/security considerations addressed
  • Backward compatibility maintained
  • Monitoring/alerts added if needed

License

MIT License - see LICENSE file for details.


Support


Changelog

See CHANGELOG.md for version history and release notes.

About

Adaptive User Response Agent

Resources

Security policy

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors