Spark Connector for #CosmosDB - seamless interaction with globally-distributed, multi-model data

Posted on June 7, 2017

Principal Program Manager, Azure CosmosDB

Today, we’re excited to announce that the Spark connector for Azure Cosmos DB is now truly multi-model! As noted in our recent announcement, "Azure Cosmos DB: The industry’s first globally-distributed, multi-model database service," our goal is to help you write globally distributed apps more easily, using the tools and APIs you are already familiar with. Azure Cosmos DB’s database engine natively supports the SQL (DocumentDB) API, MongoDB API, Gremlin (graph) API, and Azure Table storage API. With the updated Spark connector for Azure Cosmos DB, Apache Spark can now interact with all Azure Cosmos DB data models: Documents, Tables, and Graphs.

What is Azure Cosmos DB?

Azure Cosmos DB is Microsoft's globally distributed, multi-model database service for mission-critical applications. Azure Cosmos DB provides turn-key global distribution, elastic scale out of throughput and storage worldwide, single-digit millisecond latencies at the 99th percentile, five well-defined consistency levels, and guaranteed high availability, all backed by industry-leading, comprehensive SLAs. Azure Cosmos DB automatically indexes all data without requiring you to deal with schema and index management. It is multi-model and supports document, key-value, graph, and columnar data models. As a cloud-born service, Azure Cosmos DB is carefully engineered with multi-tenancy and global distribution from the ground up.

Perform Real-time Machine Learning on Globally-Distributed Data with Apache Spark and Azure Cosmos DB

The Spark connector for Azure Cosmos DB enables real-time data science, machine learning, advanced analytics, and exploration over globally distributed data in Azure Cosmos DB. Connecting Apache Spark to Azure Cosmos DB accelerates our customers’ ability to solve fast-moving data science problems, where data can be quickly persisted and queried using Azure Cosmos DB. The connector efficiently exploits the native Azure Cosmos DB managed indexes and enables updateable columns when performing analytics. It also uses push-down predicate filtering against fast-changing, globally distributed data, addressing a diverse set of IoT, data science, and analytics scenarios.

Other use-cases of Azure Cosmos DB + Spark include:

  • Streaming Extract, Transformation, and Loading of data (ETL)
  • Data enrichment
  • Trigger event detection
  • Complex session analysis and personalization
  • Visual data exploration and interactive analysis
  • Notebook experience for data exploration, information sharing, and collaboration

The Spark Connector for Azure Cosmos DB uses the Azure DocumentDB Java SDK. You can get started today and download the Spark connector from GitHub!

Working with Azure Cosmos DB Tables

Azure Cosmos DB provides the Table API for applications that need a key-value store with flexible schema, with predictable performance and global distribution. Azure Table storage SDKs and REST APIs can be used to work with Azure Cosmos DB. Azure Cosmos DB supports throughput-optimized tables (informally called "premium tables"), currently in public preview.

To connect Apache Spark to the Azure Cosmos DB Table API, you can use the Spark Connector for Azure Cosmos DB as follows.

// Initialization
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

val readConfig = Config(Map("Endpoint" -> "https://$tableContainer$.documents.azure.com:443/",
"Masterkey" -> "$masterkey$",
"Database" -> "$tableDatabase$",
"Collection" -> "$tableCollection$",
"SamplingRatio" -> "1.0"))

 
// Create collection connection 
val tblCntr = spark.sqlContext.read.cosmosDB(readConfig)
tblCntr.createOrReplaceTempView("tableContainer")

Once you have connected to the Table, you can create a Spark DataFrame (in the preceding example, this would be tblCntr).

// Print tblCntr DataFrame Schema
scala> tblCntr.printSchema()
root
 |-- _etag: string (nullable = true)
 |-- $id: string (nullable = true)
 |-- _rid: string (nullable = true)
 |-- _attachments: string (nullable = true)
 |-- City: struct (nullable = true)
 |    |-- $t: integer (nullable = true)
 |    |-- $v: string (nullable = true)
 |-- State: struct (nullable = true)
 |    |-- $t: integer (nullable = true)
 |    |-- $v: string (nullable = true)
 |-- $pk: string (nullable = true)
 |-- id: string (nullable = true)
 |-- _self: string (nullable = true)
 |-- _ts: integer (nullable = true)



// Run Spark SQL query against your Azure Cosmos DB table
scala> spark.sql("select `$id`, `$pk`, City.`$v` as City, State.`$v` as State from tableContainer where City.`$v` = 'Seattle'").show()
+----+-----+-------+-----+
| $id|  $pk|   City|State|
+----+-----+-------+-----+
|John|Smith|Seattle|   WA|
+----+-----+-------+-----+

You will be able to quickly and easily interact with your schema and execute Spark SQL queries against your underlying Azure Cosmos DB table.
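
In the schema printed above, each Table API property (City, State) shows up as a struct with a `$t` field and a `$v` field, and the query reads the actual value through `$v`. The following is a minimal sketch of generating that kind of projection for a list of property names. The object `TableProjection` and its functions are hypothetical helpers written for this illustration and are not part of the connector; they only build a SQL string.

```scala
object TableProjection {
  // Wrap an identifier in backticks so Spark SQL accepts names such as $id or $v.
  def quote(name: String): String = "`" + name + "`"

  // Build a SELECT that keeps the listed system columns as-is and unwraps
  // the `$v` field of each Table API property struct under the property name.
  def flattenedSelect(view: String,
                      systemCols: Seq[String],
                      propertyCols: Seq[String]): String = {
    val valueField = quote("$v")
    val system = systemCols.map(quote)
    val props = propertyCols.map(c => quote(c) + "." + valueField + " as " + c)
    "select " + (system ++ props).mkString(", ") + " from " + view
  }
}
```

The resulting string could be passed to spark.sql(...) as in the query above, with a where clause appended if a filter is needed.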

Working with Azure Cosmos DB Graphs

Azure Cosmos DB provides graph modeling and traversal APIs along with turn-key global distribution, elastic scale out of storage and throughput, <10 ms read latencies and <15 ms write latencies at the 99th percentile, automatic indexing and query, tunable consistency levels, and comprehensive SLAs including 99.99% availability. Azure Cosmos DB can be queried using Apache TinkerPop's graph traversal language, Gremlin, with seamless integration with other TinkerPop-compatible graph systems like Apache Spark GraphX.

To connect Apache Spark to the Azure Cosmos DB Graph, you will use the Spark Connector for Azure Cosmos DB as follows.

// Initialization
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config


// Maps
val baseConfigMap = Map(
"Endpoint" -> "https://$graphContainer$.documents.azure.com:443/",
"Masterkey" -> "$masterKey$",
"Database" -> "$database$",
"Collection" -> "$collection$", 
"SamplingRatio" -> "1.0",
"schema_samplesize" -> "1000"
)

val airportConfigMap = baseConfigMap ++ Map("query_custom" -> "select * from c where c.label='airport'") 
val delayConfigMap = baseConfigMap ++ Map("query_custom" -> "select * from c where c.label='flight'") 


// Configs
// get airport data (vertices)
val airportConfig = Config(airportConfigMap)
val airportColl = spark.sqlContext.read.cosmosDB(airportConfig)
airportColl.createOrReplaceTempView("airportColl") 

// get flight delay data (edges)
val delayConfig = Config(delayConfigMap)
val delayColl = spark.sqlContext.read.cosmosDB(delayConfig)
delayColl.createOrReplaceTempView("delayColl") 

Here, we have created two Spark DataFrames: one for the airport data (the vertices) and one for the flight delay data (the edges). The graph we have stored in Azure Cosmos DB can be visually depicted as in the figure below, where the vertices are the blue circles representing the airports and the edges are the black lines representing the flights between those cities. In this example, the originating city for those flights (edges) is Seattle (the blue circle at the top left of the map, where all the edges originate).

Figure: D3.js visualization of the airports (blue circles) and the flights between them (black lines).
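
The vertex and edge DataFrames above differ only in the query_custom value, which selects documents by their label. A small sketch of deriving such per-label settings from a shared base map follows. `GraphConfigs.forLabel` is a hypothetical helper, not a connector API, and the label check is an assumption added here to keep unexpected characters out of the query text.

```scala
object GraphConfigs {
  // Return a copy of the base settings with a query_custom that selects one label.
  // The base map itself is left unchanged.
  def forLabel(base: Map[String, String], label: String): Map[String, String] = {
    // Assumption: labels are simple identifiers such as "airport" or "flight".
    require(label.nonEmpty && label.forall(c => c.isLetterOrDigit || c == '_'),
      s"unexpected label: $label")
    base + ("query_custom" -> s"select * from c where c.label='$label'")
  }
}
```

With this, the two maps from the snippet above would be forLabel(baseConfigMap, "airport") and forLabel(baseConfigMap, "flight"), each of which can then be wrapped in Config(...) as shown earlier.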

Benefit of Integrating Cosmos DB Graphs with Spark

One of the key benefits of working with Azure Cosmos DB graphs and the Spark connector is that Gremlin queries and Spark DataFrame queries (as well as other Spark queries) can be executed against the same data container (be it a graph, a table, or a collection of documents). For example, below are some simple Gremlin Groovy queries against the flights graph stored in our Azure Cosmos DB graph.

         \,,,/
         (o o)
-----oOOo-(3)-oOOo-----
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.tinkergraph
gremlin> :remote connect tinkerpop.server conf/remote-secure.yaml
==>Configured tychostation.graphs.azure.com/52.173.137.146:443

gremlin> // How many flights into each city leaving SEA
==>true
gremlin> :> g.V().has('iata', 'SEA').outE('flight').inV().values('city').groupCount()
==>[Chicago:1088,New York:432,Dallas:800,Miami:90,Washington DC:383,Newark:345,Boston:315,Orlando:116,Philadelphia:193,Fort Lauderdale:90,Minneapolis:601,Juneau:180,Ketchikan:270,Anchorage:1097,Fairbanks:260,San Jose:611,San Francisco:1698,San Diego:617,Oakland:798,Sacramento:629,Los Angeles:1804,Orange County:569,Burbank:266,Ontario:201,Palm Springs:236,Las Vegas:1204,Phoenix:1228,Tucson:90,Austin:90,Denver:1231,Spokane:269,San Antonio:90,Salt Lake City:860,Houston:568,Atlanta:521,St. Louis:90,Kansas City:95,Honolulu, Oahu:415,Kahului, Maui:270,Lihue, Kauai:128,Long Beach:345,Detroit:244,Cincinnati:4,Omaha:90,Santa Barbara:90,Fresno:142,Colorado Springs:90,Portland:602,Jackson Hole:13,Cleveland:6,Charlotte:169,Albuquerque:105,Reno:90,Milwaukee:82]


gremlin> // SEA -> Reno flight delays
==>true
gremlin> :> g.V().has('iata', 'SEA').outE('flight').as('e').inV().has('iata', 'RNO').select('e').values('delay').sum()
==>963

The preceding code connects to the tychostation graph (tychostation.graphs.azure.com) to run the following Gremlin Groovy queries:

  • Using graph traversal and groupCount(), determines the number of flights from Seattle to each of the listed destination cities (e.g. there are 1088 flights from Seattle to Chicago in this dataset).
  • Using graph traversal and sum(), determines the total delay (in minutes) of the 90 flights from Seattle to Reno (i.e. 963 minutes of delay).
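
To make the two traversals concrete, here is a rough in-memory analogue using plain Scala collections. It does not talk to Azure Cosmos DB or Gremlin; the `Flight` case class and its field names are hypothetical stand-ins for the flight edges and their destination vertices.

```scala
object FlightTraversal {
  // Hypothetical edge record: one flight from src to a destination airport,
  // with the delay in minutes.
  final case class Flight(src: String, dstCity: String, dstIata: String, delay: Int)

  // Analogue of outE('flight').inV().values('city').groupCount() for one origin:
  // counts flights per destination city.
  def flightsPerCity(flights: Seq[Flight], origin: String): Map[String, Int] =
    flights
      .filter(_.src == origin)
      .groupBy(_.dstCity)
      .map { case (city, fs) => city -> fs.size }

  // Analogue of selecting the edges between one origin and one destination
  // and summing their 'delay' values.
  def totalDelay(flights: Seq[Flight], origin: String, destination: String): Int =
    flights
      .filter(f => f.src == origin && f.dstIata == destination)
      .map(_.delay)
      .sum
}
```

On the real graph, the Gremlin traversal performs the equivalent filtering and aggregation over the stored edges, so no data has to be copied into application memory first.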

With the Spark connector using the same tychostation graph, we can also run our own Spark DataFrame queries. Following up from the preceding Spark connector code snippet, let’s run our Spark SQL queries – in this case we’re using the HDInsight Jupyter notebook service.

Top 5 destination cities departing from Seattle

%%sql
select a.city, sum(f.delay) as TotalDelay 
from delays f 
join airports a 
  on a.iata = f.dst 
where f.src = 'SEA' and f.delay < 0 
group by a.city 
order by sum(f.delay) limit 5
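
For readers less familiar with SQL, the query above joins flights to airports on the destination code, keeps flights from Seattle with a negative delay (ahead of schedule), sums the delay per city, and returns the five most negative totals. A sketch of the same logic on in-memory collections is below. The case classes are hypothetical and mirror only the columns the query references, and the lookup map assumes each iata code appears once in the airport data.

```scala
object EarlyFlights {
  final case class Airport(iata: String, city: String)
  final case class Delay(src: String, dst: String, delay: Int)

  // Join on iata = dst, keep flights from `origin` with delay < 0,
  // sum the delay per city, sort ascending, and keep the first n cities.
  def topCities(delays: Seq[Delay], airports: Seq[Airport],
                origin: String, n: Int): Seq[(String, Int)] = {
    // Assumption: iata codes are unique, so a map lookup matches the inner join.
    val cityByIata = airports.map(a => a.iata -> a.city).toMap
    delays
      .filter(d => d.src == origin && d.delay < 0)
      .flatMap(d => cityByIata.get(d.dst).map(city => city -> d.delay))
      .groupBy(_._1)
      .map { case (city, rows) => city -> rows.map(_._2).sum }
      .toSeq
      .sortBy(_._2)
      .take(n)
  }
}
```

In Spark, the same steps run distributed across the cluster, which is what makes the SQL form preferable for data of any real size.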

Calculate median delays by destination cities departing from Seattle

%%sql
select a.city, percentile_approx(f.delay, 0.5) as median_delay 
from delays f 
join airports a 
  on a.iata = f.dst 
where f.src = 'SEA' and f.delay < 0 
group by a.city 
order by median_delay
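
The query above asks Spark for an approximate 0.5 percentile (the median) per city. As a point of comparison, the sketch below computes an exact nearest-rank percentile over a small in-memory sample. This is a simpler algorithm than the approximation Spark uses, and `Percentiles.nearestRank` is a hypothetical helper written for this illustration, so results on large data may differ slightly from percentile_approx.

```scala
object Percentiles {
  // Exact nearest-rank percentile: the smallest value such that at least
  // a fraction p of the values are less than or equal to it.
  // Returns None for an empty input.
  def nearestRank(values: Seq[Int], p: Double): Option[Int] = {
    require(p > 0.0 && p <= 1.0, "p must be in (0, 1]")
    if (values.isEmpty) None
    else {
      val sorted = values.sorted
      val rank = math.ceil(p * sorted.size).toInt // 1-based rank
      Some(sorted(rank - 1))
    }
  }
}
```

Note that with an even number of values this definition returns the lower of the two middle values rather than their average.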

With Azure Cosmos DB, you can use both Apache TinkerPop Gremlin queries and Apache Spark DataFrame queries targeting the same graph.

Working with Azure Cosmos DB Document data model

Whether you are using Azure Cosmos DB graphs, tables, or documents, the code is the same from the perspective of the Spark Connector for Azure Cosmos DB! The template for connecting to any of these data models is:

  1. Configure your connection.
  2. Build your config and DataFrame.
  3. And voilà: Apache Spark is working in tandem with Azure Cosmos DB.

// Initialization
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config


// Configure your connection
val baseConfigMap = Map(
"Endpoint" -> "https://$documentContainer$.documents.azure.com:443/",
"Masterkey" -> "$masterKey$",
"Database" -> "$database$",
"Collection" -> "$collection$", 
"SamplingRatio" -> "1.0",
"schema_samplesize" -> "1000"
)

// Build config and DataFrame
val baseConfig = Config(baseConfigMap)
val baseColl = spark.sqlContext.read.cosmosDB(baseConfig)
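
Because the settings map has the same shape for every data model, it can be assembled and sanity-checked by a small helper before it is handed to Config(...). The sketch below is one way to do that. `CosmosReadSettings` is hypothetical and not part of the connector; the key names and the endpoint pattern are taken from the snippets in this post, and treating those four keys as required is an assumption made for this example.

```scala
object CosmosReadSettings {
  // Keys that appear in every connection snippet in this post.
  val requiredKeys: Seq[String] = Seq("Endpoint", "Masterkey", "Database", "Collection")

  // Assemble the settings map for one account. Extra options such as
  // SamplingRatio or query_custom can be supplied through `extra`.
  def build(account: String, masterKey: String, database: String, collection: String,
            extra: Map[String, String] = Map.empty): Map[String, String] =
    Map(
      "Endpoint"   -> s"https://${account}.documents.azure.com:443/",
      "Masterkey"  -> masterKey,
      "Database"   -> database,
      "Collection" -> collection
    ) ++ extra

  // Names of required keys that are absent or blank, in declaration order.
  def missingKeys(settings: Map[String, String]): Seq[String] =
    requiredKeys.filter(k => settings.get(k).forall(_.trim.isEmpty))
}
```

A map built this way can be passed to Config(...) exactly like baseConfigMap above, and checking missingKeys first gives a clearer error than a failed connection attempt.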

With the Spark Connector for Azure Cosmos DB, data is parallelized between the Spark worker nodes and the Azure Cosmos DB data partitions. Therefore, whether your data is stored in tables, graphs, or documents, you get the performance, scalability, throughput, and consistency of Azure Cosmos DB when you are solving your machine learning and data science problems with Apache Spark.

Next Steps

In this blog post, we’ve looked at how the Spark Connector for Azure Cosmos DB can seamlessly interact with multiple data models supported by Azure Cosmos DB. Apache Spark with Azure Cosmos DB enables both ad-hoc, interactive queries on big data and advanced analytics, data science, machine learning, and artificial intelligence. Azure Cosmos DB can be used for capturing data that is collected incrementally from various sources across the globe. This includes social analytics, time series, game or application telemetry, retail catalogs, up-to-date trends and counters, and audit log systems. Spark can then be used for running advanced analytics and AI algorithms at scale and globally on top of the data living in Azure Cosmos DB. With Azure Cosmos DB being the industry’s first globally distributed multi-model database service, the Spark connector for Azure Cosmos DB can work with tables, graphs, and document data models, with more to come!

To get started running queries, create a new Azure Cosmos DB account from the Azure portal and work with the project in our Azure-CosmosDB-Spark GitHub repo.

Stay up-to-date on the latest Azure Cosmos DB news and features by following us on Twitter @AzureCosmosDB and #CosmosDB and reach out to us on the developer forums on Stack Overflow.