With the latest service update and Data Management Gateway release, you can connect to new data stores and leverage new features to move data with Azure Data Factory, including:
- Copy from on-premises File System to Azure Blob
- Copy from on-premises Oracle Database to Azure Blob
- Specify Encoding for Text Files
- Invoke Stored Procedure with Additional Parameters for Copying into SQL Sink
Please read the following sections for details.
File Copy from on-premises File System to Azure Blob
Azure Data Factory released a new feature enabling copying files from on-premises file system, Windows and Linux network share or Windows local host, to Azure Blob with data factory pipelines.
Let’s get started with the following example:
- Host: contoso
- Folder: marketingcampaignregionaldata{slice}, where files are partitioned in folder named {slice}, such as 2014121112 (year of 2014, month of 12, day of 11, hour of 12)
The host can be either Windows or Linux with Samba configured. Data Management Gateway should be installed in a Windows machine that can connect to the host.
Now let’s leverage Azure Data Factory to copy files contained in slices to Azure Blob.
First let’s define on-premises file system linked service:
{ "name": "FolderDataStore", "properties": { "type": "OnPremisesFileSystemLinkedService", "host": "contoso", "userId": "username", "password": "password", "gatewayName": "ContosoGateway" } }
Please remember to escape “” characters in the host name which is required by JSON.
The following JSON script defines an input table that refers to an on-premises file system defined previously:
{ "name": "OnPremFileSource", "properties": { "location": { "type": "OnPremisesFileSystemLocation", "folderPath": "marketingcampaignregionaldata{Slice}", "partitionedBy": [ { "name": "Slice", "value": { "type": "DateTime", "date": "SliceStart", "format": "yyyyMMddHH" } } ], "linkedServiceName": "FolderDataStore" }, "availability": { "waitOnExternal": { }, "frequency": "Hour", "interval": 24 } } }
Here partitonedBy property is used to define the slices. Please remember to escape “” characters in the host name, and please do not add leading “” in folderPath.
Now we can easily author a pipeline with copy activity to replicate files in the file system to Azure Blob. The content will be copied as binary, without any parsing or transformation.
{ "name": "CopyFileToBlobPipeline", "properties": { "activities": [ { "name": "Ingress", "inputs": [ { "name": "OnPremFileSource" } ], "outputs": [ { "name": "AzureBlobDest" } ], "type": "CopyActivity", "transformation": { "source": { "type": "FileSystemSource" }, "sink": { "type": "BlobSink" } }, "policy": { "concurrency": 4, "timeout": "00:05:00" } } ] } }
Please notice that you can leverage concurrency to copy the slices of files in parallel. This is useful when you want to move the slices already happened in the past.
Caveat: concurrent copy activities with the same host via UNC path with different user accounts may lead to error e.g. “multiple connections to a server or shared resource by the same user, using more than one user name, are not allowed”. This is the restriction of the OS for security reasons. Please either schedule the copy activities with different gateways, or install the gateway within the host and use “localhost” or “local” instead of UNC path.
In addition to partition, more copy scenarios are enabled.
Example 1: copy all files under a specific folder
{ "name": "OnPremFileSource", "properties": { "location": { "type": "OnPremisesFileSystemLocation", "folderPath": "marketingcampaignregionaldatana", "linkedServiceName": "FolderDataStore" }, ... } }
Example 2: copy all CSV files under the specific folder
{ "name": "OnPremFileSource", "properties": { "location": { "type": "OnPremisesFileSystemLocation", "folderPath": "marketingcampaignregionaldatana", "fileFilter": "*.csv", "linkedServiceName": "FolderDataStore" }, ... } }
Example 3: copy specific file
{ "name": "OnPremFileSource", "properties": { "location": { "type": "OnPremisesFileSystemLocation", "folderPath": "marketingcampaignregionaldatana", "fileFilter": "201501.csv", "linkedServiceName": "FolderDataStore" }, ... } }
For details, please check out On-Premises File system Linked Services.
Copy from on-premises Oracle Database to Azure Blob
Azure Data Factory released a new feature to enable copying files from on-premises Oracle database to Azure Blob for further data processing.
First let’s define Oracle linked service, please refer to Oracle Connect Descriptor for detailed connection string format:
{ "name": "LinkedServiceOracle", "properties": { "type": "OnPremisesOracleLinkedService", "ConnectionString": "data source=ds;User Id=uid;Password=pwd;", "gatewayName": "SomeGateway" } }
Then we can define an input table that refers to an on-premises Oracle table:
{ "name": "TableOracle", "properties": { "location": { "type": "OnPremisesOracleTableLocation", "tableName": "LOG", "linkedServiceName": "LinkedServiceOracle" }, "availability": { "frequency": "Day", "interval": "1", "waitOnExternal": {} }, "policy": {} } }
Now it is fairly easy to author copy activity to copy records based on time slices to Azure Blob with macros in Oracle reader query. Notice that you can specify concurrency number to run the qualified copy activities in parallel:
{ "name": "PipelineCopyOracleToBlob", "properties": { "activities": [ { "name": "CopyActivity", "description": "copy slices of oracle records to azure blob", "type": "CopyActivity", "inputs": [ { "name": "TableOracle" } ], "outputs": [ { "name": "TableAzureBlob" } ], "transformation": { "source": { "type": "OracleSource", "oracleReaderQuery": "$$Text.Format('select * from LOG where "Timestamp" >= to_date('{0:yyyy-MM-dd}', 'YYYY-MM-DD') AND "Timestamp" < to_date('{1:yyyy-MM-dd}', 'YYYY-MM-DD')', SliceStart, SliceEnd)" }, "sink": { "type": "BlobSink" } }, "policy": { "concurrency": 3, "timeout": "00:05:00" } } ], "start": "2015-03-01T00:00:00Z", "end": "2015-03-15T00:00:00Z", "isPaused": false } }
Specify Encoding for Text Files
Though UTF-8 encoding is quite popular, often time text files in Azure Blob follow other encodings due to historical reasons. With newly introduced encodingName, user now can specify the encoding by code page name for tables of TextFormat type, e.g. “gb2312”, “windows-1255”. If the value is omitted, the connector will still fall back to UTF-8 as usual, unless BOM denotes another Unicode encoding. Please refer to this link for the supported encoding names.
Here is an example to set the encoding to gb2312 for text files:
"location": { "type": "AzureBlobLocation", "folderPath": "encode", "format": { "type": "TextFormat", "columnDelimiter": ",", "encodingName": "gb2312" }, "linkedServiceName": "LinkedServiceAzureBlob" }
Invoke Stored Procedure with Additional Parameters for Copying into SQL Sink
When copying data into SQL Server or Azure SQL Database, a user specified stored procedure could be configured and invoked with additional parameters.
Example
Define the JSON of output Table as follows (take Azure SQL Database table as an example):
{ "name": "MyAzureSQLTable", "properties": { "location": { "type": "AzureSqlTableLocation", "tableName": "Marketing", "linkedServiceName": "AzureSqlLinkedService" }, "availability": { "frequency": "Hour", "interval": 1 } } }
Define the SqlSink section in copy activity JSON as follows. To call a stored procedure while insert data, both SqlWriterStoredProcedureName and SqlWriterTableType properties are needed.
"sink": { "type": "SqlSink", "SqlWriterTableType": "MarketingType", "SqlWriterStoredProcedureName": "spOverwriteMarketing", "storedProcedureParameters": { "stringData": { "value": "str1" } } }
In your database, define the stored procedure with the same name as SqlWriterStoredProcedureName. It handles input data from your specified source, and insert into the output table. Notice that the parameter name of the stored procedure should be the same as the tableName defined in Table JSON file.
CREATE PROCEDURE spOverwriteMarketing @Marketing [dbo].[MarketingType] READONLY, @stringData varchar(256) AS BEGIN DELETE FROM [dbo].[Marketing] where ProfileID = @stringData INSERT [dbo].[Marketing](ProfileID, State) SELECT * FROM @Marketing END
In your database, define the table type with the same name as SqlWriterTableType. Notice that the schema of the table type should be same as the schema returned by your input data.
CREATE TYPE [dbo].[MarketingType] AS TABLE( [ProfileID] [varchar](256) NOT NULL, [State] [varchar](256) NOT NULL )
Summary
We’re adding more and more data stores for Azure Data Factory. If you have some names in your mind, please feedback via Azure Data Factory User Voice. We are listening:-)