This blog post is aimed at a technical audience and assumes prior knowledge of C#, Entity Framework Core and SQL Server.
The Investment Platform team at Moneybox is responsible for building the software that manages all of the money and assetsAn asset is anything that holds value and which can be bought and sold freely. held on behalf of our customers. These responsibilities include trading fundsFunds, also called ‘tracker funds’, are financial instruments that have been set up to match or ‘track’ the price of a market index. Investing in a fund lets you get exposure to different financial assets like shares and bonds, without having to buy them directly., calculating interest, processing deposits and withdrawals, and much more.
The system contains background jobs to perform many of its operations, often running overnight processing of large volumes of data, such as accruing daily interest for all of our cash accounts. As developers, we want to be able to quickly build these jobs without repeating boilerplate code, and utilise out of the box features to ensure appropriate error handling, concurrent processing for performance, and monitoring. To address all of these requirements we built an abstraction that we call the “Entity Enumerator”, which utilises Entity Framework Core and the power of .NET Generics to provide reusable building blocks to run all of our jobs.
Before we dive into the implementation, let’s first consider how a basic job could be written from scratch. A common approach to doing this with Entity Framework is to first select the IDs of the records you want to process, then use a new DbContext instance per batch. New context instances are required to ensure the change tracker is kept small for performance reasons and to keep a lower memory footprint. Once a batch of entities has been processed you no longer need to hold on to them and they can be garbage collected.
A job that accrues daily interest for cash accounts, including our Cash LISA and Notice Savings accounts, may look something like this:
const int batchSize = 200; var startOfDay = DateTime.UtcNow.StartOfDay(); long[] assetIds; // Select the IDs of the entities to process using (var context = _contextFactory.Create()) { assetIds = await context .Assets .Where(x => x.AssetDefinition.DoesAccrueInterest) .Where(x => x.CreatedOn < startOfDay) .Select(x => x.Id) .ToArrayAsync(); } // Group the IDs into batches based on the requested batch size var batches = assetIds .Select((id, index) => new { Id = id, Index = index, }) .GroupBy(x => x.Index / batchSize) .ToArray(); // Iterate through the batches, using a new context for each batch foreach (var batch in batches) { var assetIdsInBatch = batch.Select(x => x.Id).ToArray(); using (var context = _contextFactory.Create()) { var assets = await context .Assets .Include(x => x.AssetDefinition) .ThenInclude(x => x.InterestRates) .Where(x => assetIdsInBatch.Contains(x.Id)) .ToArrayAsync(); // Business logic to accrue interest here await context.SaveChangesAsync(); } }
The ID selection query can in some cases return hundreds of thousands of IDs – SQL Server is perfectly happy with this and with appropriate indexes is usually very fast. It’s also important to consider that our system is highly available and therefore small batches are required to reduce table locking that would introduce delays and timeouts to other queries and operations.
When you start to consider other requirements we want to have as part of all of our jobs, the amount of generic boilerplate code increases dramatically. This leads to massive amounts of code duplication and longer implementation times for the developer.
Given that our Investment Platform handles very large volumes of customer data, the following is required for all of our jobs:
- Batching logic to ensure read/write operations are quick
- Error handling and batch retries
- Concurrent processing of batches to reduce total run time
- Logging and progress reporting
Further to this, we have well over 100 background jobs running within the system, all of which need to be able to load and modify data concurrently without tripping over each other and creating ‘last one in’ situations resulting in data loss. Imagine a scenario where two different jobs load a customer’s Asset into memory at the very same instant, change the number of units held (e.g. a fund buy and sell being processed at the same time), then both save the result. This would lead to one of these operations being lost from the underlying database record as it wouldn’t know about the other concurrent operation and would overwrite the result. While unlikely to happen at exactly the same time, it can and does happen, so we need to ensure we handle this to maintain the integrity of the data.
To protect against concurrency issues we use a rowversion field on all of our entities acting as a concurrency token. The result of this is that we sometimes get concurrency exceptions bubbling out of Entity Framework, which tell us someone else has modified the entity since we first loaded it. This is the desired behaviour, but means we need to gracefully retry the batch with freshly loaded data without blowing up the entire job. Adding this retry behaviour across all of our jobs without any reusable building blocks would be time intensive and error prone.
Considering all of the requirements above, we set out to build a job runner that would be able to take any entity in our system and process them in batches. We wanted to achieve as much code reuse as possible with an elegant interface. The result was an Entity Enumerator with a fluent builder pattern to configure it:
await EntityEnumeratorBuilder .Create<Asset>(jobName: "AccrueAssetInterest", entityFrameworkContextFactory: _contextFactory) .WithBatchSize(100) .WithBatchRetryOnExceptions(typeof (DbUpdateConcurrencyException)) .WithBatchRetryLimit(3) .WithParallelProcessing(4) .WithIdQuery(dbSet => dbSet .Where(x => x.AssetDefinition.DoesAccrueInterest) .Where(x => x.CreatedOn < startOfDay) .Select(x => x.Id) ) .WithBatchQuery(dbSet => dbSet .Include(x => x.AssetDefinition) .ThenInclude(x => x.InterestRates) ) .WithBatchCallback(ProcessBatch) .Build() .RunAsync(cancellationToken); // Callback signature, using generics for the entity type private async Task ProcessBatch(Asset[] assets, DbContext context, CancellationToken cancellationToken) { // Business logic await context.SaveChangesAsync(); }
Breaking down the arguments:
ParameterDescription
Job Name | The name of a job used in logs to identify different instances. |
Entity Framework Context Factory | A factory to create a DbContext instance configured for your database. |
Batch Size | The number of entities to load into each batch. |
Batch Retry On Exceptions | Triggers a retry of a batch when any of the given exceptions are thrown. In this case we are telling the Entity Enumerator to retry the database concurrency exceptions referenced above. |
Batch Retry Limit | The number of allowed retries for each batch before the entire job fails and throws an exception.
Our retry logic is implemented using the amazing Polly library, available on NuGet. |
Parallel Processing | Spawns the given number of background threads (tasks) to process the batches concurrently. |
Id Query | A function callback using the DbSet<TEntity> to configure the selection criteria for the IDs of the entities to process. |
Batch Query | A function callback using the DbSet<TEntity> to configure any required Entity Framework includes for navigation properties on the entities loaded for the batch. |
Batch Callback | An async function callback to contain the job specific logic, provided with the entities loaded in the batch, the context instance created for the batch so further database operations can be performed including SaveChanges, and a cancellation token. |
The underlying implementation of the Entity Enumerator relies heavily on generics to give it access to the Id field. Every entity in our system uses an Int64 column as the primary key and clustered index for consistency, which then gives us the ability to generically call .Id on each entity. This is defined by the following base class and it is then used as a type constraint for TEntity on the EntityEnumeator.
public abstract class BaseEntity { public long Id { get; private set; } } public class EntityEnumerator<TEntity> where TEntity : BaseEntity { }
To select the entities IDs we want to process, we use the user defined callback (_idQuery) to add the selection criteria to the Entity Framework query:
using (var context = _contextFactory.Create()) { var dbSet = context.Set<TEntity>() entityIds = await _idQuery(dbSet) .ToArrayAsync(cancellationToken: cancellationToken); }
Now we have the entity IDs, we split them up into batches and then iterate through them using the user defined callback (_batchQuery) to add any required includes to the query:
foreach (batch in batches) { var idsInBatch = batch.Select(x => x.Id).ToArray(); // Polly retry policy configured with batch retry limit and exception handling await _pollyRetryPolicy.ExecuteAsync(async token => { using (var context = _contextFactory.Create(_enableVerboseLogging)) { var dbSet = context.Set<TEntity>() // User defined callback to add includes var baseQuery = _batchQuery(dbSet); // Load the entities var results = await baseQuery // Id is available thanks to the type constraint on BaseEntity .Where(x => idsInBatch.Contains(x.Id)) .ToArrayAsync(cancellationToken); await _processBatch(results, context, cancellationToken); } }, cancellationToken); }
We now have a reusable job runner giving us many powerful features that can be easily extended to give us new functionality across all of our jobs. For example, we have added logging to our implementation which, for every 10 batches processed, prints out the estimated time remaining. This is done by keeping a running average of how long each batch took complete and a counter for the number of completed batches:
EntityEnumerator[AccrueAssetInterest] estimated time remaining: 15 Minutes 5 Seconds, 2126 batches remaining out of 2144
The code snippets shown are a dramatic simplification of the actual implementation, however they demonstrate the approach used. For example, the code to do the parallelism requires some interesting task scheduling because if a batch fails (and exceeds its retry limit) the entire job needs to be terminated which means stopping any other parallel tasks that are running. It also requires making all of the counters and timing statistics used for reporting progress thread safe.
From a technical perspective, you can see the benefits an abstraction like this can provide, however the biggest gain we have found, considering the number of jobs we have in the system, is the amount of time saved by our team when implementing new jobs – I cannot overstate this enough. This is an extremely important factor for the fast paced environment at Moneybox.
Written by: Chris Haines, Principal Engineer
Team: Investment Platform
Related content
It's important you know
Capital at risk. All investing should be regarded as longer term. The value of your investments can go up and down, and you may get back less than you invest.
Tax treatment depends on individual circumstances and may be subject to change in the future.