MongoDB Guests

MongoDB Update With Aggregation Pipelines In C#

This post was written by guest author Markus Wildgruber

The previous articles in this series focused on using MongoDB aggregations with C# to summarize data. This article shows how to update documents with aggregation pipelines and how to use expressions to calculate the new values.

If you have already updated documents in MongoDB, you know that there are many update operators that allow you to:

  • Set values (always or only when inserting a new document).
  • Increase values.
  • Assign maximum or minimum values.
  • Add values to arrays or sets.

The MongoDB C# driver provides an implementation for all of these operators so that you can use them easily when updating or upserting documents in your database. While these operators cover the main update scenarios, sooner or later, you might encounter update requirements that build upon the current state of the document and calculate the new values from expressions, for instance:

  • Updating an average.
  • Handling sub-documents in an array, e.g., adding a new element if it does not exist yet or updating the existing sub-document.
  • Conditionally assigning a value.

Of course, you can solve this by first retrieving the document and calculating the new values in your application code. In a situation with a very low number of changes, this is a viable approach. Keep in mind that you need to use multi-document transactions in order to ensure that the result of the operation is consistent if multiple update operations are executed in parallel.

From a performance point of view, it would clearly be better to update the documents with a single request. Also, as MongoDB write operations on a single document are atomic, we would not need to use multi-document transactions anymore. This article shows exactly how to do that with MongoDB and C#.

Overview of the sample

Let's suppose we read values from a sensor and want to create documents that contain the number of readings and their average for each day. We will loosely follow the bucket pattern and start out with the following simplified classes in C#:

/// <summary>
/// Document that stores aggregated sensor readings for a day.
/// </summary>
public class BucketDocument
{
    public DateTime Id { get; set; } = DateTime.UtcNow.Date;

    public double Total { get; set; }

    public int Count { get; set; }

    public double Average { get; set; }
}

/// <summary>
/// Helper class that contains a single reading of the sensor.
/// </summary>
public class Reading
{
    public DateTime Timestamp { get; set; }

    public double Value { get; set; }
}

As long as we only want to store Total and Count in the document and do not need a dedicated property for the Average, we could still update the document using the $inc operator (Inc in C#) and would not need an aggregation pipeline. However, in our sample, we want to filter and sort by the average, so we need to store the value on the document.
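
For comparison, the operator-based update mentioned above might look like the following sketch. It assumes a trimmed-down BucketDocument without the Average property; IncUpdateSketch and AddReadingAsync are hypothetical names used only for illustration. Because $inc creates a missing field with the given value, this variant also works as an upsert.

```csharp
using System;
using System.Threading.Tasks;
using MongoDB.Driver;

// Assumption: reduced bucket class without the Average property.
public class BucketDocument
{
    public DateTime Id { get; set; } = DateTime.UtcNow.Date;

    public double Total { get; set; }

    public int Count { get; set; }
}

public static class IncUpdateSketch
{
    // Adds one reading to today's bucket using only classic update operators.
    public static Task<UpdateResult> AddReadingAsync(
        IMongoCollection<BucketDocument> coll, double value)
    {
        UpdateDefinition<BucketDocument> update = Builders<BucketDocument>.Update
            .Inc(x => x.Total, value) // $inc: { Total: value }
            .Inc(x => x.Count, 1);    // $inc: { Count: 1 }

        DateTime day = DateTime.UtcNow.Date;
        return coll.UpdateOneAsync(
            x => x.Id == day,
            update,
            new UpdateOptions { IsUpsert = true });
    }
}
```

The average would then have to be computed by whoever reads the document, which is exactly what prevents filtering and sorting by it on the server.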

Updating an existing document

In order to update an existing document, we can put together the following aggregation pipeline:

// Prepare data
BucketDocument doc = new()
{
    Id = DateTime.UtcNow.Date,
    Total = 12,
    Count = 3,
    Average = 4,
};

await coll.InsertOneAsync(doc);

Reading newReading = new Reading()
{
    Timestamp = DateTime.UtcNow,
    Value = 2,
};

// Create pipeline for update
PipelineDefinition<BucketDocument, BucketDocument> pipeline = new EmptyPipelineDefinition<BucketDocument>()
    .Set(x => new BucketDocument()
    {
        Total = x.Total + newReading.Value,
        Count = x.Count + 1,
    })
    .Set(x => new BucketDocument()
    {
        Average = x.Total / x.Count,
    });
// Run update
UpdateDefinition<BucketDocument> update = Builders<BucketDocument>
    .Update
    .Pipeline(pipeline);
await coll.UpdateOneAsync(
    x => x.Id == doc.Id,
    update);

The above code first inserts a document into the database that we will update in the following lines of code. This is only used for preparation in the sample and is not necessary in a real-world scenario where the document already exists.

The pipeline as such contains two stages:

  1. Assign the new values to Total and Count (as mentioned above, this would also be possible with the simple update operators).
  2. Calculate the Average by dividing Total by Count. As we are adding a new reading, Count is at least 1, so we do not need to worry about dividing by zero.
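
The split into two stages matters: within a single $set stage, every expression reads the document as it entered that stage, so an Average computed next to Total and Count would still be based on the old values. The following sketch imitates this behavior in plain C# without the driver; Bucket, Stage1, Stage2, and SingleStage are hypothetical names used only for illustration.

```csharp
using System;

// Plain C# stand-in for the bucket; no MongoDB types involved.
public record Bucket(double Total, int Count, double Average);

public static class StageOrderSketch
{
    // Stage 1: new Total and Count, computed from the incoming document.
    public static Bucket Stage1(Bucket input, double reading) =>
        input with { Total = input.Total + reading, Count = input.Count + 1 };

    // Stage 2: Average, computed from the output of stage 1.
    public static Bucket Stage2(Bucket input) =>
        input with { Average = input.Total / input.Count };

    // A single combined stage: every expression reads the incoming
    // document, so Average is derived from the old Total and Count.
    public static Bucket SingleStage(Bucket input, double reading) =>
        input with
        {
            Total = input.Total + reading,
            Count = input.Count + 1,
            Average = input.Total / input.Count,
        };
}
```

With the sample document (Total 12, Count 3) and a reading of 2, Stage2(Stage1(doc, 2)) yields an Average of 3.5, whereas SingleStage(doc, 2) yields 4, the average before the new reading was added.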

This is the document after the first update:

{
  "_id": {
    "$date": "2025-05-23T00:00:00.000Z"
  },
  "Total": 14,
  "Count": 4,
  "Average": 3.5
}

Upserting documents that might not exist yet

Keep in mind that the above approach only works when updating existing documents. If we plan to upsert a document, we need to take some precautions, as the properties are not yet initialized in a new document. Let's make the pipeline safe for upserts:

PipelineDefinition<BucketDocument, BucketDocument> pipeline = new EmptyPipelineDefinition<BucketDocument>()
    .Set(x => new BucketDocument()
    {
        Total = (x.Total > 0 ? x.Total : 0) + newReading.Value,
        Count = (x.Count > 0 ? x.Count : 0) + 1,
    })
    .Set(x => new BucketDocument()
    {
        Average = x.Total / x.Count,
    });
UpdateDefinition<BucketDocument> update = Builders<BucketDocument>
    .Update
    .Pipeline(pipeline);
await coll.UpdateOneAsync(
    x => x.Id == doc.Id,
    update, 
    new()
    {
        IsUpsert = true
    });

To cope with fields that are missing in a newly created document, the first stage now contains conditional expressions that check whether Total and Count, respectively, are greater than zero and use the stored value only in that case. Without this check, the expression would result in null, as $add returns null if one of its operands is null or missing. The update now runs as an upsert operation so that a new document is created if nothing matches the filter expression.
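
One side effect of the greater-than-zero check is that a stored Total of zero or below is treated like a missing value, which could matter for sensors that report negative readings. As an alternative, the stages can be written as raw BSON with the $ifNull operator, which only falls back to 0 when the field is missing or null. This is a minimal sketch, not the approach used in the rest of the article; IfNullPipelineSketch and Build are hypothetical names, and the field names assume the default property-name mapping shown in the documents above.

```csharp
using MongoDB.Bson;
using MongoDB.Driver;

public static class IfNullPipelineSketch
{
    // Builds the two $set stages as raw BSON; TDocument is the bucket class.
    public static UpdateDefinition<TDocument> Build<TDocument>(double reading)
    {
        var stage1 = new BsonDocument("$set", new BsonDocument
        {
            // $ifNull falls back to 0 when the field is missing or null.
            { "Total", new BsonDocument("$add", new BsonArray
                { new BsonDocument("$ifNull", new BsonArray { "$Total", 0 }), reading }) },
            { "Count", new BsonDocument("$add", new BsonArray
                { new BsonDocument("$ifNull", new BsonArray { "$Count", 0 }), 1 }) },
        });

        // Second stage so that $Total and $Count refer to the new values.
        var stage2 = new BsonDocument("$set", new BsonDocument
        {
            { "Average", new BsonDocument("$divide", new BsonArray { "$Total", "$Count" }) },
        });

        PipelineDefinition<TDocument, TDocument> pipeline =
            PipelineDefinition<TDocument, TDocument>.Create(new[] { stage1, stage2 });
        return Builders<TDocument>.Update.Pipeline(pipeline);
    }
}
```

The resulting update definition can be passed to UpdateOneAsync with IsUpsert = true in the same way as the LINQ-based pipeline.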

Including readings in an array of sub-documents

To further expand our sample, we add a property to store the top three readings of the day along with their timestamp in an array. This array should be sorted by the value in descending order. The following code block shows the updated document class:

/// <summary>
/// Document that stores aggregated sensor readings for a day.
/// </summary>
public class BucketDocument
{
    public DateTime Id { get; set; } = DateTime.UtcNow.Date;

    public List<Reading> TopReadings { get; set; } = new();

    public double Total { get; set; }

    public int Count { get; set; }

    public double Average { get; set; }
}

We update the pipeline to also update the TopReadings property:

PipelineDefinition<BucketDocument, BucketDocument> pipeline = new EmptyPipelineDefinition<BucketDocument>()
    .Set(x => new BucketDocument()
    {
        TopReadings = (x.TopReadings ?? new())
            .Union(new Reading[] { newReading })
            .OrderByDescending(r => r.Value)
            .Take(3)
            .ToList(),
        Total = (x.Total > 0 ? x.Total : 0) + newReading.Value,
        Count = (x.Count > 0 ? x.Count : 0) + 1,
    })
    .Set(x => new BucketDocument()
    {
        Average = x.Total / x.Count,
    });

As before, we have to be prepared for an upsert where the TopReadings property is not initialized yet, so we start out with (x.TopReadings ?? new()). Afterwards, we use LINQ to add the new element, sort by value in descending order, and take only the first three elements.
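
To get a feeling for what the expression does, the same LINQ chain can be run in memory. The sketch below is plain C# without the driver; TopReadingsSketch and AddAndTrim are hypothetical names, and Reading is declared as a record here (an assumption, the article uses a class) so that Union compares readings by value, which is presumably closer to how the server treats identical sub-documents in a set union.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumption: record instead of class to get value equality in Union.
public record Reading(DateTime Timestamp, double Value);

public static class TopReadingsSketch
{
    // Mirrors the pipeline expression: tolerate a missing list, add the
    // new reading, sort by value in descending order, keep three entries.
    public static List<Reading> AddAndTrim(List<Reading> current, Reading newReading) =>
        (current ?? new List<Reading>())
            .Union(new[] { newReading })     // add the reading, dropping exact duplicates
            .OrderByDescending(r => r.Value) // highest value first
            .Take(3)                         // keep the top three only
            .ToList();
}
```

Starting from readings with the values 6, 5, and 1 and adding a reading with the value 2, the result contains 6, 5, and 2, and the reading with the value 1 is dropped. Passing null as the current list yields a list with just the new reading.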

The database contains the following document after updating the document with the pipeline:

{
  "_id": {
    "$date": "2025-05-23T00:00:00.000Z"
  },
  "TopReadings": [
    {
      "Timestamp": {
        "$date": "2025-05-23T10:37:17.366Z"
      },
      "Value": 6
    },
    {
      "Timestamp": {
        "$date": "2025-05-23T11:37:17.366Z"
      },
      "Value": 5
    },
    {
      "Timestamp": {
        "$date": "2025-05-23T12:37:17.409Z"
      },
      "Value": 2
    }
  ],
  "Total": 14,
  "Count": 4,
  "Average": 3.5
}

Going further

In this introduction, we learned how to update documents with an aggregation pipeline. As shown above, the MongoDB C# driver does a very good job translating LINQ statements to complex MongoDB statements. If there is no translation, you can also add BSON stages to the pipeline and run a custom MongoDB statement as shown in my previous article, Handling Complex Aggregation Pipelines With C#.

Keep in mind that not all types of aggregation stages are supported in a pipeline that is used for an update. For a list of supported stages, please see the documentation on updates with aggregation pipelines.

Although the samples above only use UpdateOneAsync, you can also use aggregation pipelines with UpdateMany or UpdateManyAsync. In this case, each document that matches the filter is transformed with the pipeline.
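
As a rough sketch of such a bulk update, the following code recalculates the Average of every bucket that has at least one reading. UpdateManySketch and RecalculateAveragesAsync are hypothetical names; the filter on Count is an assumption that keeps documents without readings out of the division, since dividing by zero fails on the server.

```csharp
using System;
using System.Threading.Tasks;
using MongoDB.Driver;

public class BucketDocument
{
    public DateTime Id { get; set; } = DateTime.UtcNow.Date;

    public double Total { get; set; }

    public int Count { get; set; }

    public double Average { get; set; }
}

public static class UpdateManySketch
{
    // Recomputes Average for every bucket that has at least one reading.
    public static Task<UpdateResult> RecalculateAveragesAsync(
        IMongoCollection<BucketDocument> coll)
    {
        PipelineDefinition<BucketDocument, BucketDocument> pipeline =
            new EmptyPipelineDefinition<BucketDocument>()
                .Set(x => new BucketDocument()
                {
                    Average = x.Total / x.Count,
                });

        return coll.UpdateManyAsync(
            x => x.Count > 0, // skip buckets without readings
            Builders<BucketDocument>.Update.Pipeline(pipeline));
    }
}
```

The returned UpdateResult reports how many documents matched the filter and how many were actually modified.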

Updates with aggregation pipelines are a very powerful tool and allow for efficient updates with minimal requests. Have you ever used this kind of update? What is your experience? Share your thoughts in the comments!
