• 3 min read

Introducing the #Azure #CosmosDB Change Feed Processor Library

To help you build powerful applications on top of Cosmos DB, we built change feed support, which provides a sorted list of documents within a collection in the order in which they were modified. Now, to address scalability while preserving simplicity of use, we introduce the Cosmos DB Change Feed Processor Library. In this blog, we look at when and how you should use Change Feed Processor Library.

Azure Cosmos DB is a fast and flexible globally-replicated database service that is used for storing high-volume transactional and operational data with predictable millisecond latency for reads and writes. To help you build powerful applications on top of Cosmos DB, we built change feed support, which provides a sorted list of documents within a collection in the order in which they were modified. Now, to address scalability while preserving simplicity of use, we introduce the Cosmos DB Change Feed Processor Library. In this blog, we look at when and how you should use Change Feed Processor Library.

Change feed: Event Sourcing with Cosmos DB

Storing your data is just the beginning of the adventure. With change feed support, you can integrate with many different services depending on what you need to do once changes appear.

Example #1: You are building an online shopping website and need to trigger an email notification once a customer completes a purchase. Whether you prefer to use Azure Functions, Azure Notification Hub, Azure App Services, or your custom-built micro services, change feed allows seamless integration by surfacing changes in the order that they occur.

Example #2: You are storing data from an autonomous vehicle and need to detect abnormalities in incoming sensor data. As new entries are stored in Cosmos DB, these changes that appear on the change feed can be directly processed by Azure HDInsight, Apache Spark, or Apache Storm. With change feed support, you can apply intelligent processing in real-time while data is stored into Cosmos DB.

Example #3: Due to architecture changes, you need to change the partition key for your Cosmos DB collection. Change feed allows you to move your data to a new collection while processing incoming changes. The result is zero down time while you move data from anywhere to Cosmos DB.
 

changefeedoverview_white

What about working with larger data storage with multiple partitions?

As your data storage needs grow, it’s likely that you will use multiple partitions to store your data. Although it’s possible to manually read changes from each partition, the Change Feed Processor makes it easier by abstracting the change feed API. This function facilitates the reading across partitions and distributes change feed event processing across multiple consumers. This library provides a thread-safe, multi-process, safe runtime environment with checkpoint and partition lease management for change feed operations. The Change Feed Processor Library is available as a NuGet package for .NET development.

When to use Change Feed Processor Library:

  • Pulling updates from the change feed when data is stored across multiple partitions
  • Moving or replicating data from one collection to another
  • Parallel execution of actions triggered by updates to data and the change feed

Getting started with the Change Feed Processor Library is simple and lightweight. In the following example, we have a collection of documents containing news events associated with different cities. We use “city” as the partition key. In just a few steps, we can print out all changes made to any document from any partition.

To set this up, install the Change Feed Processor Library Nuget package and create a lease collection. The lease collection should be created through an account close to the write region. This collection will keep track of change feed reading progress per partition and host information.
 

azure-cosmos-db-2

To define the logic performed when new changes surface, edit the ProcessChangesAsync function. Here, we are simply printing out the document ID of the new or updated document. You can also modify this function to perform different tasks.

 

public Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyList docs)
        {
            Console.WriteLine("Change feed: total {0} doc(s)", Interlocked.Add(ref totalDocs, docs.Count));
            foreach (Document doc in docs)
            {
                Console.WriteLine(doc.Id.ToString());
            }

            return Task.CompletedTask;
        }

 

Next, to begin the Change Feed Processor, instantiate ChangeFeedProcessorHost, providing the appropriate parameters for your Azure Cosmos DB collections. Then, call RegisterObserverAsync to register your IChangeFeedObserver (DocumentFeedObserver in this example) implementation with the runtime. At this point, the host attempts to acquire a lease on every partition key range in the Azure Cosmos DB collection using a “greedy” algorithm. These leases last for a given timeframe and must then be renewed. As new nodes come online, in this case worker instances, they place lease reservations. Over time the load shifts between nodes as each host attempts to acquire more leases.

 

DocumentFeedObserver docObserver = new DocumentFeedObserver();

ChangeFeedEventHost host = new ChangeFeedEventHost(hostName, documentCollectionLocation, leaseCollectionLocation, feedOptions, feedHostOptions);

await host.RegisterObserverAsync(docObserverFactory);

 

Next steps

Stay up-to-date on the latest Azure Cosmos DB news and features by following us on Twitter @AzureCosmosDB and #CosmosDB, and reach out to us on the developer forums on Stack Overflow.