Management .NET SDK: Set up and run analytics jobs using the Azure Stream Analytics API for .NET

Learn how to set up and run analytics jobs using the Stream Analytics API for .NET using the Management .NET SDK. Set up a project, create input and output sources, transformations, and start and stop jobs. For your analytics jobs, you can stream data from Blob storage or from an event hub.

See the management reference documentation for the Stream Analytics API for .NET.

Azure Stream Analytics is a fully managed service providing low-latency, highly available, scalable, complex event processing over streaming data in the cloud. Stream Analytics enables customers to set up streaming jobs to analyze data streams, and allows them to drive near real-time analytics.

Note

We have updated the sample code in this article with Azure Stream Analytics Management .NET SDK v2.x version. For sample code using the uses lagecy (1.x) SDK version, please see Use the Management .NET SDK v1.x for Stream Analytics.

Prerequisites

Before you begin this article, you must have the following requirements:

  • Install Visual Studio 2019 or 2015.

  • Download and install Azure .NET SDK.

  • Create an Azure Resource Group in your subscription. The following example is a sample Azure PowerShell script. For Azure PowerShell information, see Install and configure Azure PowerShell;

    # Log in to your Azure account
    Add-AzureAccount
    
    # Select the Azure subscription you want to use to create the resource group
    Select-AzureSubscription -SubscriptionName <subscription name>
    
    # If Stream Analytics has not been registered to the subscription, remove the remark    symbol (#) to run the Register-AzProvider cmdlet to register the provider namespace
    #Register-AzProvider -Force -ProviderNamespace 'Microsoft.StreamAnalytics'
    
    # Create an Azure resource group
    New-AzureResourceGroup -Name <YOUR RESOURCE GROUP NAME> -Location <LOCATION>
    
  • Set up an input source and output target for the job to connect to.

Set up a project

To create an analytics job, use the Stream Analytics API for .NET, first set up your project.

  1. Create a Visual Studio C# .NET console application.

  2. In the Package Manager Console, run the following commands to install the NuGet packages. The first one is the Azure Stream Analytics Management .NET SDK. The second one is for Azure client authentication.

    Install-Package Microsoft.Azure.Management.StreamAnalytics -Version 2.0.0
    Install-Package Microsoft.Rest.ClientRuntime.Azure.Authentication -Version 2.3.1
    
  3. Add the following appSettings section to the App.config file:

    <appSettings>
        <add key="ClientId" value="1950a258-227b-4e31-a9cf-717495945fc2" />
        <add key="RedirectUri" value="urn:ietf:wg:oauth:2.0:oob" />
        <add key="SubscriptionId" value="YOUR SUBSCRIPTION ID" />
        <add key="ActiveDirectoryTenantId" value="YOUR TENANT ID" />
    </appSettings>
    

    Replace values for SubscriptionId and ActiveDirectoryTenantId with your Azure subscription and tenant IDs. You can get these values by running the following Azure PowerShell cmdlet:

       Get-AzureAccount
    
  4. Add the following reference in your .csproj file:

    <Reference Include="System.Configuration" />
    
  5. Add the following using statements to the source file (Program.cs) in the project:

    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Threading;
    using System.Threading.Tasks;
    
    using Microsoft.Azure.Management.StreamAnalytics;
    using Microsoft.Azure.Management.StreamAnalytics.Models;
    using Microsoft.Rest.Azure.Authentication;
    using Microsoft.Rest;
    
  6. Add an authentication helper method:

    private static async Task<ServiceClientCredentials> GetCredentials()
    {
        var activeDirectoryClientSettings = ActiveDirectoryClientSettings.UsePromptOnly(ConfigurationManager.AppSettings["ClientId"], new Uri("urn:ietf:wg:oauth:2.0:oob"));
        ServiceClientCredentials credentials = await UserTokenProvider.LoginWithPromptAsync(ConfigurationManager.AppSettings["ActiveDirectoryTenantId"], activeDirectoryClientSettings);
    
        return credentials;
     }
    

Create a Stream Analytics management client

A StreamAnalyticsManagementClient object allows you to manage the job and the job components, such as input, output, and transformation.

Add the following code to the beginning of the Main method:

 string resourceGroupName = "<YOUR AZURE RESOURCE GROUP NAME>";
 string streamingJobName = "<YOUR STREAMING JOB NAME>";
 string inputName = "<YOUR JOB INPUT NAME>";
 string transformationName = "<YOUR JOB TRANSFORMATION NAME>";
 string outputName = "<YOUR JOB OUTPUT NAME>";
 
 SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
 
 // Get credentials
 ServiceClientCredentials credentials = GetCredentials().Result;
 
 // Create Stream Analytics management client
 StreamAnalyticsManagementClient streamAnalyticsManagementClient = new StreamAnalyticsManagementClient(credentials)
 {
     SubscriptionId = ConfigurationManager.AppSettings["SubscriptionId"]
 };

The resourceGroupName variable's value should be the same as the name of the resource group you created or picked in the prerequisite steps.

To automate the credential presentation aspect of job creation, refer to Authenticating a service principal with Azure Resource Manager.

The remaining sections of this article assume that this code is at the beginning of the Main method.

Create a Stream Analytics job

The following code creates a Stream Analytics job under the resource group that you have defined. You will add an input, output, and transformation to the job later.

// Create a streaming job
StreamingJob streamingJob = new StreamingJob()
{
    Tags = new Dictionary<string, string>()
    {
        { "Origin", ".NET SDK" },
        { "ReasonCreated", "Getting started tutorial" }
    },
    Location = "West US",
    EventsOutOfOrderPolicy = EventsOutOfOrderPolicy.Drop,
    EventsOutOfOrderMaxDelayInSeconds = 5,
    EventsLateArrivalMaxDelayInSeconds = 16,
    OutputErrorPolicy = OutputErrorPolicy.Drop,
    DataLocale = "en-US",
    CompatibilityLevel = CompatibilityLevel.OneFullStopZero,
    Sku = new Sku()
    {
        Name = SkuName.Standard
    }
};
StreamingJob createStreamingJobResult = streamAnalyticsManagementClient.StreamingJobs.CreateOrReplace(streamingJob, resourceGroupName, streamingJobName);

Create a Stream Analytics input source

The following code creates a Stream Analytics input source with the blob input source type and CSV serialization. To create an event hub input source, use EventHubStreamInputDataSource instead of BlobStreamInputDataSource. Similarly, you can customize the serialization type of the input source.

// Create an input
StorageAccount storageAccount = new StorageAccount()
{
    AccountName = "<YOUR STORAGE ACCOUNT NAME>",
    AccountKey = "<YOUR STORAGE ACCOUNT KEY>"
};
Input input = new Input()
{
    Properties = new StreamInputProperties()
    {
        Serialization = new CsvSerialization()
        {
            FieldDelimiter = ",",
            Encoding = Encoding.UTF8
        },
        Datasource = new BlobStreamInputDataSource()
        {
            StorageAccounts = new[] { storageAccount },
            Container = "<YOUR STORAGE BLOB CONTAINER>",
            PathPattern = "{date}/{time}",
            DateFormat = "yyyy/MM/dd",
            TimeFormat = "HH",
            SourcePartitionCount = 16
        }
    }
};
Input createInputResult = streamAnalyticsManagementClient.Inputs.CreateOrReplace(input, resourceGroupName, streamingJobName, inputName);

Input sources, whether from Blob storage or an event hub, are tied to a specific job. To use the same input source for different jobs, you must call the method again and specify a different job name.

Test a Stream Analytics input source

The TestConnection method tests whether the Stream Analytics job is able to connect to the input source as well as other aspects specific to the input source type. For example, in the blob input source you created in an earlier step, the method will check that the Storage account name and key pair can be used to connect to the Storage account as well as check that the specified container exists.

// Test the connection to the input
ResourceTestStatus testInputResult = streamAnalyticsManagementClient.Inputs.Test(resourceGroupName, streamingJobName, inputName);

The result of the TestConnection call is a ResourceTestResult object that contains two properties:

  • status: It can be one of the following strings: ["TestNotAttempted", "TestSucceeded", "TestFailed"]
  • error: It's of type ErrorResponse containing the following properties:
    • code: a required property of type string. The value is standard System.Net.HttpStatusCode received while testing.
    • message: a required property of type string representing the error.

Create a Stream Analytics output target

Creating an output target is similar to creating a Stream Analytics input source. Like input sources, output targets are tied to a specific job. To use the same output target for different jobs, you must call the method again and specify a different job name.

The following code creates an output target (Azure SQL Database). You can customize the output target's data type and/or serialization type.

// Create an output
Output output = new Output()
{
    Datasource = new AzureSqlDatabaseOutputDataSource()
    {
        Server = "<YOUR DATABASE SERVER NAME>",
        Database = "<YOUR DATABASE NAME>",
        User = "<YOUR DATABASE LOGIN>",
        Password = "<YOUR DATABASE LOGIN PASSWORD>",
        Table = "<YOUR DATABASE TABLE NAME>"
    }
};
Output createOutputResult = streamAnalyticsManagementClient.Outputs.CreateOrReplace(output, resourceGroupName, streamingJobName, outputName);

Test a Stream Analytics output target

A Stream Analytics output target also has the TestConnection method for testing connections.

// Test the connection to the output
ResourceTestStatus testOutputResult = streamAnalyticsManagementClient.Outputs.Test(resourceGroupName, streamingJobName, outputName);

Create a Stream Analytics transformation

The following code creates a Stream Analytics transformation with the query "select * from Input" and specifies to allocate one streaming unit for the Stream Analytics job. For more information on adjusting streaming units, see Scale Azure Stream Analytics jobs.

// Create a transformation
Transformation transformation = new Transformation()
{
    Query = "Select Id, Name from <your input name>", // '<your input name>' should be replaced with the value you put for the 'inputName' variable above or in a previous step
    StreamingUnits = 1
};
Transformation createTransformationResult = streamAnalyticsManagementClient.Transformations.CreateOrReplace(transformation, resourceGroupName, streamingJobName, transformationName);

Like input and output, a transformation is also tied to the specific Stream Analytics job it was created under.

Start a Stream Analytics job

After creating a Stream Analytics job and its input(s), output(s), and transformation, you can start the job by calling the Start method.

The following sample code starts a Stream Analytics job with a custom output start time set to December 12, 2012, 12:12:12 UTC:

// Start a streaming job
StartStreamingJobParameters startStreamingJobParameters = new StartStreamingJobParameters()
{
    OutputStartMode = OutputStartMode.CustomTime,
    OutputStartTime = new DateTime(2012, 12, 12, 12, 12, 12, DateTimeKind.Utc)
};
streamAnalyticsManagementClient.StreamingJobs.Start(resourceGroupName, streamingJobName, startStreamingJobParameters);

Stop a Stream Analytics job

You can stop a running Stream Analytics job by calling the Stop method.

// Stop a streaming job
streamAnalyticsManagementClient.StreamingJobs.Stop(resourceGroupName, streamingJobName);

Delete a Stream Analytics job

The Delete method will delete the job as well as the underlying sub-resources, including input(s), output(s), and transformation of the job.

// Delete a streaming job
streamAnalyticsManagementClient.StreamingJobs.Delete(resourceGroupName, streamingJobName);

Get support

For further assistance, try our Microsoft Q&A question page for Azure Stream Analytics.

Next steps

You've learned the basics of using a .NET SDK to create and run analytics jobs. To learn more, see the following articles: