# Azure Cosmos DB BulkExecutor library for .NET

The Azure Cosmos DB BulkExecutor library for .NET acts as an extension library to the Cosmos DB .NET SDK and provides developers out-of-the-box functionality to perform bulk operations in Azure Cosmos DB.

## Consuming the Microsoft Azure Cosmos DB BulkExecutor .NET library

This project includes samples, documentation and performance tips for consuming the BulkExecutor library. You can download the official public NuGet package from here.

## Configurable parameters

* `enableUpsert` : A flag to enable upsert of the documents if a document with the given id already exists. The default value is false.
* `disableAutomaticIdGeneration` : A flag to disable automatic generation of an id if it is absent in the document. The default value is true.
* `maxConcurrencyPerPartitionKeyRange` : The maximum degree of concurrency per partition key range. Setting this to null causes the library to use the default value of 20.
* `maxInMemorySortingBatchSize` : The maximum number of documents pulled from the document enumerator passed to the API call in each stage of the in-memory pre-processing sorting phase prior to bulk importing. Setting this to null causes the library to use the default value of min(documents.count, 1000000).
* `cancellationToken` : The cancellation token to gracefully exit bulk import.

## Getting started with bulk import

* Initialize DocumentClient set to Direct TCP connection mode:

```csharp
ConnectionPolicy connectionPolicy = new ConnectionPolicy
{
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp
};

DocumentClient client = new DocumentClient(
    new Uri(endpointUrl),
    authorizationKey,
    connectionPolicy);
```

* Initialize BulkExecutor with high retry option values for the client SDK, and then set them to 0 to pass congestion control to BulkExecutor for its lifetime:

```csharp
// Set retry options high during initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();

// Set retries to 0 to pass complete control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
```

* Call the BulkImportAsync API:

```csharp
BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
    documents: documentsToImportInBatch,
    enableUpsert: true,
    disableAutomaticIdGeneration: true,
    maxConcurrencyPerPartitionKeyRange: null,
    maxInMemorySortingBatchSize: null,
    cancellationToken: token);
```
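
The returned BulkImportResponse summarizes the outcome of the call. A quick sketch of inspecting it (property names as documented for the library's response object):

```csharp
// Inspect the bulk import outcome: document count, RU charge, elapsed
// time, and any documents the library rejected as bad input.
Console.WriteLine("Imported {0} documents", bulkImportResponse.NumberOfDocumentsImported);
Console.WriteLine("Consumed {0} request units", bulkImportResponse.TotalRequestUnitsConsumed);
Console.WriteLine("Took {0} seconds", bulkImportResponse.TotalTimeTaken.TotalSeconds);
Console.WriteLine("Bad input documents: {0}", bulkImportResponse.BadInputDocuments.Count);
```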

You can find the complete sample application consuming the bulk import API here - it generates random vertices and edges that are then bulk imported into an Azure Cosmos DB collection. You can configure the application settings in appSettings here.

You can download the Microsoft.Azure.CosmosDB.BulkExecutor NuGet package from here.

## API implementation details

When a bulk import API is triggered with a batch of documents, on the client side they are first shuffled into buckets corresponding to their target Cosmos DB partition key range. Within each partition key range bucket, they are broken down into mini-batches, and each mini-batch of documents acts as a payload that is committed transactionally.
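
Conceptually, that batching stage looks like the sketch below. This is illustrative only, not the library's internal code: the `Document` record, the hash-based `GetPartitionKeyRangeId` helper, and the mini-batch size are stand-ins; the real library resolves ranges from the collection's partition key range metadata and sizes mini-batches against the payload limit.

```csharp
using System.Collections.Generic;
using System.Linq;

// Illustrative stand-in document type; the library accepts arbitrary
// serializable objects.
record Document(string Id, string PartitionKey);

static class BatchingSketch
{
    // Hypothetical mapping from a document to its partition key range.
    static string GetPartitionKeyRangeId(Document doc) =>
        (doc.PartitionKey.GetHashCode() & 1) == 0 ? "range-0" : "range-1";

    // Group documents by target partition key range, then split each group
    // into mini-batches; each mini-batch is the unit that gets committed
    // transactionally against a single partition key range.
    public static IEnumerable<(string RangeId, List<Document> MiniBatch)> ToMiniBatches(
        IEnumerable<Document> documents, int miniBatchSize)
    {
        foreach (var bucket in documents.GroupBy(GetPartitionKeyRangeId))
        {
            var batch = new List<Document>(miniBatchSize);
            foreach (var doc in bucket)
            {
                batch.Add(doc);
                if (batch.Count == miniBatchSize)
                {
                    yield return (bucket.Key, batch);
                    batch = new List<Document>(miniBatchSize);
                }
            }
            if (batch.Count > 0)
            {
                yield return (bucket.Key, batch);
            }
        }
    }
}
```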

We have built in optimizations for the concurrent execution of these mini-batches both within and across partition key ranges to maximally utilize the allocated collection throughput. We have designed an AIMD-style congestion control mechanism for each Cosmos DB partition key range to efficiently handle throttling and timeouts.
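
As a rough illustration of the AIMD idea (an assumption about the general shape, not the library's actual implementation), each partition key range could own a controller that grows its degree of concurrency by a constant on success and cuts it multiplicatively on a throttle or timeout:

```csharp
using System;

// AIMD-style concurrency controller, one instance per partition key range.
class AimdConcurrencyController
{
    private double _concurrency;
    private readonly double _additiveStep;
    private readonly double _decreaseFactor;
    private readonly int _maxConcurrency;

    public AimdConcurrencyController(
        int initial = 1, double additiveStep = 1.0,
        double decreaseFactor = 0.5, int maxConcurrency = 20)
    {
        _concurrency = initial;
        _additiveStep = additiveStep;
        _decreaseFactor = decreaseFactor;
        _maxConcurrency = maxConcurrency;
    }

    // Current degree of concurrency to use for this partition key range.
    public int Concurrency => Math.Max(1, (int)_concurrency);

    // Additive increase: a mini-batch committed without throttling.
    public void OnSuccess() =>
        _concurrency = Math.Min(_maxConcurrency, _concurrency + _additiveStep);

    // Multiplicative decrease: a 429 (throttle) or a timeout was observed.
    public void OnThrottleOrTimeout() =>
        _concurrency = Math.Max(1.0, _concurrency * _decreaseFactor);
}
```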

These client-side optimizations augment server-side features specific to the BulkExecutor library which together make maximal consumption of available throughput possible.

## Troubleshooting

1. Slow ingestion rate:

    * Check the distance between the client location and the Azure region where the database is hosted.
    * Check the configured throughput; ingestion can be slow if the tool is getting throttled. It is recommended that you increase the RU/s during ingestion and then scale it down later. This can be done programmatically via the ReplaceOfferAsync() API (see the sketch after this list).
    * Use a client machine with plenty of memory; otherwise, GC pressure might interrupt the ingestion.
    * Turn on server GC (for example, via `<gcServer enabled="true"/>` in the application's configuration file).
    * Do you have a fixed collection/graph (10 GB)? Ingestion can be a bit slower for such a collection compared to an unlimited collection/graph. Ingestion into an unlimited collection/graph is faster because multiple partitions can be filled in parallel, while a single partition is filled serially. If you need even faster ingestion for a fixed collection, you can partition your data locally and make multiple parallel calls to the bulk import API (see the sketch after this list).
2. Are you seeing any of these exceptions?

    * Resource already exists: This means a vertex or edge with the same unique key (see key concepts 4.) is already present.
    * Provide partition key while adding vertices/edges: For an unlimited graph, you must provide the partition key as a property of the vertex (or the partition key property of the source and destination vertex of an edge).
    * Request Rate is Too Large: If you are trying to bulk ingest while a significant workload is running on the same graph, the tool might get throttled. While the tool can handle such throttling to some extent, a prolonged period of throttling might lead the tool to give up.
    * Request size is too large: An Azure Cosmos DB vertex or edge can have a maximum size of 2 MB (please contact the team if you need bigger vertices/edges).
    * Out of Memory Exception: If you have a large number of partitions and are ingesting a lot of data, the tool requires more memory to operate. We recommend moving to a machine with more memory. Alternatively, you can split the workload and put multiple machines to work.
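
As referenced in the throughput bullet above, here is a minimal sketch of scaling RU/s programmatically with ReplaceOfferAsync. It assumes the `client` and `dataCollection` objects from the getting-started section; the 50,000 RU/s target is purely illustrative.

```csharp
using System.Linq;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

// Read the collection's current offer (its provisioned throughput).
Offer offer = client.CreateOfferQuery()
    .Where(o => o.ResourceLink == dataCollection.SelfLink)
    .AsEnumerable()
    .Single();

// Raise the throughput for the duration of the ingestion; scale back
// down afterwards by replacing the offer again with a lower value.
await client.ReplaceOfferAsync(new OfferV2(offer, 50000));
```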
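
And for the fixed collection/graph bullet, a sketch of partitioning data locally and issuing parallel bulk import calls might look like the following; `SplitDocumentsLocally` is a hypothetical helper that splits your data into independent batches.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: pre-split the documents into independent local batches.
List<List<object>> localPartitions = SplitDocumentsLocally();

// Issue one bulk import call per local batch, in parallel.
BulkImportResponse[] responses = await Task.WhenAll(
    localPartitions.Select(partition =>
        bulkExecutor.BulkImportAsync(documents: partition, enableUpsert: true)));
```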

## Other relevant projects