DEV Community

Tim Kelly for MongoDB

Posted on

Building a Real-Time Market Data Aggregator with Kafka and MongoDB

It starts the same way every time: an RSS feed, a CSV dump, a dashboard wired together with hope and duct tape. Maybe the data’s late. Maybe the moving average lags just enough to miss the signal. Or maybe it’s just another system straining under the weight of “real-time” without the architecture to back it up.

In this tutorial, we’ll build something better. Using Spring Kafka, we’ll stream simulated live stock data into a Kafka topic. From there, Kafka Streams will compute the Relative Strength Index (RSI) on the fly, and we’ll persist it to MongoDB using Spring Data MongoDB. On the historical side, we’ll use MongoDB’s aggregation framework to calculate and analyze simple and exponential moving averages (SMA and EMA) using the MongoDB aggregation pipeline—no third-party data warehouse required.

Main dashboard for our app

By the end, we’ll have a real-time financial pipeline that’s compact and fast. Feel free to clone the GitHub repository, and use the front end I reference throughout the tutorial.

Prerequisites

Before getting started, make sure you have the following installed and ready:

Creating our Spring application

To keep things simple, we’ll use Spring Initializr to scaffold our project.

Spring Initializr dependencies
Choose the following options:

  • Project: Maven
  • Language: Java
  • Spring Boot Version: 3.4.4
  • Group: com.mongodb (our your own preferred group)
  • Artifact: FinancialAggregator
  • Name: FinancialAggregator
  • Java version: 21
  • Dependencies:
    • Spring Web
    • Spring Data MongoDB
    • Spring for Apache Kafka
    • Kafka Streams

Here’s what each dependency will do for us:

  • Spring Web: Gives us everything we need to expose REST APIs with Spring Boot. We’ll use this to serve our financial data through HTTP endpoints.
  • Spring Data MongoDB: Provides convenient, idiomatic access to MongoDB using Spring Data repositories. It allows us to interact with our database in a way that feels familiar as a Spring developer.
  • Spring for Apache Kafka: Adds Spring Boot integration for Kafka, including KafkaTemplate for sending messages and configuration support for producers and consumers.
  • Kafka Streams: The actual stream processing engine. This powers our calculations on our stock data in real time. Spring will wire this into the app using @EnableKafkaStreams.

With this setup, we’ll have a data pipeline that simulates live financial data, processes it on the fly, stores it, and exposes it, as well as allowing us to analyze trends in historic data.

Once generated, extract the zip and open it in your preferred IDE.

Setting up MongoDB

Before we can start processing and storing data, we need a MongoDB cluster ready to receive it.

If you don’t already have a MongoDB Atlas cluster, you can follow this quick start guide to set one up for free. Once our cluster is running, we can use either the Atlas UI or mongosh to configure the necessary collections.

What MongoDB does in this project

MongoDB acts as our primary storage engine. It stores both:

  • Real-time RSI data produced by Kafka Streams.
  • Historical stock market data used to calculate SMA and EMA via aggregations.

We’ll take advantage of time series collections in MongoDB, which are optimized for timestamped data like stock prices. This lets us efficiently query, sort, and analyze trends over time.

Creating time series collections in MongoDB Atlas

Once your cluster is ready, follow these steps:

  1. Open the MongoDB Atlas UI and navigate to your cluster.
  2. Click "Browse Collections."
  3. Create a new database named Stock_Market_Data.
  4. Inside this database, create two time series collections:
    • Collection Name: Live_Data
      • Time Field: date
      • Granularity: Seconds
    • Collection Name: Historical_Data
      • Time Field: date
      • Metafield: company (we'll see why this is important later)
      • Granularity: Seconds

That’s all you need to start writing and reading real-time stock data with MongoDB. Next, we’ll simulate live market data and send it to Kafka.

Configuring our MongoDB connection

From the Atlas UI, we can get our connection string, and bring it back to our application properties.

spring.application.name=Financial Aggregator  

spring.data.mongodb.uri=<Your-Connection-String>
spring.data.mongodb.database=Stock_Market_Data
Enter fullscreen mode Exit fullscreen mode

We'll also name the database we're using, Stock_Market_Data.

Creating our live data

Our live data won’t be truly live. Instead, we’ll simulate and stream it into the system. There are plenty of APIs out there that offer real-time stock data, but to keep this demo self-contained and avoid external dependencies and key generation/management, we’ll mock the data ourselves. That said, if you’d prefer to plug in a real API, go for it—this setup won’t get in your way.

Our live data model

Start by creating a package called model in your application. Inside it, add a new class named LiveStockData. I came to understand far too late in the game that this could be confused with an agricultural term, but alas.

This class is our POJO—used to map simulated stock data to documents in MongoDB:

package com.mongodb.financialaggregator.model;  

import org.bson.types.ObjectId;  
import org.springframework.data.annotation.Id;  
import org.springframework.data.mongodb.core.mapping.Document;  
import org.springframework.data.mongodb.core.mapping.TimeSeries;  

import java.util.Date;  

@Document(collection = "Live_Data")  
@TimeSeries(timeField = "date")  
public class LiveStockData {  

    @Id  
    private ObjectId id;  
    private Date date;  
    private String company;  
    private Double closeLast;  
    private Double open;  
    private Double low;  
    private Double volume;  
    private Double high;  
    private Double rsi;  

    public LiveStockData() {}  

    public ObjectId getId() { return id; }  
    public void setId(ObjectId id) { this.id = id; }  

    public Date getDate() { return date; }  
    public void setDate(Date date) { this.date = date; }  

    public String getCompany() { return company; }  
    public void setCompany(String company) { this.company = company; }  

    public Double getCloseLast() { return closeLast; }  
    public void setCloseLast(Double closeLast) { this.closeLast = closeLast; }  

    public Double getOpen() { return open; }  
    public void setOpen(Double open) { this.open = open; }  

    public Double getLow() { return low; }  
    public void setLow(Double low) { this.low = low; }  

    public Double getVolume() { return volume; }  
    public void setVolume(Double volume) { this.volume = volume; }  

    public Double getHigh() { return high; }  
    public void setHigh(Double high) { this.high = high; }  

    public Double getRsi() { return rsi; }  
    public void setRsi(Double rsi) { this.rsi = rsi; }  
}
Enter fullscreen mode Exit fullscreen mode

This class gives us enough fields to mimic the structure of a real stock market API and power a candlestick chart. It also includes a field for the RSI (Relative Strength Index), which we'll calculate later.

A lot of this is fairly standard getter/setters, but let's take a look at the annotations we're adding to let Spring know what to do with this class.

@Document(collection = "Live_Data")  
Enter fullscreen mode Exit fullscreen mode

This tells Spring Data MongoDB to map this class to the Live_Data collection in MongoDB.

@TimeSeries(timeField = "date") 
Enter fullscreen mode Exit fullscreen mode

This annotation enables MongoDB’s time series capabilities. We’re specifying date as the time field, which allows us to efficiently store and query stock data over time.

Configure Kafka producer

Next, we’ll set up a Kafka producer to stream our simulated stock data. Create a new config package, and inside it, add a class named KafkaProducerConfig.

The Kafka producer is responsible for publishing messages, in our case, LiveStockData objects, to a Kafka topic.

package com.mongodb.financialaggregator.config;  

import com.mongodb.financialaggregator.model.LiveStockData;  
import org.apache.kafka.clients.producer.ProducerConfig;  
import org.apache.kafka.common.serialization.StringSerializer;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.core.*;  
import org.springframework.kafka.support.serializer.JsonSerializer;  

import java.util.HashMap;  
import java.util.Map;  

@Configuration  
public class KafkaProducerConfig {  

    @Bean  
    public ProducerFactory<String, LiveStockData> producerFactory() {  
        Map<String, Object> config = new HashMap<>();  
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);  
        return new DefaultKafkaProducerFactory<>(config);  
    }  

    @Bean  
    public KafkaTemplate<String, LiveStockData> kafkaTemplate() {  
        return new KafkaTemplate<>(producerFactory());  
    }  
}
Enter fullscreen mode Exit fullscreen mode

Here, we configure a ProducerFactory that knows how to serialize our data using JsonSerializer. We also state we will be talking to the broker on localhost:9092. The KafkaTemplate is a convenient way to send messages to Kafka topics from anywhere in the app.

The @Configuration annotation just tells Spring this is a configuration class.

Configure Kafka streams

Before we can wire up the stream logic, we need to tell Kafka Streams how to serialize and deserialize our custom LiveStockData objects. To do this, we'll create a custom SerDe (serializer/deserializer).

Create a utility package, and inside it, add the LiveStockDataSerde class:

package com.mongodb.financialaggregator.utility;  

import com.mongodb.financialaggregator.model.LiveStockData;  
import org.apache.kafka.common.serialization.Serdes;  
import org.springframework.kafka.support.serializer.JsonDeserializer;  
import org.springframework.kafka.support.serializer.JsonSerializer;  

public class LiveStockDataSerde extends Serdes.WrapperSerde<LiveStockData> {  
    public LiveStockDataSerde() {  
        super(new JsonSerializer<>(), new JsonDeserializer<>(LiveStockData.class));  
    }  
}
Enter fullscreen mode Exit fullscreen mode

We’ll use this class when defining our stream topology to ensure Kafka Streams can work directly with our domain objects without manual conversion—basically, so Kafka Streams knows how to convert bytes to Java objects and back.

In the same config package as our KafkaProducerConfig, we also need to create a KafkaStreamsConfig class. The KafkaStreamConfig will configure the Kafka Streams API to consume our messages and process them in real time.

package com.mongodb.financialaggregator.config;  

import org.apache.kafka.common.serialization.Serdes;  
import org.apache.kafka.streams.StreamsConfig;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.annotation.EnableKafkaStreams;  
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;  
import org.springframework.kafka.config.KafkaStreamsConfiguration;  
import org.springframework.kafka.support.serializer.JsonDeserializer;  

import java.util.HashMap;  
import java.util.Map;  


@Configuration  
@EnableKafkaStreams  
public class KafkaStreamsConfig {  

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)  
    KafkaStreamsConfiguration kStreamsConfig() {  
        Map<String, Object> props = new HashMap<>();  
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rsi-streams-app");  
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());  
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mongodb.financialaggregator.model");  
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.mongodb.financialaggregator.model.LiveStockData");  
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);  

        return new KafkaStreamsConfiguration(props);  
    }  
}
Enter fullscreen mode Exit fullscreen mode

This sets up the Kafka Streams runtime with the basic configurations it needs: a unique application ID, Kafka broker address, and the expected serialization types. It also ensures that our stream can deserialize LiveStockData objects correctly.

@EnableKafkaStreams tells Spring Boot to activate Kafka Streams support.

We also have some important configurations set in our @Bean. KafkaStreamsConfiguration

  • APPLICATION_ID_CONFIG: A unique name for this Kafka Streams application. This is required. This ID is how Kafka tracks our app’s state and progress across restarts. Changing it will reset our app’s stream state, including any local state stores or offsets.
  • BOOTSTRAP_SERVERS_CONFIG: Where to find our Kafka cluster.
  • DEFAULT_KEY_SERDE_CLASS_CONFIG: Uses Serdes.String(). We expect keys to be strings.
  • JsonDeserializer config: Ensures Kafka Streams knows how to read our custom object (LiveStockData).

If you're new to Kafka, you might be wondering, "Why not just use a Kafka Consumer?" Well Kafka Streams isn’t just a consumer—it’s a full stream processing library built on top of Kafka. It lets us define processing logic as a topology of transformations, stateful operations, and sinks. In this case, we’ll use it to calculate RSI values as they come in.

Simulating live stock data

Now, I’m not going to pretend this is the most impressive stock market simulator in the world. It’s not. But it gets the job done. It gives us consistent, dynamic data to push into Kafka and eventually graph. That’s all we need.

I've added some logic here just to generate some fluctuating data for an imaginary company called "Doof."

We'll create a service package and add a class named SimulateLiveData.

package com.mongodb.financialaggregator.service;

import com.mongodb.financialaggregator.model.LiveStockData;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.*;
import java.util.Date;
import java.util.Random;

@Service
public class SimulateLiveData {

    private Date currentSimulatedDate;
    private double previousClose;
    private final KafkaTemplate<String, LiveStockData> kafkaTemplate;

    private static final String COMPANY = "DOOF";
    private static final double INITIAL_OPEN = 100.0;
    private static final double INITIAL_VOLUME = 1000.0;

    private final double trendSlope = 0.002;
    private boolean trendingUp = true;
    private int trendCounter = 0;

    private final int oscillationPeriod = 30;
    private final double oscillationAmplitude = 1.5;

    private final Random random = new Random();

    public SimulateLiveData(KafkaTemplate<String, LiveStockData> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
        this.currentSimulatedDate = Date.from(LocalDate.now()
                .atStartOfDay(ZoneId.systemDefault())
                .toInstant());
        this.previousClose = INITIAL_OPEN;
    }

    public LiveStockData simulateLiveData() {
        LiveStockData stock = new LiveStockData();

        stock.setDate(currentSimulatedDate);
        stock.setCompany(COMPANY);

        double trendAdjustment = trendingUp ? trendSlope : -trendSlope;
        double oscillation = oscillationAmplitude *
                Math.sin((2 * Math.PI * trendCounter) / oscillationPeriod);
        double marketBaseline = previousClose * (1 + trendAdjustment) + oscillation;

        boolean hasVolatility = random.nextDouble() < 0.1;
        double volatilityBoost = hasVolatility ? (randomBetween(-0.1, 0.1)) : 0;

        double open = marketBaseline * (1 + volatilityBoost);
        double close = open * (1 + randomBetween(-0.05, 0.05));
        double high = Math.max(open, close) * (1 + randomBetween(0.01, 0.1));
        double low = Math.min(open, close) * (1 - randomBetween(0.01, 0.1));
        double volume = INITIAL_VOLUME * (1 + randomBetween(-0.5, 0.5));

        stock.setOpen(round(open));
        stock.setCloseLast(round(close));
        stock.setHigh(round(high));
        stock.setLow(round(low));
        stock.setVolume(round(volume));

        previousClose = close;
        trendCounter++;

        // Advance to the next simulated day
        LocalDate next = currentSimulatedDate.toInstant()
                .atZone(ZoneId.systemDefault())
                .toLocalDate()
                .plusDays(1);
        currentSimulatedDate = Date.from(next.atStartOfDay(ZoneId.systemDefault()).toInstant());

        // Flip the trend every 60 days
        if (trendCounter % 60 == 0) {
            trendingUp = !trendingUp;
        }

        return stock;
    }

    private double randomBetween(double min, double max) {
        return min + (max - min) * random.nextDouble();
    }

    private double round(double value) {
        return BigDecimal.valueOf(value)
                .setScale(2, RoundingMode.HALF_UP)
                .doubleValue();
    }
}
Enter fullscreen mode Exit fullscreen mode

This function simulates somewhat real-looking stock data with some randomness, gentle trends, and a hint of periodic volatility. It resets daily and flips direction every 60 “days” to prevent things from spiraling to the moon or zero.

Scheduling the simulation

Now, let’s wire it up so that we publish this fake stock data to Kafka automatically. I've arbitrarily decided once per second.

Inside the same SimulateLiveData class, add:

@Scheduled(fixedRate = 1000)
public void simulateEverySecond() {
    LiveStockData data = simulateLiveData();

    System.out.println("========== STOCK DEBUG INFO ==========");
    System.out.println("Company:       " + data.getCompany());
    System.out.println("Date (raw):    " + data.getDate());
    System.out.println("Date (class):  " + (data.getDate() != null ? data.getDate().getClass().getName() : "null"));
    System.out.println("Open:          " + data.getOpen());
    System.out.println("Close/Last:    " + data.getCloseLast());
    System.out.println("High:          " + data.getHigh());
    System.out.println("Low:           " + data.getLow());
    System.out.println("Volume:        " + data.getVolume());
    System.out.println("======================================");

    try {
        kafkaTemplate.send("stock-prices", data.getCompany(), data);
        System.out.println("Sent successfully.\n");
    } catch (Exception e) {
        System.err.println("Insert failed: " + e.getMessage());
        e.printStackTrace();
    }
}
Enter fullscreen mode Exit fullscreen mode

Thanks to @Scheduled(fixedRate = 1000), this method is triggered every second. It generates a new stock datapoint, prints it out for debugging, and sends it to the stock-prices Kafka topic using the KafkaTemplate.

I've added some logging and error handling just to help with the clarity when we're running the application.

Finally, make sure you enable scheduling in your application entrypoint by adding @EnableScheduling:

package com.mongodb.financialaggregator;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class FinancialAggregatorApplication {

    public static void main(String[] args) {
        SpringApplication.run(FinancialAggregatorApplication.class, args);
    }
}
Enter fullscreen mode Exit fullscreen mode

With this in place, our app will start streaming simulated stock data to Kafka as soon as we run it.

Perform calculations in the stream processor pipeline

The stream processor allows us to consume the stock price data flowing through Kafka, calculate real-time analytics, and persist our enriched results to MongoDB.

We'll use this to calculate some Relative Strength Indexes (RSIs). The RSI is a momentum indicator used in financial analysis. RSI measures the speed and magnitude of a security's recent price changes to detect overbought or oversold conditions in the price of that security. It is an oscillatory metric (displayed as a line graph) on a scale of 0 to 100. We can add them to our documents before we store them in MongoDB.

Live stock data graph

Using MongoRepository

To save our live stock data into MongoDB, we’ll use Spring Data’s MongoRepository interface. This gives us a ready-to-go set of CRUD operations with minimal setup. No boilerplate, just clean data access.

Create a repository package and add a new interface called LiveDataRepository:

package com.mongodb.financialaggregator.repository;

import com.mongodb.financialaggregator.model.LiveStockData;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.repository.MongoRepository;

public interface LiveDataRepository extends MongoRepository<LiveStockData, ObjectId> {
}
Enter fullscreen mode Exit fullscreen mode

Here, we're extending MongoRepository, which is a Spring Data interface that automatically provides methods like save(), findAll(), deleteById(), and more.

We specify LiveStockData as the type we’re working with, and ObjectId as the ID type (since that’s what we used in our model).

We can also define custom query methods here later, if needed (e.g., findByCompany(String company)), but for now, the built-in methods are more than enough.

The RSI calculator

In the Utility package, add a new class called RsiCalculator:

package com.mongodb.financialaggregator.utility;  

import java.util.List;  

public class RsiCalculator {  

    public static double calculate(List<Double> prices, int period) {  
        if (prices.size() < period + 1) return 0.0;  

        double gain = 0.0;  
        double loss = 0.0;  
        for (int i = 1; i <= period; i++) {  
            double diff = prices.get(i) - prices.get(i - 1);  
            if (diff > 0) gain += diff;  
            else loss -= diff;  
        }  

        if (loss == 0) return 100.0;  
        double rs = gain / loss;  
        return 100.0 - (100.0 / (1.0 + rs));  
    }  
}
Enter fullscreen mode Exit fullscreen mode

Here, you can see that we take a rolling window of prices and compute RSI using the standard formula. If there's no loss (i.e., price only went up), the RSI maxes out at 100. Otherwise, it calculates the average gain/loss ratio over the given period.

Now, in our service package, we'll add a class RsiStreamProcessor. Here, we will define our hook into the Kafka Streams pipeline, apply the RSI calculation, and persist the result to MongoDB.

package com.mongodb.financialaggregator.service;  

import com.mongodb.financialaggregator.model.LiveStockData;  
import com.mongodb.financialaggregator.repository.LiveDataRepository;  
import com.mongodb.financialaggregator.utility.LiveStockDataSerde;  
import com.mongodb.financialaggregator.utility.RsiCalculator;  
import org.apache.kafka.common.serialization.Serdes;  
import org.apache.kafka.streams.StreamsBuilder;  
import org.apache.kafka.streams.kstream.Consumed;  
import org.apache.kafka.streams.kstream.KStream;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  

import java.util.ArrayDeque;  
import java.util.ArrayList;  
import java.util.Deque;  
import java.util.Map;  
import java.util.concurrent.ConcurrentHashMap;  

@Component  
public class RsiStreamProcessor {  

    private static final int RSI_PERIOD = 14;  
    private final LiveDataRepository liveDataRepository;  
    private final Map<String, Deque<Double>> priceHistory = new ConcurrentHashMap<>();  

    public RsiStreamProcessor(LiveDataRepository liveDataRepository, StreamsBuilder streamsBuilder) {  
        this.liveDataRepository = liveDataRepository;  
        buildPipeline(streamsBuilder);  
    }  

    public void buildPipeline(StreamsBuilder builder) {  
        LiveStockDataSerde liveSerde = new LiveStockDataSerde();  
        KStream<String, LiveStockData> stream = builder.stream(  
                "stock-prices",  
                Consumed.with(Serdes.String(), liveSerde)  
        );  

        stream.peek((key, data) -> {  
            System.out.println("Received from Kafka: " + data);  

            if (data == null || data.getCloseLast() == null || data.getCompany() == null) {  
                return;  
            }  

            String symbol = data.getCompany();  
            double close = data.getCloseLast();  

            Deque<Double> prices = priceHistory.computeIfAbsent(symbol, k -> new ArrayDeque<>());  
            prices.addLast(close);  
            if (prices.size() > RSI_PERIOD + 1) prices.removeFirst();  

            if (prices.size() == RSI_PERIOD + 1) {  
                double rsi = RsiCalculator.calculate(new ArrayList<>(prices), RSI_PERIOD);  
                data.setRsi(rsi);  
            }  

            liveDataRepository.save(data);  
        });  
    }  
}
Enter fullscreen mode Exit fullscreen mode

You can see in the buildPipeline we first validate the data. Then, we track a sliding window of the last 15 prices. If we have enough data (at least 15 days to look through), we compute the RSI and attach it to the stock object. At the end of all of this, we can then save our data into our database with the RSI calculated.

The @Component annotation tells Spring this is a managed bean and should be picked up during component scanning.

This gives us a fully reactive stream: Every second, simulated data flows in, RSI is computed in real time, and the result is persisted—all without writing a line of boilerplate consumer logic.

Serve up our data in an API

We have our data processed and funnelled into our database. Let's look at getting some of that data out.

We’ll expose a REST endpoint so we (or our front end) can query the latest stock data and indicators like RSI.

Live stock data graph

In our service package, let’s create a new class, FinancialService:

package com.mongodb.financialaggregator.service;  

import com.mongodb.financialaggregator.model.*;  
import com.mongodb.financialaggregator.repository.LiveDataRepository;  
import org.springframework.stereotype.Service;  

import java.util.Date;  
import java.util.List;  

@Service  
public class FinancialService {  

    private LiveDataRepository liveDataRepository;

    public FinancialService(LiveDataRepository liveDataRepository) {
        this.liveDataRepository = liveDataRepository; 
    }  

    public List<LiveStockData> getLiveData() {  
        return liveDataRepository.findAll();  
    }  
}
Enter fullscreen mode Exit fullscreen mode

Nothing fancy—we inject the LiveDataRepository and expose a getLiveData() method that fetches everything from the Live_Data collection using findAll().

In a new package, controller, create a class called FinancialController. This is the part of the app that receives HTTP requests and sends responses back.

package com.mongodb.financialaggregator.controller;  

import com.mongodb.financialaggregator.model.LiveStockData;   
import com.mongodb.financialaggregator.service.FinancialService;  
import org.springframework.web.bind.annotation.*;  

import java.util.Date;  
import java.util.List;  

@CrossOrigin(origins = "http://localhost:5173") // Adjust if needed  
@RestController  
@RequestMapping(path = "/stocks")  
public class FinancialController {  
    private final FinancialService financialService;  

    public FinancialController(FinancialService financialService) {  
        this.financialService = financialService;  
    }  

    // test  
    // GET http://localhost:8080/stocks/live    
    // curl -v "http://localhost:8080/stocks/live"    
    @GetMapping(value= "/live", produces = "application/json")  
    public List<LiveStockData> getLiveData() {  
        return financialService.getLiveData();  
    };  
}
Enter fullscreen mode Exit fullscreen mode

This controller:

  • Maps the /stocks route to this controller.
  • Exposes a /stocks/live endpoint that returns a JSON list of all stored LiveStockData documents.
  • Uses @CrossOrigin to allow local frontend dev (like our React/Vite on port 5173) to make requests during development.

We'll be back here later to add more endpoints. Now that we have our live data being fed to our database and presented in our API, it's time to look at some of the aggregations we can do on data in our database.

Perform aggregations on our historical data

Our stock market data model

package com.mongodb.financialaggregator.model;  

import org.bson.types.ObjectId;  
import org.springframework.data.annotation.Id;  
import org.springframework.data.mongodb.core.mapping.Document;  
import org.springframework.data.mongodb.core.mapping.TimeSeries;  

import java.util.Date;  

@Document(collection = "Historical_Data")  
@TimeSeries(collection = "Historical_Data", timeField = "Date", metaField = "Company")  
public class StockMarketData {  

    @Id  
    private ObjectId id;  
    private Date date;  
    private String company;  
    private Double closeLast;  
    private Double open;  
    private Double low;  
    private Double volume;  
    private Double high;  

    public StockMarketData() {  
    }  

    public StockMarketData(ObjectId id, Date date, String company, Double closeLast, Double open, Double low,Double volume, Double high) {  
        this.id = id;  
        this.date = date;  
        this.company = company;  
        this.closeLast = closeLast;  
        this.open = open;  
        this.low = low;  
        this.volume = volume;  
        this.high = high;  
    }  

    public StockMarketData(Date date, String company, Double closeLast, Double open, Double low,Double volume, Double high) {  
        this.date = date;  
        this.company = company;  
        this.closeLast = closeLast;  
        this.open = open;  
        this.low = low;  
        this.volume = volume;  
        this.high = high;  
    }  

    public ObjectId getId() {  
        return id;  
    }  

    public void setId(ObjectId id) {  
        this.id = id;  
    }  

    public Date getDate() {  
        return date;  
    }  

    public void setDate(Date date) {  
        this.date = date;  
    }  

    public String getCompany() {  
        return company;  
    }  

    public void setCompany(String company) {  
        this.company = company;  
    }  

    public Double getCloseLast() {  
        return closeLast;  
    }  

    public void setCloseLast(Double closeLast) {  
        this.closeLast = closeLast;  
    }  

    public Double getOpen() {  
        return open;  
    }  

    public void setOpen(Double open) {  
        this.open = open;  
    }  

    public Double getLow() {  
        return low;  
    }  

    public void setLow(Double low) {  
        this.low = low;  
    }  

    public Double getVolume() {  
        return volume;  
    }  

    public void setVolume(Double volume) {  
        this.volume = volume;  
    }  

    public Double getHigh() {  
        return high;  
    }  

    public void setHigh(Double high) {  
        this.high = high;  
    }  
}
Enter fullscreen mode Exit fullscreen mode

Read in historical data

In addition to our real-time stream, we’ll bring in some historical stock data for context and longer-term trend analysis, showing off a few of MongoDB's aggregations specifically for analysing trends in financial data.

We’re using the Stock Market: Historical Data of Top 10 Companies dataset from Kaggle. It contains 25,161 rows, each representing stock market activity for a major company on a specific date between July 2013 and July 2023. The dataset includes daily open, high, low, close prices, and trading volumes for companies like Apple, Starbucks, Microsoft, Cisco, Qualcomm, Meta, Amazon, Tesla, AMD, and Netflix.

Download the dataset and place it in your project’s resources directory with the filename data.csv.

Creating a repository for historical data

To load this data into MongoDB, we’ll use a MongoRepository just like we did for live data.

Create a repository package and add an interface called FinancialRepository:

package com.mongodb.financialaggregator.repository;

import com.mongodb.financialaggregator.model.StockMarketData;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.repository.MongoRepository;

import java.util.Date;
import java.util.List;

public interface FinancialRepository extends MongoRepository<StockMarketData, ObjectId> {
    List<StockMarketData> findAllByCompanyAndDateBetween(String company, Date startDate, Date endDate);
}
Enter fullscreen mode Exit fullscreen mode

This interface gives us access to save() and saveAll() for inserting data, and we’ve also added a custom method—

findAllByCompanyAndDateBetween(...)—which Spring Data will automatically implement for us. This will be handy when calculating things like SMA and EMA over a time range.

Reading and importing the CSV

Now, let’s read the historical CSV and insert it into MongoDB.

Create a class called ReadInHistoricalData in your service package:

package com.mongodb.financialaggregator.service;

import com.mongodb.financialaggregator.model.StockMarketData;
import com.mongodb.financialaggregator.repository.FinancialRepository;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

@Service
public class ReadInHistoricalData implements ApplicationRunner {

    private final FinancialRepository financialRepository;
    private static final int BATCH_SIZE = 500; // Insert in chunks for performance

    public ReadInHistoricalData(FinancialRepository financialRepository) {
        this.financialRepository = financialRepository;
    }

    @Override
    public void run(org.springframework.boot.ApplicationArguments args) {
        loadData();
    }

    public void loadData() {
        String fileName = "data.csv";

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new ClassPathResource(fileName).getInputStream()))) {

            // Skip header
            reader.readLine();

            List<StockMarketData> batch = new ArrayList<>();
            String line;

            while ((line = reader.readLine()) != null) {
                StockMarketData data = parseCsvLine(line);
                if (data != null) {
                    batch.add(data);
                }

                if (batch.size() == BATCH_SIZE) {
                    financialRepository.saveAll(batch);
                    batch.clear();
                }
            }

            // Save any remaining data
            if (!batch.isEmpty()) {
                financialRepository.saveAll(batch);
            }

            System.out.println("Data import completed successfully.");
        } catch (Exception e) {
            System.err.println("Error loading CSV: " + e.getMessage());
            e.printStackTrace();
        }
    }

    private StockMarketData parseCsvLine(String line) {
        try {
            String[] fields = line.split(",");

            if (fields.length < 7) {
                System.err.println("Skipping invalid line: " + line);
                return null;
            }

            String company = fields[0].trim();
            Date date = parseDate(fields[1].trim());
            Double closeLast = parseDouble(fields[2]);
            Double volume = parseDouble(fields[3]);
            Double open = parseDouble(fields[4]);
            Double high = parseDouble(fields[5]);
            Double low = parseDouble(fields[6]);

            if (date == null) {
                System.err.println("Skipping row due to invalid date: " + line);
                return null;
            }

            return new StockMarketData(date, company, closeLast, open, low, volume, high);
        } catch (Exception e) {
            System.err.println("Error parsing line: " + line);
            return null;
        }
    }

    private Date parseDate(String dateStr) {
        String[] dateFormats = {"MM/dd/yyyy", "MM-dd-yyyy"};

        for (String format : dateFormats) {
            try {
                return new SimpleDateFormat(format, Locale.US).parse(dateStr);
            } catch (ParseException ignored) {}
        }

        System.err.println("Unrecognized date format: " + dateStr);
        return null;
    }

    private Double parseDouble(String value) {
        try {
            return Double.parseDouble(value.replace("$", "").trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

This does a few important things:

  • It uses ApplicationRunner to run once at startup and load the data (which we’ll remove later).

  • It reads the data.csv file from resources using a buffered reader.

  • It parses and batches records into chunks of 500 for efficient writes.

  • It handles some common data parsing issues (like dollar signs and inconsistent date formats).

Tip: One-time data load

Once the data is successfully loaded into MongoDB, we can prevent it from running again on every startup by removing the implements ApplicationRunner line and deleting this method:

@Override
public void run(org.springframework.boot.ApplicationArguments args) {
    loadData();
}
Enter fullscreen mode Exit fullscreen mode

This will leave us with a reusable loadData() method we can call manually if needed.

Custom financial repository

We already have a basic repository for loading data. But now we want to calculate things like moving averages, and that’s where things get a little more advanced. Instead of fetching raw documents and calculating these averages manually in memory in Java, we’re going to lean on MongoDB’s aggregation framework to do the heavy lifting for us.

Let’s start by creating a few records in our model package.

First, we'll define a simple record for the company names. Create a record called CompanyDTO:

package com.mongodb.financialaggregator.model;  

public record CompanyDTO(String name) {}
Enter fullscreen mode Exit fullscreen mode

We'll use this to retrieve a list of unique company names for our front end, that we can use to filter our stock information for graphing.

Company select drop down

Next, we need a record called SimpleMovingAverageDTO:

package com.mongodb.financialaggregator.model;  

import java.util.Date;  

public record SimpleMovingAverageDTO(double sma, Date date) {}
Enter fullscreen mode Exit fullscreen mode

Lastly, let’s define a DTO first called ExponentialMovingAverageDTO:

package com.mongodb.financialaggregator.model;  

import java.util.Date;  

public record ExponentialMovingAverageDTO(double ema, Date date) {}
Enter fullscreen mode Exit fullscreen mode

Up next, we'll create our interface in the repository package called CustomFinancialRepository, where we'll define our methods.

package com.mongodb.financialaggregator.repository;  

import com.mongodb.financialaggregator.model.CompanyDTO;  
import com.mongodb.financialaggregator.model.ExponentialMovingAverageDTO;  
import com.mongodb.financialaggregator.model.SimpleMovingAverageDTO;  

import java.util.Date;  
import java.util.List;  

public interface CustomFinancialRepository {  
    List<CompanyDTO> getAllCompanies();  
    List<SimpleMovingAverageDTO> getSimpleMovingAverage(String company, Date startDate, Date endDate);  
    List<ExponentialMovingAverageDTO> getExponentialMovingAverage(String company, Date startDate, Date endDate);  
}
Enter fullscreen mode Exit fullscreen mode

Now, we’ll set up our custom repository implementation. This is where we’ll drop in our aggregation logic:

package com.mongodb.financialaggregator.repository;  

import com.mongodb.financialaggregator.model.CompanyDTO;  
import com.mongodb.financialaggregator.model.ExponentialMovingAverageDTO;  
import com.mongodb.financialaggregator.model.SimpleMovingAverageDTO;  
import org.springframework.data.domain.Sort;  
import org.springframework.data.mongodb.core.MongoTemplate;  
import org.springframework.data.mongodb.core.aggregation.*;  
import org.springframework.data.mongodb.core.query.Criteria;  
import org.springframework.stereotype.Repository;  

import java.util.Date;  
import java.util.List;  

import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;  

@Repository  
public class CustomFinancialRepositoryImplementation implements CustomFinancialRepository {  

    private final int EXPONENTIAL_MOVING_AVERAGE_PERIOD = 20;  
    private final int SIMPLE_MOVING_AVERAGE_PERIOD = 4;  

    private final MongoTemplate mongoTemplate;  

    public CustomFinancialRepositoryImplementation(MongoTemplate mongoTemplate) {  
        this.mongoTemplate = mongoTemplate;  
    }  

    @Override  
    public List<CompanyDTO> getAllCompanies() {  
        Aggregation getListOfUniqueCompanies = newAggregation(  
                group("company").first("company").as("name"),  
                sort(Sort.by("name"))  
        );  

        return mongoTemplate.aggregate(  
                getListOfUniqueCompanies, "Historical_Data", CompanyDTO.class  
        ).getMappedResults();  
    }  

}
Enter fullscreen mode Exit fullscreen mode

The getAllCompanies() method grabs all unique companies in our Historical_Data collection. We’re grouping by the "company" field and grabbing the first one per group. It’s a clean way to get a list of available stock symbols to work with.

We also define some values we'll use for windows when calculating our moving averages.

    private final int EXPONENTIAL_MOVING_AVERAGE_PERIOD = 20;  
    private final int SIMPLE_MOVING_AVERAGE_PERIOD = 4;  
Enter fullscreen mode Exit fullscreen mode

Now, let’s move into the actual indicators.

Simple Moving Average

A Simple Moving Average (SMA) is what it sounds like. It takes the average closing price over a rolling window. This smooths out daily fluctuations and helps us see the trend over time.

Now, the actual implementation. We will add the following method to our CustomFinancialRepositoryImplementation implementation:

    @Override  
    public List<SimpleMovingAverageDTO> getSimpleMovingAverage(String company, Date startDate, Date endDate) {  
        Criteria criteria = Criteria  
                .where("company").is(company)  
                .and("date").gte(startDate).lte(endDate);  

        // Define SMA calculation (5-day moving average)  
        SetWindowFieldsOperation smaOperation = SetWindowFieldsOperation.builder()  
                .partitionBy("company") // Ensures calculations are per company  
                .sortBy(Sort.by(Sort.Direction.ASC, "date")) // Sort ascending by Date  
                .output(AccumulatorOperators.Avg.avgOf("closeLast")) // Compute SMA using avg()  
                .within(SetWindowFieldsOperation.Windows.documents(-SIMPLE_MOVING_AVERAGE_PERIOD, 0)) // Defines a 5-day window (4 previous + current)  
                .as("sma") // Output field named "sma"  
                .build();  

        Aggregation aggregation = newAggregation(  
                match(criteria),  
                smaOperation,  
                project("sma").and("date").as("date") // Keep only SMA and Date fields  
        );  

        return mongoTemplate.aggregate(  
                aggregation, "Historical_Data", SimpleMovingAverageDTO.class  
        ).getMappedResults();  
    }  
Enter fullscreen mode Exit fullscreen mode

We’re using MongoDB’s $setWindowFields operator here. This allows us to do rolling window calculations directly inside the database. We’re asking it to average the closeLast value for each document, across a sliding five-day window (the current doc plus the four before it), for a single company.

The result? We get a list of dates with a corresponding sma value.

Simple Moving Average graph

Exponential Moving Average

The Exponential Moving Average is similar to the SMA, but instead of giving every day equal weight, it weights recent days more heavily. This makes the EMA a bit more reactive to recent price movement, which traders often prefer.

And now, let’s add the method that calculates it to our CustomFinancialRepositoryImplementation:

    @Override  
    public List<ExponentialMovingAverageDTO> getExponentialMovingAverage(String company, Date startDate, Date endDate) {  
        Criteria criteria = Criteria  
                .where("company").is(company)  
                .and("date").gte(startDate).lte(endDate);  

        SetWindowFieldsOperation emaOperation = SetWindowFieldsOperation.builder()  
                .partitionBy("company")  
                .sortBy(Sort.by(Sort.Direction.ASC, "date"))  
                .output(AccumulatorOperators.ExpMovingAvg.expMovingAvgOf("closeLast").n(EXPONENTIAL_MOVING_AVERAGE_PERIOD))  
                .as("ema")  
                .build();  

        Aggregation aggregation = newAggregation(  
                match(criteria),  
                emaOperation,  
                project("ema").and("date").as("date")  
        );  

        return mongoTemplate.aggregate(  
                aggregation, "Historical_Data", ExponentialMovingAverageDTO.class  
        ).getMappedResults();  
    }  
Enter fullscreen mode Exit fullscreen mode

This is almost identical to the SMA implementation, except we swap out $avgOf for $expMovingAvgOf, and give it a period (n) to define how far back to weigh things. MongoDB handles the weighting math, so we don’t need to.

Exponential Moving Average graph

Let’s test it

Now, with the aggregations done, all we need is a service to expose these methods to our controllers. This is the part of the app that acts as a go-between: It calls our repositories, applies any business logic (if needed), and returns results to the controller so they can be exposed through our API.

Update our FinancialService class

Here’s the full and final FinancialService class:

package com.mongodb.financialaggregator.service;  

import com.mongodb.financialaggregator.model.*;  
import com.mongodb.financialaggregator.repository.CustomFinancialRepository;  
import com.mongodb.financialaggregator.repository.FinancialRepository;  
import com.mongodb.financialaggregator.repository.LiveDataRepository;  
import org.springframework.stereotype.Service;  

import java.util.Date;  
import java.util.List;  

@Service  
public class FinancialService {  

    private FinancialRepository financialRepository;  
    private CustomFinancialRepository customFinancialRepository;  
    private LiveDataRepository liveDataRepository;  

    public FinancialService(CustomFinancialRepository customFinancialRepository,  
                            FinancialRepository financialRepository,  
                            LiveDataRepository liveDataRepository) {  
        this.customFinancialRepository = customFinancialRepository;  
        this.financialRepository = financialRepository;  
        this.liveDataRepository = liveDataRepository;  
    }  

    public List<String> getAllCompanies() {  
        List<CompanyDTO> companiesList = customFinancialRepository.getAllCompanies();  

        return companiesList.stream()  
                .map(CompanyDTO::name)  
                .toList();  
    }  

    public List<StockMarketData> getStockDataBetweenDates(String company, Date startDate, Date endDate) {  
        return financialRepository.findAllByCompanyAndDateBetween(company, startDate, endDate);  
    }  

    public List<SimpleMovingAverageDTO> getSimpleMovingAverage(String companyName, Date startDate, Date endDate) {  
        return customFinancialRepository.getSimpleMovingAverage(companyName, startDate, endDate);  
    }  

    public List<ExponentialMovingAverageDTO> getExponentialMovingAverage(String companyName, Date startDate, Date endDate) {  
        return customFinancialRepository.getExponentialMovingAverage(companyName, startDate, endDate);  
    }  

    public List<LiveStockData> getLiveData() {  
        return liveDataRepository.findAll();  
    }  
}
Enter fullscreen mode Exit fullscreen mode

We've added our new repositories to the constructor, so we can access the methods. We've also called the methods we created in these repositories:

  • getAllCompanies() fetches a list of unique company names from our historical dataset.
  • getStockDataBetweenDates(...) pulls raw historical records between two dates, no aggregations applied.
  • getSimpleMovingAverage(...) and getExponentialMovingAverage(...) call the aggregation pipelines we built in the custom repository.

Adding our endpoints

Let's go back to the FinancialController we built earlier and add some endpoints for our new methods.

This controller will expose everything we’ve been working on—historical data queries, technical indicators, and even our live Kafka-driven data.

package com.mongodb.financialaggregator.controller;  

import com.mongodb.financialaggregator.model.ExponentialMovingAverageDTO;  
import com.mongodb.financialaggregator.model.LiveStockData;  
import com.mongodb.financialaggregator.model.SimpleMovingAverageDTO;  
import com.mongodb.financialaggregator.model.StockMarketData;  
import com.mongodb.financialaggregator.service.FinancialService;  
import org.springframework.format.annotation.DateTimeFormat;  
import org.springframework.web.bind.annotation.*;  

import java.util.Date;  
import java.util.List;  

@CrossOrigin(origins = "http://localhost:5173") // Adjust if needed  
@RestController  
@RequestMapping(path = "/stocks")  
public class FinancialController {  
    private final FinancialService financialService;  

    public FinancialController(FinancialService financialService) {  
        this.financialService = financialService;  
    }  

    // test  
    // GET http://localhost:8080/stocks/companies    // curl -v "http://localhost:8080/stocks/companies"    @GetMapping(value = "/companies", produces = "application/json")  
    public List<String> getAllCompanies() {  
        return financialService.getAllCompanies();  
    }  

    // test  
    // GET http://localhost:8080/stocks/companyStocksBetween?company=AAPL&startDate=2022-17-10&endDate=2023-07-17    // curl -v "http://localhost:8080/stocks/companyStocksBetween?company=AAPL&startDate=2022-07-17&endDate=2023-07-17"    @GetMapping(value = "/companyStocksBetween", produces = "application/json")  
    public List<StockMarketData> getStockDataBetweenDates(  
            @RequestParam String company,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date startDate,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date endDate) {  

        return financialService.getStockDataBetweenDates(company, startDate, endDate);  
    };  

    // test  
    // GET http://localhost:8080/stocks/sma?company=AAPL&startDate=2023-07-10&endDate=2023-07-17    // curl -v "http://localhost:8080/stocks/sma?company=AAPL&startDate=2022-07-10&endDate=2023-07-17"    @GetMapping(value = "/sma", produces = "application/json")  
    public List<SimpleMovingAverageDTO> getSmaForCompany(  
            @RequestParam String company,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date startDate,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date endDate) {  

        return financialService.getSimpleMovingAverage(company, startDate, endDate);  
    };  

    // test  
    // GET http://localhost:8080/stocks/ema?company=AAPL&startDate=2023-07-10&endDate=2023-07-17    // curl -v "http://localhost:8080/stocks/sma?company=AAPL&startDate=2023-07-10&endDate=2023-07-17"    @GetMapping(value= "/ema", produces = "application/json")  
    public List<ExponentialMovingAverageDTO> getEmaForCompany(  
            @RequestParam String company,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date startDate,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date endDate) {  

        return financialService.getExponentialMovingAverage(company, startDate, endDate);  
    };  

    // test  
    // GET http://localhost:8080/stocks/live    // curl -v "http://localhost:8080/stocks/live"    @GetMapping(value= "/live", produces = "application/json")  
    public List<LiveStockData> getLiveData() {  
        return financialService.getLiveData();  
    };  

}
Enter fullscreen mode Exit fullscreen mode

Every method here maps directly to one in FinancialService. We keep this controller thin—just parsing request parameters, calling the service, and returning the result.

Let's walk through the new endpoints we just added.

    public List<String> getAllCompanies() {  
        return financialService.getAllCompanies();  
    }  
Enter fullscreen mode Exit fullscreen mode

This endpoint returns a list of all the unique companies in our historical dataset.

Behind the scenes, the FinancialService calls a MongoDB aggregation that groups documents by the company field, sorts them alphabetically, and maps them into a CompanyDTO. This method simply extracts the .name field from each of those records and returns the result as a List<String>.

    public List<StockMarketData> getStockDataBetweenDates(  
            @RequestParam String company,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date startDate,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date endDate) {  

        return financialService.getStockDataBetweenDates(company, startDate, endDate);  
    };  
Enter fullscreen mode Exit fullscreen mode

This one gives us the raw historical stock data for a given company between two dates.

  • The @RequestParam annotations pull values straight from the query string.
  • The @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) ensures that Spring can parse the date strings correctly, e.g., 2023-01-01.
  • The FinancialService calls a Spring Data repository method that uses a MongoDB query filter: company == ... and date >= startDate && <= endDate.

We'll use this to populate the candlestick graph in the front end.

Candlestick graph for historical data

    public List<SimpleMovingAverageDTO> getSmaForCompany(  
            @RequestParam String company,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date startDate,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date endDate) {  

        return financialService.getSimpleMovingAverage(company, startDate, endDate);  
    };  
Enter fullscreen mode Exit fullscreen mode

This endpoint returns a Simple Moving Average for a given company over a specific date range.

This method uses the MongoDB aggregation pipeline we set up to compute a rolling average of the closeLast field, and returns an array we can plot as a line graph.

Each record in the response includes:

  • A sma value (the average).
  • The date it corresponds to.
    public List<ExponentialMovingAverageDTO> getEmaForCompany(  
            @RequestParam String company,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date startDate,  
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) Date endDate) {  

        return financialService.getExponentialMovingAverage(company, startDate, endDate);  
    };  
Enter fullscreen mode Exit fullscreen mode

This one’s similar to the SMA endpoint but calculates an EMA instead.

The key difference is that recent values are weighted more heavily, but it still returns that array we can plot as a line graph.

This is useful for traders looking for momentum shifts or for graphing smoother, more sensitive trendlines.

Running our application

If you’re using Kafka 3.x with KRaft mode (no ZooKeeper), you must initialize your Kafka log directory with kafka-storage.sh format before starting the server. This only needs to be done once per broker.

Feel free to check a full detailed walkthrough on getting started with Kafka, but we’ll just do what we need to get started.

Choose any UUID for your cluster (or generate one). You can generate one like this:

CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) echo $CLUSTER_ID
Enter fullscreen mode Exit fullscreen mode

Run this command to format the KRaft storage:

bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties
Enter fullscreen mode Exit fullscreen mode

This writes the metadata to the log dirs and makes the broker startable.

Now, start the Kafka server:

bin/kafka-server-start.sh config/kraft/server.properties
Enter fullscreen mode Exit fullscreen mode

You should see logs like: INFO [KafkaServer id=0] started (kafka.server.KafkaServer).

You can now run your Spring application, and after we give some time for everything to start up, we should start to see some of our logs:

Received from Kafka: com.mongodb.financialaggregator.model.LiveStockData@2b0aaf9c
========== STOCK DEBUG INFO ==========
Company:       DOOF
Date (raw):    Sun Jun 22 00:00:00 IST 2025
Date (class):  java.util.Date
Open:          111.12
Close/Last:    115.26
High:          123.71
Low:           102.81
Volume:        1054.14
======================================
Sent successfully.

Received from Kafka: com.mongodb.financialaggregator.model.LiveStockData@398892bb
========== STOCK DEBUG INFO ==========
Company:       DOOF
Date (raw):    Mon Jun 23 00:00:00 IST 2025
Date (class):  java.util.Date
Open:          116.53
Close/Last:    111.57
High:          127.41
Low:           106.09
Volume:        1282.71
======================================
Sent successfully.

Received from Kafka: com.mongodb.financialaggregator.model.LiveStockData@31ca10ba
========== STOCK DEBUG INFO ==========
Company:       DOOF
Date (raw):    Tue Jun 24 00:00:00 IST 2025
Date (class):  java.util.Date
Open:          112.77
Close/Last:    117.03
High:          128.63
Low:           106.69
Volume:        1418.38
======================================
Sent successfully.
Enter fullscreen mode Exit fullscreen mode

And we can verify this in our MongoDB collection. Our Historical_Data collection will take a little longer to populate, but it shouldn't be more than a few seconds until we start to see data in our Live_Data.

If we want to test out our endpoints, spin up the React front end I provided in the GitHub repo that allows us to look at our data and aggregations, graphed.

If you want to test out the endpoints manually, we can run these curl commands:

# All companies
curl -v http://localhost:8080/stocks/companies

# Historical data for AAPL
curl -v "http://localhost:8080/stocks/companyStocksBetween?company=AAPL&startDate=2023-01-01&endDate=2023-02-01"

# SMA
curl -v "http://localhost:8080/stocks/sma?company=AAPL&startDate=2023-01-01&endDate=2023-02-01"

# EMA
curl -v "http://localhost:8080/stocks/ema?company=AAPL&startDate=2023-01-01&endDate=2023-02-01"

# Live data
curl -v http://localhost:8080/stocks/live

Enter fullscreen mode Exit fullscreen mode

Conclusion

This isn’t just another toy project pretending to be real-time. We’ve built a proper streaming architecture—one that ingests live data, enriches it on the fly with RSI, persists it with zero ceremony using Spring Data MongoDB, and exposes it through a clean, RESTful interface. On the historical side, we’ve leaned on MongoDB’s aggregation framework to calculate indicators like SMA and EMA directly in the database, avoiding the overhead of processing everything in memory or shipping it off to a separate analytics engine.

If you want to learn more about what can be done with MongoDB and Kafka, check out my tutorial, Building a Real-Time AI Fraud Detection System with Spring Kafka and MongoDB, or learn how to create an app in Demand Forecasting With MongoDB Time Series and Edge-Server over on Medium.

Top comments (2)

Collapse
 
cwrite profile image
Christopher Wright

This is a really comprehensive guide, thanks for sharing! Out of curiosity, are there any challenges you faced when integrating the Kafka Streams calculations with real-time MongoDB writes, or did Spring abstract most of that complexity away?

Collapse
 
timkelly profile image
Tim Kelly • Edited

Thanks! Spring really helps you get something solid up and running fast, I leaned into that here to keep the setup clean and approachable. That said, there’s definitely room to optimize: batching writes, handling retries more robustly, or offloading RSI state to Kafka state stores. For the scope of this demo, I went with the simplest path that still felt production-worthy