Creating big data pipelines using Azure Data Lake and Azure Data Factory

This week, Microsoft announced the public preview of a new and expanded Azure Data Lake making big data processing and analytics simpler and more accessible. The expanded Azure Data Lake includes Azure Data Lake Store, Azure Data Lake Analytics and Azure HDInsight.

The Azure Data Lake Store provides a single repository where you can easily capture data of any size, type and speed without forcing changes to your application as data scales. Azure Data Lake Analytics is a new service built on Apache YARN and includes U-SQL, a language that unifies the benefits of SQL with the expressive power of user code. The service dynamically scales and allows you to do analytics on any kind of data with enterprise-grade security through Azure Active Directory, so you can focus on your business goals.

In the first week of October, we announced you will be able to create and operationalize big data pipelines (aka workflows) using Azure Data Lake and Azure Data Factory in addition to using our existing support for Azure HDInsight. Today, we are announcing the public preview of these newly added capabilities. The Azure Data Lake and Azure Data Factory integration allows you to do the following:

Easily move data to Azure Data Lake Store

As of today, Azure Data Factory supports moving data from the following sources to Azure Data Lake Store:

  • Azure Blob
  • Azure SQL Database
  • Azure Table
  • On-premises SQL Server Database
  • Azure DocumentDB
  • Azure SQL DW
  • On-premises File System
  • On-premises Oracle Database
  • On-premises MySQL Database
  • On-premises DB2 Database
  • On-premises Teradata Database
  • On-premises Sybase Database
  • On-premises PostgreSQL Database
  • On-premises HDFS
  • Generic OData (Coming soon!)
  • Generic ODBC (Coming soon!)

You can also move data from Azure Data Lake Store to a number of sinks such as Azure Blob, Azure SQL Database, on-premises file system, etc. Follow the steps below to move data from Azure Blob Storage to Azure Data Lake Store.

Note: You need to have a valid Azure Data Lake Store account before following the steps below. Click here to create a new account if you don’t have one.

Create an Azure Data Factory

Log in to the Azure portal and navigate to Azure Data Factory. Enter a name, and select the subscription, resource group and region. Let’s name the data factory AzureDataLakeStoreAnalyticsSample.

Once created, navigate to your data factory and click Author and deploy.

Create ADF Linked Services

Create Azure Storage Linked Service: This is the Azure Blob Storage (source) from where you want to move the data.

Click New Data Store –> Azure Storage. Enter the values for the storage account name and account key parameters and hit Deploy.
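Once deployed, the Azure Storage linked service should look roughly like the JSON below (the placeholder account name and key are illustrative; substitute your own values). The name StorageLinkedService is the one referenced by the datasets and the U-SQL activity later in this post:

{
    "name": "StorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<yourstorageaccountname>;AccountKey=<yourstorageaccountkey>"
        }
    }
}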

Create Azure Data Lake Store Linked Service: This is the Azure Data Lake Storage (sink aka destination) where you want to move the data.

Click New Data Store -> Azure Data Lake Store.

Enter the mandatory parameters for the Azure Data Lake Store Linked Service:

DataLakeUri: The URI of the Azure Data Lake Store account created earlier (or an existing one). For example, https://<yourdatalakestoreaccountname>.azuredatalakestore.net/webhdfs/v1; replace <yourdatalakestoreaccountname> with your ADL Store account name.

Authorization: To fill in this parameter, click Authorize. This will open a pop-up where you need to enter your credentials.

If your Azure Data Lake Store account is in a different subscription and under a different resource group name than your data factory, then you also need to fill in the following parameters:

  • AccountName
  • SubscriptionID
  • ResourceGroupName

Click Deploy. This should create the Azure Data Lake Store Linked Service.

Note: You need to delete the rows saying Optional in the JSON if you are not specifying the values for them before hitting Deploy.
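Once deployed, the Azure Data Lake Store linked service should look roughly like the sketch below (property names as in the ADF Azure Data Lake connectors of this preview; check the generated template for the exact names). The sessionId and authorization values are populated for you when you click Authorize, and the last three properties are the optional ones mentioned above; all placeholder values are illustrative:

{
    "name": "AzureDataLakeStoreLinkedService",
    "properties": {
        "type": "AzureDataLakeStore",
        "typeProperties": {
            "dataLakeStoreUri": "https://<yourdatalakestoreaccountname>.azuredatalakestore.net/webhdfs/v1",
            "sessionId": "<populated by Authorize>",
            "authorization": "<populated by Authorize>",
            "accountName": "<optional: ADL Store account name>",
            "subscriptionId": "<optional: subscription ID>",
            "resourceGroupName": "<optional: resource group name>"
        }
    }
}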

Create ADF DataSets

Create Azure Blob Storage source dataset:

Click New Dataset –> Azure Blob storage.

This will bring in the template for the Azure Blob storage dataset, where you can fill in the values. Check out the Azure Blob storage dataset below as an example. For simplicity, we are not using the partitioned by clause for time-based partitions and are using a static folder. The dataset below specifies that the data being copied (SearchLog.tsv) is in the rawdatasample/data/ folder in Azure Blob Storage.

{
    "name": "RawBlobDemoTable",
    "properties": {
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "StorageLinkedService",
        "typeProperties": {
            "fileName": "SearchLog.tsv",
            "folderPath": "rawdatasample/data/",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "n",
                "columnDelimiter": "t"
            }
        },
        "availability": {
            "frequency": "Day",
            "interval": 1,
            "style": "StartOfInterval"
        },
        "external": true,
        "policy": {
            "validation": {
                "minimumSizeMB": 0.00001
            }
        }
    }
}

Create Azure Data Lake Store destination dataset:

Click New Dataset -> Azure Data Lake Store.

This will bring in the template for the Azure Data Lake Store dataset, where you can fill in the values. For an example, look at the Azure Data Lake Store dataset below. For simplicity, we are not using the partitioned by clause for time-based partitions and are using a static folder. The dataset below specifies that the data is copied to the datalake/input/ folder in the data lake.

{
    "name": "DataLakeTable",
    "properties": {
        "published": false,
        "type": "AzureDataLakeStore",
        "linkedServiceName": "AzureDataLakeStoreLinkedService",
        "typeProperties": {
            "folderPath": "datalake/input/",
            "fileName": "SearchLog.tsv",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "n",
                "columnDelimiter": "t"
            }
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Create ADF Pipelines

Create ADF Copy Pipeline: This pipeline copies data from Azure Blob Storage to Azure Data Lake Store.

Click New Pipeline and this will open a sample pipeline template. For example, the pipeline below will copy data from Azure Blob Storage to Azure Data Lake Store (using the sample datasets created above).

Pipeline Definition:

{
    "name": "EgressBlobToDataLakePipeline",
    "properties": {
        "description": "Egress data from blob to azure data lake",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "treatEmptyAsNull": true
                    },
                    "sink": {
                        "type": "AzureDataLakeStoreSink",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "RawBlobDemoTable"
                    }
                ],
                "outputs": [
                    {
                        "name": "DataLakeTable"
                    }
                ],
                "policy": {
                    "timeout": "10:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst",
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "EgressDataLake",
                "description": "Move data from blob to azure data lake"
            }
        ],
        "start": "2015-08-08T00:00:00Z",
        "end": "2015-08-08T01:00:00Z",
        "isPaused": false
    }
}

Monitor ADF Pipelines

The ADF copy pipeline created above will start running because the datasets have a daily frequency and the start and end in the pipeline definition are set to 08/08/2015. So the pipeline will only run for that day and perform the copy operation once. Click here to learn more about scheduling ADF pipelines.
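For example, if you wanted this daily copy to run for a week rather than a single day, you could widen the pipeline’s active period in its definition (dates below are illustrative). With the Day frequency above, ADF would then produce one slice, and one copy run, per day in that window:

        "start": "2015-08-08T00:00:00Z",
        "end": "2015-08-15T00:00:00Z",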

Navigate to the ADF Diagram View to view the operational lineage of your data factory. You will be able to see the Azure Blob Storage and Azure Data Lake Store datasets along with the pipeline for moving the data from Blob Storage to Data Lake Store.

Click on the DataLakeTable in your Diagram View to see the corresponding activity executions and their status.

You can see that the copy activity in EgressBlobToDataLakePipeline has successfully executed, copying 3.08 KB of data from Azure Blob Storage to Azure Data Lake Store. You can also log in to the Microsoft Azure portal and use the Azure Data Lake Data Explorer to visualize the data copied to Azure Data Lake Store.

Click here to learn more about Azure Data Factory data movement activities. You can find detailed documentation about using the AzureDataLakeStore connector in ADF here.

Create E2E big data ADF pipelines that run U-SQL scripts as a processing step on Azure Data Lake Analytics service

A very common use case for multiple industry verticals (retail, finance, gaming) is Log Processing.

Note: You need to have a valid Azure Data Lake Analytics account before following the steps below. Click here to create a new account if you don’t have one.

In this scenario, you will create an ADF pipeline that consumes the logs copied to the Azure Data Lake Store account in the previous step and processes them by running a U-SQL script on Azure Data Lake Analytics as one of the processing steps. The U-SQL script computes events by region that can be consumed by downstream processes.

We will reuse the data factory (AzureDataLakeStoreAnalyticsSample) created in the scenario above to copy data from Azure Blob Storage to Azure Data Lake Store.

Create ADF Linked Services

Create Azure Data Lake Analytics Linked Service. This is the Azure Data Lake Analytics account which will run the U-SQL scripts to do log processing.

Click New Compute –> Azure Data Lake Analytics.

Enter the mandatory parameters for the Azure Data Lake Analytics Linked Service:

  • AccountName: The Azure Data Lake Analytics account created earlier (or an existing one).
  • Authorization: To fill in this parameter, click Authorize. This will open a pop-up where you’ll need to enter your credentials.

Enter the optional parameters if your Azure Data Lake Analytics account is in a different subscription and under a different resource group name than your data factory:

  • SubscriptionID
  • ResourceGroupName

Click Deploy. This should create the Azure Data Lake Analytics Linked Service.

Note: You need to delete the rows saying Optional in the JSON if you are not specifying the values for them before hitting Deploy.
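Once deployed, the Azure Data Lake Analytics linked service should look roughly like the sketch below (property names as in the ADF AzureDataLakeAnalytics connector of this preview; check the generated template for the exact names). As with the Data Lake Store linked service, sessionId and authorization are populated when you click Authorize, and the placeholder values are illustrative:

{
    "name": "AzureDataLakeAnalyticsLinkedService",
    "properties": {
        "type": "AzureDataLakeAnalytics",
        "typeProperties": {
            "accountName": "<your ADLA account name>",
            "sessionId": "<populated by Authorize>",
            "authorization": "<populated by Authorize>",
            "subscriptionId": "<optional: subscription ID>",
            "resourceGroupName": "<optional: resource group name>"
        }
    }
}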

Create Azure Data Lake Store Linked Service: This is the Azure Data Lake Storage (sink aka destination) where you want to move the data.

Note: If you are doing this scenario in continuation to the Copy scenario above, then you would have created this Linked Service already.

Click New Data Store -> Azure Data Lake Store.

Enter the mandatory parameters for the Azure Data Lake Store Linked Service:

DataLakeUri: The URI of the Azure Data Lake Store account created earlier (or an existing one). For example, https://<yourdatalakestoreaccountname>.azuredatalakestore.net/webhdfs/v1; replace <yourdatalakestoreaccountname> with your ADL Store account name.

Authorization: To fill in this parameter, click Authorize. This will open a pop-up where you need to enter your credentials.

If your Azure Data Lake Store account is in a different subscription and under a different resource group name than your data factory, then you also need to fill in the following parameters:

  • AccountName
  • SubscriptionID
  • ResourceGroupName

Click Deploy. This should create the Azure Data Lake Store Linked Service.

Note: You need to delete the rows saying Optional in the JSON if you are not specifying the values for them before hitting Deploy.

Create ADF DataSets

Create Azure Data Lake Store source dataset:

Note: If you are doing this scenario in continuation to the Copy scenario above, then you would have created this dataset already.

Click New Dataset -> Azure Data Lake Store.

This will bring in the template for the Azure Data Lake Store dataset, where you can fill in the values.

For example, have a look at the Azure Data Lake Store dataset below. For simplicity, we are not using the partitioned by clause for time-based partitions and are using a static folder. The dataset below specifies that the data is copied to the datalake/input/ folder in the data lake.

{
    "name": "DataLakeTable",
    "properties": {
        "published": false,
        "type": "AzureDataLakeStore",
        "linkedServiceName": "AzureDataLakeStoreLinkedService",
        "typeProperties": {
            "folderPath": "datalake/input/",
            "fileName": "SearchLog.tsv",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "n",
                "columnDelimiter": "t"
            }
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Create Azure Data Lake Store destination dataset:

Click New Dataset -> Azure Data Lake Store.

For example: See the EventsByEnGbRegionTable dataset definition below. The data corresponding to this dataset will be produced after running the AzureDataLakeAnalytics U-SQL script to get all events for ‘en-gb’ locale and date < “2012/02/19”.

{
    "name": "EventsByEnGbRegionTable",
    "properties": {
        "published": false,
        "type": "AzureDataLakeStore",
        "linkedServiceName": "AzureDataLakeStoreLinkedService",
        "typeProperties": {
            "folderPath": "datalake/output/"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Create ADF Pipelines

Create ADF AzureDataLakeAnalytics Pipeline: This pipeline runs a U-SQL activity to do processing.

Click New Pipeline and this will open a sample pipeline template.

You can also click Add Activity after clicking New Pipeline and add the template for the DataLakeAnalyticsU-SQL activity.

For example, the pipeline below runs an ADLA U-SQL activity to get all events for ‘en-gb’ locale and date < “2012/02/19”.

Pipeline Definition:

{
    "name": "ComputeEventsByEnGbRegionPipeline",
    "properties": {
        "description": "This is a pipeline to compute events for en-gb locale and date less than 2012/02/19.",
        "activities": [
            {
                "type": "DataLakeAnalyticsU-SQL",
                "typeProperties": {
                    "scriptPath": "scriptskonaSearchLogProcessing.txt",
                    "scriptLinkedService": "StorageLinkedService",
                    "degreeOfParallelism": 3,
                    "priority": 100,
                    "parameters": {
                        "in": "/datalake/input/SearchLog.tsv",
                        "out": "/datalake/output/Result.tsv"
                    }
                },
                "inputs": [
                    {
                        "name": "DataLakeTable"
                    }
                ],
                "outputs": [
                    {
                        "name": "EventsByEnGbRegionTable"
                    }
                ],
                "policy": {
                    "timeout": "06:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst",
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "EventsByRegion",
                "linkedServiceName": "AzureDataLakeAnalyticsLinkedService"
            }
        ],
        "start": "2015-08-08T00:00:00Z",
        "end": "2015-08-08T01:00:00Z",
        "isPaused": false
    }
}

The U-SQL script being run by the pipeline above resides in the ‘scripts/kona’ folder in the Azure Blob Storage account corresponding to the deployed StorageLinkedService.

SearchLogProcessing.txt Script Definition:

@searchlog =
    EXTRACT UserId          int,
            Start           DateTime,
            Region          string,
            Query           string,
            Duration        int?,
            Urls            string,
            ClickedUrls     string
    FROM @in
    USING Extractors.Tsv(nullEscape:"#NULL#");

@rs1 =
    SELECT Start, Region, Duration
    FROM @searchlog
    WHERE Region == "en-gb";

@rs1 =
    SELECT Start, Region, Duration
    FROM @rs1
    WHERE Start <= DateTime.Parse("2012/02/19");

OUTPUT @rs1
    TO @out
    USING Outputters.Tsv(quoting:false, dateTimeFormat:null);

The values for @in and @out parameters in the above U-SQL script are passed dynamically by ADF using the Parameters section. See the Parameters section above in the pipeline definition.

You can also specify other properties, such as degreeOfParallelism and priority, in your pipeline definition for the jobs that run on the Azure Data Lake Analytics service, as shown in the sketch below.
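For example, to run the job with more parallelism units and a different scheduling priority (in Azure Data Lake Analytics, a lower priority number is scheduled ahead of higher numbers), you could adjust the activity’s typeProperties roughly as follows; the numbers are illustrative:

"typeProperties": {
    "scriptPath": "scripts/kona/SearchLogProcessing.txt",
    "scriptLinkedService": "StorageLinkedService",
    "degreeOfParallelism": 10,
    "priority": 1,
    "parameters": {
        "in": "/datalake/input/SearchLog.tsv",
        "out": "/datalake/output/Result.tsv"
    }
}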

Monitor ADF Pipelines

The ADF pipeline above (ComputeEventsByEnGbRegionPipeline) will start running because the datasets have a daily frequency and the start and end in the pipeline definition are set to 08/08/2015. So the pipeline will only run for that day and run the U-SQL script once. Click here to learn more about scheduling ADF pipelines.

Navigate to the ADF Diagram View to view the operational lineage of your data factory. You will see the two pipelines and their corresponding datasets: EgressBlobToDataLakePipeline (copies data from Azure Blob Storage to Azure Data Lake Store) and ComputeEventsByEnGbRegionPipeline (gets all events for ‘en-gb’ locale and date < “2012/02/19”).

Click on the EventsByEnGbRegionTable in your Diagram View to see the corresponding activity executions and their status.

You can see that the U-SQL activity in ComputeEventsByEnGbRegionPipeline has run successfully and created a Result.tsv file (/datalake/output/Result.tsv) in your Azure Data Lake Store account. Result.tsv contains all events for ‘en-gb’ locale and date < “2012/02/19”. You can log in to the Microsoft Azure portal and use the Azure Data Lake Data Explorer to visualize the Result.tsv file generated as part of the processing step above.

You can find detailed documentation about the AzureDataLakeAnalytics U-SQL activity in Azure Data Factory here.

To summarize, by following the steps above, you were able to build E2E big data pipelines using Azure Data Factory that move data to Azure Data Lake Store. In addition, you were able to run a U-SQL script on Azure Data Lake Analytics as one of the processing steps and dynamically scale it according to your needs.

We will continue to invest in solutions allowing us to operationalize big data processing and analytics workflows. Click here to learn more about the Microsoft Azure Data Lake from the Microsoft Cloud Platform team. If you want to try out Azure Data Factory, visit us here and get started by building pipelines easily and quickly using data factory. If you have any feature requests or want to provide feedback for data factory, please visit the Azure Data Factory Forum.