member_c6d11ca9

Middleware Magic: Advanced Request Processing

As a junior computer science student, I have always been fascinated by the concept of middleware in web development. During my exploration of modern web frameworks, I discovered that middleware is not just a technical pattern, but a powerful architectural philosophy that enables elegant request processing, authentication, authorization, and performance optimization.

Understanding Middleware Architecture

In my ten years of learning to program, I have found that middleware is one of the most elegant solutions to cross-cutting concerns in web applications. Unlike monolithic request handlers, middleware lets us compose functionality from modular, reusable pieces, which keeps concerns cleanly separated.

The beauty of middleware lies in its simplicity and composability. Each middleware component has a single responsibility, and multiple middleware components can be chained together to create complex request processing pipelines.

use hyperlane::*;
use std::sync::Arc;
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Advanced middleware system
#[derive(Clone)]
struct MiddlewareChain {
    middlewares: Vec<Arc<dyn Middleware + Send + Sync>>,
    error_handler: Option<Arc<dyn ErrorHandler + Send + Sync>>,
}

#[async_trait::async_trait]
trait Middleware {
    async fn process(&self, ctx: &mut Context, next: Next<'_>) -> MiddlewareResult;
    fn name(&self) -> &'static str;
    fn priority(&self) -> i32 { 0 } // Higher priority executes first
}

#[async_trait::async_trait]
trait ErrorHandler {
    async fn handle_error(&self, ctx: &mut Context, error: MiddlewareError) -> MiddlewareResult;
}

type MiddlewareResult = Result<(), MiddlewareError>;

#[derive(Debug)]
enum MiddlewareError {
    Unauthorized,
    Forbidden,
    RateLimited,
    ValidationFailed(String),
    InternalError(String),
    Timeout,
    Custom(String),
}

struct Next<'a> {
    chain: &'a MiddlewareChain,
    index: usize,
    ctx: &'a mut Context,
}

impl<'a> Next<'a> {
    async fn run(mut self) -> MiddlewareResult {
        if self.index < self.chain.middlewares.len() {
            let middleware = &self.chain.middlewares[self.index];
            self.index += 1;
            middleware.process(self.ctx, self).await
        } else {
            Ok(())
        }
    }
}

impl MiddlewareChain {
    fn new() -> Self {
        Self {
            middlewares: Vec::new(),
            error_handler: None,
        }
    }

    fn add_middleware<M: Middleware + Send + Sync + 'static>(mut self, middleware: M) -> Self {
        self.middlewares.push(Arc::new(middleware));
        // Sort by priority (higher priority first)
        self.middlewares.sort_by(|a, b| b.priority().cmp(&a.priority()));
        self
    }

    fn set_error_handler<E: ErrorHandler + Send + Sync + 'static>(mut self, handler: E) -> Self {
        self.error_handler = Some(Arc::new(handler));
        self
    }

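    async fn execute(&self, ctx: &mut Context) -> MiddlewareResult {
        let next = Next {
            chain: self,
            index: 0,
            ctx: &mut *ctx, // reborrow so `ctx` stays usable for the error handler
        };

        match next.run().await {
            Ok(()) => Ok(()),
            Err(error) => {
                if let Some(handler) = &self.error_handler {
                    // The handler writes the error response. The failure is still
                    // reported so that callers skip their business logic instead of
                    // overwriting that response with a success body.
                    let summary = format!("{:?}", error);
                    handler.handle_error(ctx, error).await?;
                    Err(MiddlewareError::Custom(summary))
                } else {
                    Err(error)
                }
            }
        }
    }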
}

// Request logging middleware
struct RequestLoggingMiddleware {
    log_level: LogLevel,
    include_headers: bool,
    include_body: bool,
}

#[derive(Clone)]
enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}

impl RequestLoggingMiddleware {
    fn new(log_level: LogLevel, include_headers: bool, include_body: bool) -> Self {
        Self {
            log_level,
            include_headers,
            include_body,
        }
    }
}

#[async_trait::async_trait]
impl Middleware for RequestLoggingMiddleware {
    async fn process(&self, ctx: &mut Context, next: Next<'_>) -> MiddlewareResult {
        let start_time = Instant::now();
        let method = ctx.get_request_method().await;
        let path = ctx.get_request_path().await;
        let client_ip = ctx.get_socket_addr_or_default_string().await;

        println!("[{}] {} {} from {}", 
                chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
                method, path, client_ip);

        if self.include_headers {
            let headers = ctx.get_request_headers().await;
            for (key, value) in headers {
                println!("  Header: {}: {}", key, value);
            }
        }

        if self.include_body {
            let body = ctx.get_request_body().await;
            if !body.is_empty() {
                println!("  Body: {} bytes", body.len());
            }
        }

        let result = next.run().await;

        let duration = start_time.elapsed();
        let status = ctx.get_response_status_code().await;

        println!("[{}] Completed {} {} -> {} in {}ms",
                chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
                method, path, status, duration.as_millis());

        result
    }

    fn name(&self) -> &'static str {
        "RequestLogging"
    }

    fn priority(&self) -> i32 {
        1000 // High priority - log everything
    }
}

// Authentication middleware
struct AuthenticationMiddleware {
    secret_key: String,
    token_expiry: Duration,
    excluded_paths: Vec<String>,
}

impl AuthenticationMiddleware {
    fn new(secret_key: String, token_expiry: Duration) -> Self {
        Self {
            secret_key,
            token_expiry,
            excluded_paths: vec![
                "/health".to_string(),
                "/login".to_string(),
                "/register".to_string(),
            ],
        }
    }

    fn add_excluded_path(mut self, path: String) -> Self {
        self.excluded_paths.push(path);
        self
    }

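    async fn validate_token(&self, token: &str) -> Result<UserClaims, String> {
        // Placeholder validation for demonstration only. A real implementation
        // would verify the token's signature with `secret_key` and check its expiry.
        if token.starts_with("valid_") {
            Ok(UserClaims {
                user_id: "user123".to_string(),
                username: "testuser".to_string(),
                roles: vec!["user".to_string()],
                // Use the configured lifetime instead of a hard-coded 3600 seconds
                exp: chrono::Utc::now().timestamp() + self.token_expiry.as_secs() as i64,
            })
        } else {
            Err("Invalid token".to_string())
        }
    }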
}

#[derive(Clone, Debug)]
struct UserClaims {
    user_id: String,
    username: String,
    roles: Vec<String>,
    exp: i64,
}

#[async_trait::async_trait]
impl Middleware for AuthenticationMiddleware {
    async fn process(&self, ctx: &mut Context, next: Next<'_>) -> MiddlewareResult {
        let path = ctx.get_request_path().await;

        // Skip authentication for excluded paths
        if self.excluded_paths.iter().any(|excluded| path.starts_with(excluded)) {
            return next.run().await;
        }

        // Extract token from Authorization header
        let headers = ctx.get_request_headers().await;
        let auth_header = headers.get("authorization")
            .or_else(|| headers.get("Authorization"));

        if let Some(auth_value) = auth_header {
            if let Some(token) = auth_value.strip_prefix("Bearer ") {
                match self.validate_token(token).await {
                    Ok(claims) => {
                        // Store user claims in context
                        ctx.set_attribute("user_claims", claims).await;
                        next.run().await
                    }
                    Err(_) => Err(MiddlewareError::Unauthorized),
                }
            } else {
                Err(MiddlewareError::Unauthorized)
            }
        } else {
            Err(MiddlewareError::Unauthorized)
        }
    }

    fn name(&self) -> &'static str {
        "Authentication"
    }

    fn priority(&self) -> i32 {
        900 // High priority - authenticate early
    }
}

// Rate limiting middleware
struct RateLimitingMiddleware {
    requests_per_minute: u32,
    window_size: Duration,
    client_requests: Arc<std::sync::Mutex<HashMap<String, ClientRateLimit>>>,
}

#[derive(Clone)]
struct ClientRateLimit {
    request_count: u32,
    window_start: Instant,
}

impl RateLimitingMiddleware {
    fn new(requests_per_minute: u32) -> Self {
        Self {
            requests_per_minute,
            window_size: Duration::from_secs(60),
            client_requests: Arc::new(std::sync::Mutex::new(HashMap::new())),
        }
    }

    fn is_rate_limited(&self, client_ip: &str) -> bool {
        let mut clients = self.client_requests.lock().unwrap();
        let now = Instant::now();

        match clients.get_mut(client_ip) {
            Some(rate_limit) => {
                // Check if window has expired
                if now.duration_since(rate_limit.window_start) >= self.window_size {
                    rate_limit.request_count = 1;
                    rate_limit.window_start = now;
                    false
                } else {
                    rate_limit.request_count += 1;
                    rate_limit.request_count > self.requests_per_minute
                }
            }
            None => {
                clients.insert(client_ip.to_string(), ClientRateLimit {
                    request_count: 1,
                    window_start: now,
                });
                false
            }
        }
    }
}

#[async_trait::async_trait]
impl Middleware for RateLimitingMiddleware {
    async fn process(&self, ctx: &mut Context, next: Next<'_>) -> MiddlewareResult {
        let client_ip = ctx.get_socket_addr_or_default_string().await;

        if self.is_rate_limited(&client_ip) {
            Err(MiddlewareError::RateLimited)
        } else {
            next.run().await
        }
    }

    fn name(&self) -> &'static str {
        "RateLimiting"
    }

    fn priority(&self) -> i32 {
        800 // Execute after authentication but before business logic
    }
}

// Request validation middleware
struct ValidationMiddleware {
    max_body_size: usize,
    required_headers: Vec<String>,
    content_type_whitelist: Vec<String>,
}

impl ValidationMiddleware {
    fn new(max_body_size: usize) -> Self {
        Self {
            max_body_size,
            required_headers: Vec::new(),
            content_type_whitelist: vec![
                "application/json".to_string(),
                "application/x-www-form-urlencoded".to_string(),
                "multipart/form-data".to_string(),
            ],
        }
    }

    fn require_header(mut self, header: String) -> Self {
        self.required_headers.push(header);
        self
    }

    fn allow_content_type(mut self, content_type: String) -> Self {
        self.content_type_whitelist.push(content_type);
        self
    }
}

#[async_trait::async_trait]
impl Middleware for ValidationMiddleware {
    async fn process(&self, ctx: &mut Context, next: Next<'_>) -> MiddlewareResult {
        let headers = ctx.get_request_headers().await;

        // Check required headers
        for required_header in &self.required_headers {
            if !headers.contains_key(required_header) {
                return Err(MiddlewareError::ValidationFailed(
                    format!("Missing required header: {}", required_header)
                ));
            }
        }

        // Check content type for POST/PUT/PATCH requests
        let method = ctx.get_request_method().await;
        if matches!(method.as_str(), "POST" | "PUT" | "PATCH") {
            if let Some(content_type) = headers.get("content-type") {
                let is_allowed = self.content_type_whitelist.iter()
                    .any(|allowed| content_type.starts_with(allowed));

                if !is_allowed {
                    return Err(MiddlewareError::ValidationFailed(
                        format!("Unsupported content type: {}", content_type)
                    ));
                }
            }
        }

        // Check body size
        let body = ctx.get_request_body().await;
        if body.len() > self.max_body_size {
            return Err(MiddlewareError::ValidationFailed(
                format!("Request body too large: {} bytes (max: {})", 
                       body.len(), self.max_body_size)
            ));
        }

        next.run().await
    }

    fn name(&self) -> &'static str {
        "Validation"
    }

    fn priority(&self) -> i32 {
        700 // Execute after rate limiting
    }
}

// Performance monitoring middleware
struct PerformanceMiddleware {
    slow_request_threshold: Duration,
    metrics: Arc<std::sync::Mutex<PerformanceMetrics>>,
}

#[derive(Default, Clone)]
struct PerformanceMetrics {
    total_requests: u64,
    total_response_time: Duration,
    slow_requests: u64,
    error_count: u64,
}

impl PerformanceMiddleware {
    fn new(slow_request_threshold: Duration) -> Self {
        Self {
            slow_request_threshold,
            metrics: Arc::new(std::sync::Mutex::new(PerformanceMetrics::default())),
        }
    }

    // Returns a snapshot of the counters (a copy, so the lock is released immediately)
    fn get_metrics(&self) -> PerformanceMetrics {
        self.metrics.lock().unwrap().clone()
    }
}

#[async_trait::async_trait]
impl Middleware for PerformanceMiddleware {
    async fn process(&self, ctx: &mut Context, next: Next<'_>) -> MiddlewareResult {
        let start_time = Instant::now();

        let result = next.run().await;

        let duration = start_time.elapsed();
        let is_slow = duration > self.slow_request_threshold;

        // Do the async context reads before taking the lock: a std::sync::MutexGuard
        // should not be held across an .await point.
        if is_slow {
            println!("Slow request detected: {}ms for {} {}", 
                    duration.as_millis(),
                    ctx.get_request_method().await,
                    ctx.get_request_path().await);
        }

        {
            let mut metrics = self.metrics.lock().unwrap();
            metrics.total_requests += 1;
            metrics.total_response_time += duration;

            if is_slow {
                metrics.slow_requests += 1;
            }

            if result.is_err() {
                metrics.error_count += 1;
            }
        } // lock released here

        result
    }

    fn name(&self) -> &'static str {
        "Performance"
    }

    fn priority(&self) -> i32 {
        100 // Low priority: runs last, so it only times requests that pass every earlier middleware
    }
}

// Error handler implementation
struct DefaultErrorHandler;

#[async_trait::async_trait]
impl ErrorHandler for DefaultErrorHandler {
    async fn handle_error(&self, ctx: &mut Context, error: MiddlewareError) -> MiddlewareResult {
        let (status_code, error_message) = match error {
            MiddlewareError::Unauthorized => (401, "Unauthorized"),
            MiddlewareError::Forbidden => (403, "Forbidden"),
            MiddlewareError::RateLimited => (429, "Too Many Requests"),
            MiddlewareError::ValidationFailed(msg) => {
                // Build the body with serde_json so quotes in `msg` are escaped
                let body = serde_json::json!({ "error": "Validation failed", "details": msg });
                ctx.set_response_body(body.to_string()).await;
                (400, "Bad Request")
            }
            MiddlewareError::Timeout => (408, "Request Timeout"),
            MiddlewareError::InternalError(_) => (500, "Internal Server Error"),
            MiddlewareError::Custom(msg) => {
                let body = serde_json::json!({ "error": msg });
                ctx.set_response_body(body.to_string()).await;
                (400, "Bad Request")
            }
        };

        ctx.set_response_status_code(status_code).await;

        if ctx.get_response_body().await.is_empty() {
            ctx.set_response_body(format!(r#"{{"error": "{}"}}"#, error_message)).await;
        }

        ctx.set_response_header("Content-Type", "application/json").await;

        Ok(())
    }
}

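// Global middleware chain. add_middleware sorts by priority, so the execution order is
// RequestLogging, Authentication, RateLimiting, Validation, Performance,
// regardless of the registration order below.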
static MIDDLEWARE_CHAIN: once_cell::sync::Lazy<MiddlewareChain> = 
    once_cell::sync::Lazy::new(|| {
        MiddlewareChain::new()
            .add_middleware(RequestLoggingMiddleware::new(LogLevel::Info, false, false))
            .add_middleware(RateLimitingMiddleware::new(100)) // 100 requests per minute
            .add_middleware(AuthenticationMiddleware::new("secret_key".to_string(), Duration::from_secs(3600)))
            .add_middleware(ValidationMiddleware::new(1024 * 1024)) // 1MB max body size
            .add_middleware(PerformanceMiddleware::new(Duration::from_millis(1000))) // 1s slow threshold
            .set_error_handler(DefaultErrorHandler)
    });

// Middleware-enabled endpoints
async fn middleware_handler(ctx: Context) {
    let mut ctx = ctx;

    match MIDDLEWARE_CHAIN.execute(&mut ctx).await {
        Ok(()) => {
            // Middleware chain completed successfully, proceed with business logic
            ctx.set_response_status_code(200)
                .await
                .set_response_header(CONTENT_TYPE, APPLICATION_JSON)
                .await
                .set_response_body(r#"{"message": "Request processed successfully"}"#)
                .await;
        }
        Err(_) => {
            // Error was handled by error handler
        }
    }
}

#[get]
async fn protected_endpoint(ctx: Context) {
    middleware_handler(ctx).await;
}

#[post]
async fn api_endpoint(ctx: Context) {
    middleware_handler(ctx).await;
}

#[get]
async fn middleware_metrics_endpoint(ctx: Context) {
    // This endpoint bypasses the middleware chain. The values below are static
    // placeholders, not figures read from PerformanceMiddleware.
    let response = serde_json::json!({
        "middleware_chain": {
            "total_middlewares": 5,
            "execution_order": [
                "RequestLogging",
                "Authentication", 
                "RateLimiting",
                "Validation",
                "Performance"
            ]
        },
        "performance_stats": {
            "average_response_time_ms": 45.2,
            "slow_requests_percentage": 2.1,
            "error_rate_percentage": 0.5
        }
    });

    ctx.set_response_status_code(200)
        .await
        .set_response_header(CONTENT_TYPE, APPLICATION_JSON)
        .await
        .set_response_body(serde_json::to_string(&response).unwrap())
        .await;
}
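
The chain mechanics in the listing above can also be tried without the framework. The following is a minimal, synchronous sketch that uses only the standard library. `Ctx`, `Layer`, `Tag`, `RejectAdmin`, and `run_chain` are hypothetical names invented for this illustration and are not part of Hyperlane. One deliberate difference from the listing: here `Next` holds only the remaining layers and the context is passed to `run`, which is one way to avoid borrowing the context mutably in two places at once.

```rust
// Stand-in for the framework's request context (hypothetical, for illustration).
struct Ctx {
    path: String,
    log: Vec<String>,
}

type Outcome = Result<(), String>;

trait Layer {
    fn process(&self, ctx: &mut Ctx, next: Next<'_>) -> Outcome;
}

// `Next` knows only which layers remain; the context travels through `run`.
struct Next<'a> {
    rest: &'a [Box<dyn Layer>],
}

impl<'a> Next<'a> {
    fn run(self, ctx: &mut Ctx) -> Outcome {
        match self.rest.split_first() {
            Some((head, tail)) => head.process(ctx, Next { rest: tail }),
            None => Ok(()), // end of the chain
        }
    }
}

// Records when it is entered and left, to make the nesting visible.
struct Tag(&'static str);

impl Layer for Tag {
    fn process(&self, ctx: &mut Ctx, next: Next<'_>) -> Outcome {
        ctx.log.push(format!("{} before", self.0));
        let result = next.run(ctx);
        ctx.log.push(format!("{} after", self.0));
        result
    }
}

// Short-circuits the chain for paths under /admin.
struct RejectAdmin;

impl Layer for RejectAdmin {
    fn process(&self, ctx: &mut Ctx, next: Next<'_>) -> Outcome {
        if ctx.path.starts_with("/admin") {
            return Err("forbidden".to_string());
        }
        next.run(ctx)
    }
}

fn run_chain(layers: &[Box<dyn Layer>], ctx: &mut Ctx) -> Outcome {
    Next { rest: layers }.run(ctx)
}
```

With the layers `Tag("outer")`, `RejectAdmin`, `Tag("inner")`, a request for `/home` passes through both tags in nested order, while a request for `/admin/users` stops at `RejectAdmin` and never reaches the inner layer. The outer layer's "after" step still runs in both cases.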

Advanced Middleware Patterns

Through my exploration of middleware architecture, I discovered several advanced patterns that make middleware systems even more powerful:

  1. Conditional Middleware: Middleware that executes based on request characteristics
  2. Middleware Composition: Combining multiple middleware into reusable components
  3. Context Enrichment: Middleware that adds data to the request context for downstream use
  4. Error Recovery: Middleware that can recover from certain types of errors
  5. Performance Optimization: Middleware that optimizes request processing

These patterns enable building sophisticated request processing pipelines that can handle complex business requirements while maintaining clean, modular code.
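
Three of these patterns (conditional execution, context enrichment, and error recovery) can be sketched as small wrapper functions. This is only an illustration under simplifying assumptions: it is synchronous, uses only the standard library, and models a middleware as a boxed closure over a hypothetical `Request` type. None of the names below come from Hyperlane.

```rust
use std::collections::HashMap;

// Hypothetical request type standing in for a framework context.
struct Request {
    method: String,
    path: String,
    attributes: HashMap<String, String>,
}

impl Request {
    fn new(method: &str, path: &str) -> Self {
        Self {
            method: method.to_string(),
            path: path.to_string(),
            attributes: HashMap::new(),
        }
    }
}

type StepResult = Result<(), String>;
type Step = Box<dyn Fn(&mut Request) -> StepResult>;

// Context enrichment: store a value for later steps to read.
fn set_attr(key: &'static str, value: &'static str) -> Step {
    Box::new(move |req: &mut Request| -> StepResult {
        req.attributes.insert(key.to_string(), value.to_string());
        Ok(())
    })
}

// A step that always fails, used to exercise `recover`.
fn fail(reason: &'static str) -> Step {
    Box::new(move |_req: &mut Request| -> StepResult { Err(reason.to_string()) })
}

// Conditional middleware: run `step` only when `predicate` matches the request.
fn when(predicate: impl Fn(&Request) -> bool + 'static, step: Step) -> Step {
    Box::new(move |req: &mut Request| -> StepResult {
        if predicate(&*req) { step(req) } else { Ok(()) }
    })
}

// Error recovery: record the failure on the request and let processing continue.
fn recover(step: Step) -> Step {
    Box::new(move |req: &mut Request| -> StepResult {
        if let Err(reason) = step(&mut *req) {
            req.attributes.insert("recovered_from".to_string(), reason);
        }
        Ok(())
    })
}

// Runs the steps in order and stops at the first unrecovered error.
fn run_steps(steps: &[Step], req: &mut Request) -> StepResult {
    for step in steps {
        step(&mut *req)?;
    }
    Ok(())
}
```

Because every wrapper takes a `Step` and returns a `Step`, the wrappers nest freely. For example, `when(|r: &Request| r.method == "POST", recover(fail("cache unavailable")))` applies recovery only to POST requests.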

The Power of Composability

What I find most impressive about middleware architecture is its composability. Each middleware component can be developed, tested, and maintained independently, yet they work together seamlessly to create powerful request processing pipelines.
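
As a small illustration of testing one component in isolation, here is a sketch of the fixed-window counting logic behind a rate limiter, pulled out of any middleware wrapper. It follows the same idea as the rate limiter in the listing, with one assumption added for testability: the current time is passed in as a parameter instead of being read inside the function. `FixedWindowLimiter` and `Window` are hypothetical names.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct Window {
    count: u32,
    started: Instant,
}

struct FixedWindowLimiter {
    limit: u32,
    window: Duration,
    clients: HashMap<String, Window>,
}

impl FixedWindowLimiter {
    fn new(limit: u32, window: Duration) -> Self {
        Self { limit, window, clients: HashMap::new() }
    }

    // Returns true when the request should be rejected.
    // `now` is injected so tests do not depend on the wall clock.
    fn is_limited(&mut self, client: &str, now: Instant) -> bool {
        let entry = self
            .clients
            .entry(client.to_string())
            .or_insert(Window { count: 0, started: now });

        // Start a fresh window once the previous one has expired.
        if now.duration_since(entry.started) >= self.window {
            entry.count = 0;
            entry.started = now;
        }

        entry.count += 1;
        entry.count > self.limit
    }
}
```

With a limit of two requests per 60 seconds, the third request from the same client inside the window is rejected, a different client is unaffected, and the first client is allowed again once the window has elapsed. All of this can be checked by passing `t0 + Duration::from_secs(n)` values, without sleeping.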

This composability enables teams to build reusable middleware libraries that can be shared across projects, reducing development time and improving code quality. It also makes it easy to add new functionality or modify existing behavior without affecting other parts of the system.
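
One way to picture a shareable middleware bundle is a small pipeline that can absorb another pipeline and still derive its execution order from priorities. The sketch below tracks only names and priorities, not real middleware. `Pipeline`, `Registered`, `merge`, and `security_bundle` are hypothetical, and the priority values are the ones used in the listing.

```rust
struct Registered {
    name: &'static str,
    priority: i32,
}

#[derive(Default)]
struct Pipeline {
    entries: Vec<Registered>,
}

impl Pipeline {
    // Register one middleware; higher priority runs first.
    fn add(mut self, name: &'static str, priority: i32) -> Self {
        self.entries.push(Registered { name, priority });
        self.sort();
        self
    }

    // Absorb a reusable bundle defined elsewhere.
    fn merge(mut self, bundle: Pipeline) -> Self {
        self.entries.extend(bundle.entries);
        self.sort();
        self
    }

    fn sort(&mut self) {
        // sort_by is stable: equal priorities keep their registration order.
        self.entries.sort_by(|a, b| b.priority.cmp(&a.priority));
    }

    // Derived from the registered entries rather than written out by hand.
    fn execution_order(&self) -> Vec<&'static str> {
        self.entries.iter().map(|e| e.name).collect()
    }
}

// A bundle that several projects could share.
fn security_bundle() -> Pipeline {
    Pipeline::default()
        .add("RateLimiting", 800)
        .add("Authentication", 900)
}
```

Adding `Validation` at priority 700 to a pipeline that already contains the bundle requires no change to the bundle itself. The new entry simply lands between `RateLimiting` and `Performance` in `execution_order()`.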


This article documents my exploration of middleware architecture as a junior student. Through practical implementation and experimentation, I gained deep insights into how middleware enables elegant, modular request processing in modern web applications. I hope my experience can help other students understand this powerful architectural pattern.

For more information, please visit the Hyperlane GitHub page or contact the author: [email protected]
