Building a data processing pipeline with TPL Dataflow

Hackered
Friday, October 23, 2015
by Sean McAlinden

One of the applications I am currently working with has to perform a great deal of in-memory statistical analysis on the fly.

Like most applications nowadays, it has one key requirement: it has to be fast.

Whilst there are many ways to add real or perceived speed to an application, improving code efficiency with the Task Parallel Library (TPL) is a great place to start.

The initial round of improvements was achieved using standard go-to TPL methods such as Parallel.For and Parallel.ForEach; however, the problem area feels like a great fit for a TPL Dataflow pipeline.

In this post I am going to build a representative flow that demonstrates an interesting and fruitful approach to data processing and analysis (the real flow is probably too involved for a blog post).

The flow

The representative flow will be:

  1. Retrieve a large batch of raw monthly numerical data
  2. For each year, calculate the mean average across all of its rows
  3. Sort all results by year in descending order
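The three steps above map naturally onto linked dataflow blocks. The sketch below is one possible wiring, not the pipeline built later in this post: it assumes a hypothetical YearlyAverage class with Year and Average properties, a hypothetical YearlyAverageFlowSketch.RunAsync method, and an expectedYears parameter that must be at least the number of pages posted. Because sorting needs every result, a BatchBlock gathers the averages and flushes them as a single final batch when it completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Mirrors the MonthlyValue class shown later in this post.
public class MonthlyValue
{
    public DateTime DateTime { get; set; }
    public decimal? Value { get; set; }
}

// Hypothetical result type (assumption: not shown in this post).
public class YearlyAverage
{
    public int Year { get; set; }
    public decimal? Average { get; set; }
}

public static class YearlyAverageFlowSketch
{
    // Hypothetical helper: expectedYears must be >= the number of pages,
    // otherwise the results arrive in more than one batch.
    public static async Task<List<YearlyAverage>> RunAsync(
        IEnumerable<List<MonthlyValue>> pages, int expectedYears)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        // Step 2: average each page (one year of monthly values) in parallel.
        var averageBlock = new TransformBlock<List<MonthlyValue>, YearlyAverage>(
            monthlyValues => new YearlyAverage
            {
                Year = monthlyValues[0].DateTime.Year,
                Average = monthlyValues.Average(m => m.Value)
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            });

        // Sorting needs every result; a partial batch is emitted on completion.
        var batchBlock = new BatchBlock<YearlyAverage>(expectedYears);

        // Step 3: sort by year, descending.
        var sortBlock = new TransformBlock<YearlyAverage[], List<YearlyAverage>>(
            batch => batch.OrderByDescending(y => y.Year).ToList());

        averageBlock.LinkTo(batchBlock, linkOptions);
        batchBlock.LinkTo(sortBlock, linkOptions);

        // Step 1: feed the raw pages into the head of the pipeline.
        foreach (var page in pages)
        {
            await averageBlock.SendAsync(page);
        }

        // No more input: completion flows through to the batch and sort blocks.
        averageBlock.Complete();

        return await sortBlock.ReceiveAsync();
    }
}
```

Using SendAsync rather than Post means the producer waits asynchronously instead of having a message declined if a BoundedCapacity is later set on the first block.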

The pipeline entry point

This is the top-level entry point to the pipeline: we simply get the pipeline, post a batch of values into it and wait for it to complete.

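using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class YearlyAverageCalculationService
{
    private readonly IRepository repository;
    private readonly IYearlyAveragePipeline yearlyAveragePipeline;

    public YearlyAverageCalculationService(
        IRepository repository,
        IYearlyAveragePipeline yearlyAveragePipeline)
    {
        this.repository = repository;
        this.yearlyAveragePipeline = yearlyAveragePipeline;
    }

    public async Task Process()
    {
        // The head (first block) of the dataflow pipeline.
        var pipeline = yearlyAveragePipeline.GetPipeline();

        // Each page holds one year's worth of monthly values.
        for (var i = 0; i < 100; i++)
        {
            var monthlyValues = repository.GetData(i);
            pipeline.Post(monthlyValues);
        }

        // Signal that no more input is coming: a dataflow block only
        // completes after Complete() is called (calling it twice is harmless).
        pipeline.Complete();

        await yearlyAveragePipeline.Completion();
    }
}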
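The post does not show IYearlyAveragePipeline, so the shape below is an assumption inferred from how the entry point uses it: GetPipeline() returns the head block and Completion() returns the Task of the tail block. The class name SketchYearlyAveragePipeline and its Results queue are hypothetical. What it illustrates is the completion contract: with PropagateCompletion set, the tail finishes only after Complete() has been called on the head.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Mirrors the MonthlyValue class shown later in this post.
public class MonthlyValue
{
    public DateTime DateTime { get; set; }
    public decimal? Value { get; set; }
}

// Assumed interface shape; the post does not define it.
public interface IYearlyAveragePipeline
{
    ITargetBlock<List<MonthlyValue>> GetPipeline();
    Task Completion();
}

// Hypothetical two-block implementation: average each year, then collect.
public class SketchYearlyAveragePipeline : IYearlyAveragePipeline
{
    private readonly TransformBlock<List<MonthlyValue>, (int Year, decimal? Average)> head;
    private readonly ActionBlock<(int Year, decimal? Average)> tail;

    // Thread-safe collection, because blocks may run on any thread.
    public ConcurrentQueue<(int Year, decimal? Average)> Results { get; }
        = new ConcurrentQueue<(int Year, decimal? Average)>();

    public SketchYearlyAveragePipeline()
    {
        head = new TransformBlock<List<MonthlyValue>, (int Year, decimal? Average)>(
            values => (values[0].DateTime.Year, values.Average(v => v.Value)));
        tail = new ActionBlock<(int Year, decimal? Average)>(
            result => Results.Enqueue(result));

        // Completing the head flows through to the tail.
        head.LinkTo(tail, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public ITargetBlock<List<MonthlyValue>> GetPipeline() => head;

    // Finishes only once the head has been completed and drained.
    public Task Completion() => tail.Completion;
}
```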

Now let's fill in the detail...

The repository

We will get the raw data for our calculations from a repository class.

The repository can retrieve a year's worth of monthly values per page:

using System;
using System.Collections.Generic;

public interface IRepository
{
    List<MonthlyValue> GetData(int pageNumber);

    void SaveYearlyAverages(List<YearlyAverage> yearlyAverages);
}

public class Repository : IRepository
{
    public List<MonthlyValue> GetData(int pageNumber)
    {
        var monthlyValues = new List<MonthlyValue>();

        var year = 2015 - pageNumber;
        var seed = pageNumber / 100;

        for (var monthIndex = 1; monthIndex <= 12; monthIndex++)
        {
            var monthValue = new MonthlyValue
            {
                DateTime = new DateTime(year, monthIndex, 1),
                Value = seed + monthIndex
            };
            monthlyValues.Add(monthValue);
        }

        return monthlyValues;
    }

    public void SaveYearlyAverages(List<YearlyAverage> yearlyAverages)
    {
        // omitted
    }
}
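The sample data is easy to check by hand. Because pageNumber / 100 is integer division, seed is 0 for every page the service requests (0 to 99), so each year holds the values 1 to 12 and averages 6.5; page 100 would be the first with seed 1. The sketch below reproduces the generation logic of GetData so it runs on its own; the RepositoryDataCheck class and its AverageFor helper are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Mirrors the MonthlyValue class shown below.
public class MonthlyValue
{
    public DateTime DateTime { get; set; }
    public decimal? Value { get; set; }
}

// Hypothetical helper reproducing Repository.GetData's sample data.
public static class RepositoryDataCheck
{
    public static List<MonthlyValue> GetData(int pageNumber)
    {
        var year = 2015 - pageNumber;
        var seed = pageNumber / 100; // integer division: 0 for pages 0-99

        return Enumerable.Range(1, 12)
            .Select(month => new MonthlyValue
            {
                DateTime = new DateTime(year, month, 1),
                Value = seed + month
            })
            .ToList();
    }

    // Mean of one page's monthly values.
    public static decimal? AverageFor(int pageNumber) =>
        GetData(pageNumber).Average(m => m.Value);
}
```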

The raw numeric format is represented by the MonthlyValue class:

public class MonthlyValue
{
    public DateTime DateTime { get; set; }

    public decimal? Value { get; set; }
}
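Value is a nullable decimal, which matters for step 2 of the flow. LINQ's Enumerable.Average overload for decimal? skips null entries and returns null when there is nothing left to average, so a missing month does not drag a year's mean towards zero. The NullableAverageExample class below is hypothetical and only illustrates that behaviour.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class MonthlyValue
{
    public DateTime DateTime { get; set; }
    public decimal? Value { get; set; }
}

// Hypothetical helpers contrasting two ways of handling missing months.
public static class NullableAverageExample
{
    // Nulls are excluded; returns null if every value is null.
    public static decimal? MeanOf(IEnumerable<MonthlyValue> values) =>
        values.Average(v => v.Value);

    // For contrast: treating missing months as zero pulls the mean down.
    public static decimal MeanTreatingNullAsZero(IEnumerable<MonthlyValue> values) =>
        values.Average(v => v.Value ?? 0m);
}
```

For example, values of 10, null and 20 give a mean of 15, whereas treating the missing month as zero gives 10.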