Using Durable Functions to bulk load data into Cosmos

Written on February 7, 2019

I’ve been working with Cosmos DB a fair bit recently, so check out my previous posts around partitioning and cross-partition queries.

Bulk smash!

We have created our Cosmos DB account, database and collection (using a specified partition key); a detailed walkthrough of doing this via the CLI is linked here:

az cosmosdb collection create \
    --resource-group $resourceGroupName \
    --collection-name $containerName \
    --name $accountName \
    --db-name $databaseName \
    --partition-key-path /device \
    --throughput 10000
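Because the partition key path is /device, every document we import needs a top-level device property. Here is a minimal sketch of a matching document shape; the class name and the extra properties are just illustrative assumptions, not code from the repo.

using Newtonsoft.Json;

public class DeviceReading
{
    [JsonProperty("id")]
    public string Id { get; set; }

    // Must match the /device partition key path on the collection.
    [JsonProperty("device")]
    public string Device { get; set; }

    [JsonProperty("value")]
    public double Value { get; set; }
}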

Great, so now we build our .NET Core app, grab the Cosmos SDK and build a connection in code. I reused most of this from the Cosmos DB BulkExecutor library for .NET sample, which, as the name suggests, is a great library for importing a lot of data. I was looking to load 10M+ documents for this project, so I needed something that could get that done in a hurry.
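For reference, the connection setup when following the BulkExecutor sample looks roughly like this. Treat it as a sketch rather than the exact code from the repo, and note the CosmosEndpoint and CosmosAuthKey setting names are my own placeholders.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.CosmosDB.BulkExecutor;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

public static class CosmosConnection
{
    // Sketch: create a DocumentClient and an initialised BulkExecutor for the target collection.
    public static async Task<IBulkExecutor> CreateBulkExecutorAsync(string databaseName, string collectionName)
    {
        var client = new DocumentClient(
            new Uri(Environment.GetEnvironmentVariable("CosmosEndpoint")),   // placeholder setting name
            Environment.GetEnvironmentVariable("CosmosAuthKey"),             // placeholder setting name
            new ConnectionPolicy
            {
                ConnectionMode = ConnectionMode.Direct,
                ConnectionProtocol = Protocol.Tcp
            });

        // The executor needs the collection metadata (partition key ranges etc.).
        var collectionResponse = await client.ReadDocumentCollectionAsync(
            UriFactory.CreateDocumentCollectionUri(databaseName, collectionName));
        DocumentCollection collection = collectionResponse.Resource;

        IBulkExecutor bulkExecutor = new BulkExecutor(client, collection);
        await bulkExecutor.InitializeAsync();

        // Hand 429 retries over to the BulkExecutor, as the sample recommends.
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

        return bulkExecutor;
    }
}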

To Console or not to Console?

This code originally started life as a simple console app; this is the last commit before I changed tack.

It was nice and all, but it would take a fair bit of time to load the documents from my machine over the network.

Yes! I know I could have used the Data Migration Tool but I would not have learnt anything, and that is what this is all about.

So I thought, what would speed this process up?

If I could run my code closer to the database I could reduce the network latency, but I also wanted something that could scale out and run in parallel… Durable Functions!!

Azure Durable Functions allow you to write stateful functions in a serverless environment. I was fairly sure running a single Function to insert 10M documents would exceed the 10 minute execution limit, so what if I used Durable Functions to batch up my requests and insert 1000 documents at a time? That could scale out and run in parallel.

So we start with an HTTP trigger function that kicks off the process and starts a Durable Orchestration, which is the context the operation will run under.

[FunctionName("BulkLoader_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    // Function input comes from the request content.
    string instanceId = await starter.StartNewAsync("BulkLoader", null);


    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");


    return starter.CreateCheckStatusResponse(req, instanceId);
}
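One nice thing about CreateCheckStatusResponse is that it returns a 202 Accepted straight away, along with URLs the caller can use to poll the orchestration’s status, raise events or terminate it. That matters here, because a 10M+ document import is going to run far longer than any sensible HTTP timeout.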

This is our Orchestrator function, which reads a configuration setting called BatchSize for how many batches I want to run, in this case 100,000. The function then creates an Activity task for each of those; the Activity is the function that does the actual work. The Orchestrator then waits for all the Activity tasks to complete and returns a result.

[FunctionName("BulkLoader")]
public static async Task<bool> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
    int batchSize;
    int.TryParse(Environment.GetEnvironmentVariable("BatchSize"), out batchSize);
    var outputs = new List<bool>();
    var tasks = new Task<bool>[batchSize];
    for (int i = 0; i < batchSize; i++)
    {
        tasks[i] = context.CallActivityAsync<bool>("BulkLoader_Batch", i);
    }

    await Task.WhenAll(tasks);

    return true;
}
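This is the standard fan-out/fan-in pattern from the Durable Functions docs: the orchestrator fans out one Activity per batch, and Task.WhenAll fans the results back in once every batch has completed.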

This is the Activity function, the one that actually does the work against Cosmos. I use the BulkExecutor library mentioned above to insert the documents.

[FunctionName("BulkLoader_Batch")]
public static async Task<bool> Import([ActivityTrigger] int batch, ILogger log)
{
    log.LogInformation($"Prepare documents for batch {batch}");
    string partitionKey = Environment.GetEnvironmentVariable("PartitionKey");
    int docsPerBatch;
    int.TryParse(Environment.GetEnvironmentVariable("DocsPerBatch"), out docsPerBatch);
    List<string> documentsToImportInBatch = CosmosHelper.DocumentBatch(partitionKey, docsPerBatch, batch);
    await BulkImport.BulkImportDocuments(documentsToImportInBatch);
    return true;
}   
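CosmosHelper.DocumentBatch and BulkImport.BulkImportDocuments live in the repo rather than in this post, so here is a hedged sketch of what they might look like, building on the connection sketch above. The DatabaseName and CollectionName setting names are placeholders, and I’m assuming the PartitionKey setting holds the partition key property name.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.CosmosDB.BulkExecutor;
using Newtonsoft.Json;

public static class CosmosHelper
{
    // Sketch: build docsPerBatch serialised documents for the given batch number.
    public static List<string> DocumentBatch(string partitionKey, int docsPerBatch, int batch)
    {
        return Enumerable.Range(0, docsPerBatch)
            .Select(i => JsonConvert.SerializeObject(new Dictionary<string, object>
            {
                ["id"] = Guid.NewGuid().ToString(),
                [partitionKey] = Guid.NewGuid().ToString(), // spread documents across partitions
                ["batch"] = batch,
                ["sequence"] = i
            }))
            .ToList();
    }
}

public static class BulkImport
{
    // One initialised executor per host instance, reused across activity invocations.
    private static readonly Lazy<Task<IBulkExecutor>> Executor =
        new Lazy<Task<IBulkExecutor>>(() => CosmosConnection.CreateBulkExecutorAsync(
            Environment.GetEnvironmentVariable("DatabaseName"),     // placeholder setting name
            Environment.GetEnvironmentVariable("CollectionName"))); // placeholder setting name

    public static async Task BulkImportDocuments(List<string> documents)
    {
        IBulkExecutor bulkExecutor = await Executor.Value;

        // enableUpsert means a retried batch won't fail on documents that already made it in.
        // The BulkImportResponse returned here contains the document count and RU charge if you want to log them.
        await bulkExecutor.BulkImportAsync(
            documents,
            enableUpsert: true,
            disableAutomaticIdGeneration: true);
    }
}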

Scale to Fail

So this was great. I deployed my Function code and enabled Application Insights so I could see what was going on. The Function App started scaling out to run the import batches on multiple servers… it scaled out… and out… and out… Right about now was when I exceeded the 20K Request Units on Cosmos, so as expected the service started throttling and returning 429 responses back to the Functions.

I guess I wasn’t expecting this to go so well (and so catastrophically badly at the same time), so I didn’t have any retry or backoff handling in my application code. Durable Functions kept spinning up new instances and Cosmos kept throttling them. Below is a screenshot from App Insights where you can see the outgoing requests match the outgoing dependency failures! It shows 82 servers running my Functions, which peaked at over 100 for a short time and really shows the power and scale of Functions. With that power comes great responsibility, which was non-existent in my case!

Try, try, try again

Ok, so I added some retry logic to the Durable Function, which sorted the issue with failed requests. I also tweaked the batch size and the number of documents per batch; 1000 batches of 10,000 documents gave the best result, but that has to be balanced against how far I was willing to scale Cosmos to cope, so there is definitely a sweet spot to find.

This is the retry logic in the new Orchestrator function:

[FunctionName("BulkLoader")]
public static async Task<bool> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
    int batchSize;
    int.TryParse(Environment.GetEnvironmentVariable("BatchSize"), out batchSize);

    int firstRetryIntervalVar;
    int.TryParse(Environment.GetEnvironmentVariable("FirstRetrySeconds"), out firstRetryIntervalVar);

    int maxNumberOfAttemptsVar;
    int.TryParse(Environment.GetEnvironmentVariable("MaxNumberOfAttempts"), out maxNumberOfAttemptsVar);

    double backoffCoefficientVar;
    double.TryParse(Environment.GetEnvironmentVariable("BackoffCoefficient"), out backoffCoefficientVar);

    var retryOptions = new RetryOptions(
        firstRetryInterval: TimeSpan.FromSeconds(firstRetryIntervalVar),
        maxNumberOfAttempts: maxNumberOfAttemptsVar);
    retryOptions.BackoffCoefficient = backoffCoefficientVar;

    var outputs = new List<bool>();
    var tasks = new Task<bool>[batchSize];
    for (int i = 0; i < batchSize; i++)
    {
        tasks[i] = context.CallActivityWithRetryAsync<bool>("BulkLoader_Batch", retryOptions, i);
    }

    await Task.WhenAll(tasks);
    return true;
}
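With a backoff coefficient greater than 1, the wait between attempts grows on each retry (for example, a 5 second first retry with a coefficient of 2 waits roughly 5, then 10, then 20 seconds), which gives Cosmos a chance to catch up instead of being hammered with immediate retries.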