Usar paralelização de consulta no Azure Stream Analytics

Este artigo mostra como tirar proveito da paralelização no Azure Stream Analytics. Você aprende a dimensionar trabalhos do Stream Analytics configurando partições de entrada e ajustando a definição de consulta de análise.

Como pré-requisito, convém estar familiarizado com a noção de unidade de streaming descrita em Compreender e ajustar unidades de streaming.

Quais são as partes de um trabalho do Stream Analytics?

Uma definição de trabalho do Stream Analytics inclui pelo menos uma entrada de streaming, uma consulta e uma saída. As entradas são de onde o trabalho lê o fluxo de dados. A consulta é usada para transformar o fluxo de entrada de dados, e a saída é para onde o trabalho envia os resultados do trabalho.

Partições em entradas e saídas

O particionamento permite dividir os dados em subconjuntos com base em uma chave de partição. Se sua entrada (por exemplo, Hubs de Eventos) for particionada por uma chave, recomendamos que você especifique a chave de partição ao adicionar uma entrada ao seu trabalho do Stream Analytics. O dimensionamento de um trabalho do Stream Analytics aproveita as partições na entrada e na saída. Um trabalho do Stream Analytics pode consumir e gravar partições diferentes em paralelo, o que aumenta a taxa de transferência.

Entradas

Todas as entradas de streaming do Azure Stream Analytics podem tirar proveito do particionamento: Hubs de Eventos, Hub IoT, armazenamento de Blob, Data Lake Storage Gen2.

Nota

Para o nível de compatibilidade 1.2 e superior, a chave de partição deve ser definida como uma propriedade de entrada, sem necessidade da palavra-chave PARTITION BY na consulta. Para o nível de compatibilidade 1.1 e inferior, a chave de partição precisa ser definida com a palavra-chave PARTITION BY na consulta.

Saídas

Ao trabalhar com o Stream Analytics, você pode aproveitar o particionamento nas saídas:

  • Azure Data Lake Storage
  • Funções do Azure
  • Tabela do Azure
  • Armazenamento de Blob (pode definir a chave de partição explicitamente)
  • Azure Cosmos DB (precisa definir a chave de partição explicitamente)
  • Hubs de Eventos (necessidade de definir a chave de partição explicitamente)
  • Hub IoT (precisa definir a chave de partição explicitamente)
  • Service Bus
  • SQL e Azure Synapse Analytics com particionamento opcional: veja mais informações na página Saída para o Banco de Dados SQL do Azure.

O Power BI não suporta particionamento. No entanto, você ainda pode particionar a entrada conforme descrito nesta seção.

Para obter mais informações sobre partições, consulte os seguintes artigos:

Query

Para que um trabalho seja paralelo, as chaves de partição precisam estar alinhadas entre todas as entradas, todas as etapas da lógica de consulta e todas as saídas. O particionamento da lógica de consulta é determinado pelas chaves usadas para junções e agregações (GROUP BY). Este último requisito pode ser ignorado se a lógica de consulta não estiver chaveada (projeção, filtros, junções referenciais...).

  • Se uma entrada e uma saída forem particionadas por WarehouseId, e os grupos de consulta por ProductId sem WarehouseId, o trabalho não será paralelo.
  • Se duas entradas a serem unidas forem particionadas por chaves de partição diferentes (WarehouseId e ProductId), o trabalho não será paralelo.
  • Se dois ou mais fluxos de dados independentes estiverem contidos em um único trabalho, cada um com sua própria chave de partição, o trabalho não será paralelo.

Somente quando todas as entradas, saídas e etapas de consulta estiverem usando a mesma chave, o trabalho será paralelo.

Trabalhos vergonhosamente paralelos

Um trabalho embaraçosamente paralelo é o cenário mais escalável no Azure Stream Analytics. Ele conecta uma partição da entrada a uma instância da consulta a uma partição da saída. Este paralelismo tem os seguintes requisitos:

  • Se a lógica de consulta depender da mesma chave que está sendo processada pela mesma instância de consulta, você deve certificar-se de que os eventos vão para a mesma partição da sua entrada. Para Hubs de Eventos ou Hub IoT, isso significa que os dados de evento devem ter o valor PartitionKey definido. Como alternativa, você pode usar remetentes particionados. Para armazenamento de blob, isso significa que os eventos são enviados para a mesma pasta de partição. Um exemplo seria uma instância de consulta que agrega dados por userID onde o hub de eventos de entrada é particionado usando userID como chave de partição. No entanto, se a lógica de consulta não exigir que a mesma chave seja processada pela mesma instância de consulta, você poderá ignorar esse requisito. Um exemplo dessa lógica seria uma simples consulta select-project-filter.

  • O próximo passo é fazer com que sua consulta seja particionada. Para trabalhos com nível de compatibilidade 1.2 ou superior (recomendado), a coluna personalizada pode ser especificada como Chave de Partição nas configurações de entrada e o trabalho será paralelo automaticamente. Trabalhos com nível de compatibilidade 1.0 ou 1.1, requer que você use PARTITION BY PartitionId em todas as etapas da sua consulta. Várias etapas são permitidas, mas todas elas devem ser particionadas pela mesma chave.

  • A maioria das saídas suportadas no Stream Analytics pode tirar proveito do particionamento. Se você usar um tipo de saída que não suporte particionamento, seu trabalho não será embaraçosamente paralelo. Para a saída dos Hubs de Eventos, verifique se a coluna Chave de partição está definida como a mesma chave de partição usada na consulta. Para obter mais informações, consulte a seção saída.

  • O número de partições de entrada deve ser igual ao número de partições de saída. A saída de armazenamento de Blob pode suportar partições e herda o esquema de particionamento da consulta upstream. Quando uma chave de partição para armazenamento de Blob é especificada, os dados são particionados por partição de entrada, portanto, o resultado ainda é totalmente paralelo. Aqui estão exemplos de valores de partição que permitem um trabalho totalmente paralelo:

    • Oito partições de entrada do hub de eventos e oito partições de saída do hub de eventos
    • Oito partições de entrada do hub de eventos e saída de armazenamento de blob
    • Oito partições de entrada do hub de eventos e saída de armazenamento de blob particionadas por um campo personalizado com cardinalidade arbitrária
    • Oito partições de entrada de armazenamento de blob e saída de armazenamento de blob
    • Oito partições de entrada de armazenamento de blob e oito partições de saída de hub de eventos

As seções a seguir discutem alguns cenários de exemplo que são embaraçosamente paralelos.

Consulta simples

  • Entrada: Um hub de eventos com oito partições
  • Saída: Um hub de eventos com oito partições ("Coluna de chave de partição" deve ser configurado para uso PartitionId)

Consulta:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Esta consulta é um filtro simples. Portanto, não precisamos nos preocupar em particionar a entrada que está sendo enviada para o hub de eventos. Observe que os trabalhos com nível de compatibilidade antes de 1.2 devem incluir a cláusula PARTITION BY PartitionId , para que cumpra o requisito #2 anterior. Para a saída, precisamos configurar a saída do hub de eventos no trabalho para ter a chave de partição definida como PartitionId. Uma última verificação é certificar-se de que o número de partições de entrada é igual ao número de partições de saída.

Consulta com uma chave de agrupamento

  • Entrada: Hub de eventos com oito partições
  • Saída: Armazenamento de Blob

Consulta:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Esta consulta tem uma chave de agrupamento. Portanto, os eventos agrupados devem ser enviados para a mesma partição de Hubs de Eventos. Como neste exemplo agrupamos por TollBoothID, devemos ter certeza de que TollBoothID é usado como a chave de partição quando os eventos são enviados para Hubs de Eventos. Em seguida, no Azure Stream Analytics, você pode usar PARTITION BY PartitionId para herdar desse esquema de partição e habilitar a paralelização completa. Como a saída é o armazenamento de blobs, não precisamos nos preocupar em configurar um valor de chave de partição, conforme o requisito #4.

Exemplo de cenários que não são* embaraçosamente paralelos

Na seção anterior, o artigo abordou alguns cenários embaraçosamente paralelos. Nesta seção, você aprenderá sobre cenários que não atendem a todos os requisitos para serem embaraçosamente paralelos.

Contagem de partições incompatível

  • Entrada: Um hub de eventos com oito partições
  • Saída: Um hub de eventos com 32 partições

Se a contagem de partições de entrada não corresponder à contagem de partições de saída, a topologia não será embaraçosamente paralela, independentemente da consulta. No entanto, ainda podemos obter algum nível de paralelização.

Consulta usando saída não particionada

  • Entrada: Um hub de eventos com oito partições
  • Saída: Power BI

Atualmente, a saída do Power BI não oferece suporte ao particionamento. Portanto, este cenário não é embaraçosamente paralelo.

Consulta em várias etapas com diferentes valores PARTITION BY

  • Entrada: Hub de eventos com oito partições
  • Saída: Hub de eventos com oito partições
  • Nível de compatibilidade: 1.0 ou 1.1

Consulta:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Como você pode ver, a segunda etapa usa TollBoothId como a chave de particionamento. Este passo não é o mesmo que o primeiro e, portanto, exige que façamos um shuffle.

Consulta em várias etapas com diferentes valores PARTITION BY

  • Entrada: Hub de eventos com oito partições ("Coluna de chave de partição" não definida, padrão para "PartitionId")
  • Saída: Hub de eventos com oito partições ("Coluna de chave de partição" deve ser definida para usar "TollBoothId")
  • Nível de compatibilidade - 1.2 ou superior

Consulta:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

O nível de compatibilidade 1.2 ou superior permite a execução de consultas paralelas por padrão. Por exemplo, a consulta da seção anterior será particionada desde que a coluna "TollBoothId" esteja definida como chave de partição de entrada. A cláusula PARTITION BY PartitionId não é necessária.

Calcular o máximo de unidades de streaming de um trabalho

O número total de unidades de streaming que podem ser usadas por um trabalho do Stream Analytics depende do número de etapas na consulta definidas para o trabalho e do número de partições para cada etapa.

Etapas em uma consulta

Uma consulta pode ter uma ou várias etapas. Cada etapa é uma subconsulta definida pela palavra-chave WITH . A consulta que está fora da palavra-chave WITH (apenas uma consulta) também é contada como uma etapa, como a instrução SELECT na consulta a seguir:

Consulta:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Esta consulta tem duas etapas.

Nota

Esta consulta é discutida em mais detalhes mais adiante no artigo.

Particionar um passo

O particionamento de uma etapa requer as seguintes condições:

  • A fonte de entrada deve ser particionada.
  • A instrução SELECT da consulta deve ser lida de uma fonte de entrada particionada.
  • A consulta dentro da etapa deve ter a palavra-chave PARTITION BY .

Quando uma consulta é particionada, os eventos de entrada são processados e agregados em grupos de partições separados e os eventos de saída são gerados para cada um dos grupos. Se quiser uma agregação combinada, você deve criar uma segunda etapa não particionada para agregar.

Calcular o máximo de unidades de streaming para um trabalho

Todas as etapas não particionadas juntas podem ser dimensionadas para uma unidade de streaming (SU V2s) para um trabalho do Stream Analytics. Além disso, você pode adicionar um SU V2 para cada partição em uma etapa particionada. Você pode ver alguns exemplos na tabela a seguir.

Query Max SUs para o trabalho
  • A consulta contém uma etapa.
  • A etapa não está particionada.
1 SU V2
  • O fluxo de dados de entrada é particionado por 16.
  • A consulta contém uma etapa.
  • A etapa é particionada.
16 SU V2 (1 * 16 divisórias)
  • A consulta contém duas etapas.
  • Nenhuma das etapas é particionada.
1 SU V2
  • O fluxo de dados de entrada é particionado por 3.
  • A consulta contém duas etapas. A etapa de entrada é particionada e a segunda etapa não.
  • A instrução SELECT lê a partir da entrada particionada.
4 SU V2s (3 para passos particionados + 1 para passos não particionados

Exemplos de dimensionamento

A consulta a seguir calcula o número de carros dentro de uma janela de três minutos passando por uma estação de pedágio que tem três cabines de pedágio. Esta consulta pode ser dimensionada para um SU V2.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Para usar mais SUs para a consulta, tanto o fluxo de dados de entrada quanto a consulta devem ser particionados. Como a partição de fluxo de dados está definida como 3, a seguinte consulta modificada pode ser dimensionada até 3 SU V2s:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Quando uma consulta é particionada, os eventos de entrada são processados e agregados em grupos de partições separados. Os eventos de saída também são gerados para cada um dos grupos. O particionamento pode causar alguns resultados inesperados quando o campo GROUP BY não é a chave de partição no fluxo de dados de entrada. Por exemplo, o campo TollBoothId na consulta anterior não é a chave de partição de Input1. O resultado é que os dados do TollBooth #1 podem ser espalhados em várias partições.

Cada uma das partições Input1 será processada separadamente pelo Stream Analytics. Como resultado, vários registros da contagem de carros para a mesma cabine de pedágio na mesma janela Tumbling serão criados. Se a chave de partição de entrada não puder ser alterada, esse problema pode ser corrigido adicionando uma etapa não particionária para agregar valores entre partições, como no exemplo a seguir:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Esta consulta pode ser dimensionada para 4 SU V2s.

Nota

Se você estiver unindo dois fluxos, certifique-se de que os fluxos são particionados pela chave de partição da coluna que você usa para criar as junções. Certifique-se também de que tem o mesmo número de partições em ambos os fluxos.

Alcançando rendimentos mais altos em escala

Um trabalho paralelo embaraçoso é necessário, mas não suficiente para sustentar um rendimento mais alto em escala. Cada sistema de armazenamento, e sua saída correspondente do Stream Analytics, tem variações sobre como obter a melhor taxa de transferência de gravação possível. Como em qualquer cenário em escala, existem alguns desafios que podem ser resolvidos usando as configurações certas. Esta seção discute as configurações para algumas saídas comuns e fornece amostras para sustentar as taxas de ingestão de eventos de 1 K, 5 K e 10 K por segundo.

As observações a seguir usam um trabalho do Stream Analytics com consulta sem estado (passagem), um UDF JavaScript básico que grava em Hubs de Eventos, Azure SQL ou Azure Cosmos DB.

Hubs de Eventos

Taxa de ingestão (eventos por segundo) Unidades Transmissão em Fluxo Recursos de saída
1 K 1/3 2 TU
5 K 1 6 TU
10 mil 2 10 TU

A solução Event Hubs é dimensionada linearmente em termos de unidades de streaming (SU) e taxa de transferência, tornando-se a maneira mais eficiente e eficiente de analisar e transmitir dados para fora do Stream Analytics. Os trabalhos podem ser dimensionados até 66 SU V2s, o que se traduz aproximadamente no processamento de até 400 MB/s, ou 38 trilhões de eventos por dia.

SQL do Azure

Taxa de ingestão (eventos por segundo) Unidades Transmissão em Fluxo Recursos de saída
1 K 2/3 S3
5 K 3 P4
10 mil 6 P6

O SQL do Azure dá suporte à escrita em paralelo, chamada Herdar particionamento, mas não está habilitada por padrão. No entanto, habilitar o particionamento de herdados, juntamente com uma consulta totalmente paralela, pode não ser suficiente para obter taxas de transferência mais altas. As taxas de transferência de gravação SQL dependem significativamente da configuração do banco de dados e do esquema da tabela. O artigo SQL Output Performance tem mais detalhes sobre os parâmetros que podem maximizar sua taxa de transferência de gravação. Conforme observado no artigo Saída do Azure Stream Analytics para o Banco de Dados SQL do Azure, essa solução não é dimensionada linearmente como um pipeline totalmente paralelo além de 8 partições e pode precisar de reparticionamento antes da saída SQL (consulte INTO). SKUs premium são necessários para sustentar altas taxas de E/S, juntamente com a sobrecarga de backups de log acontecendo a cada poucos minutos.

Azure Cosmos DB

Taxa de ingestão (eventos por segundo) Unidades Transmissão em Fluxo Recursos de saída
1 K 2/3 RU de 20 K
5 K 4 RU de 60 K
10 mil 8 RU de 120 K

A saída do Azure Cosmos DB do Stream Analytics foi atualizada para usar a integração nativa no nível de compatibilidade 1.2. O nível de compatibilidade 1.2 permite uma taxa de transferência significativamente maior e reduz o consumo de RU em comparação com o 1.1, que é o nível de compatibilidade padrão para novos trabalhos. A solução usa contêineres do Azure Cosmos DB particionados em /deviceId e o restante da solução é configurado de forma idêntica.

Todos os exemplos de Streaming em Escala do Azure usam Hubs de Eventos como entrada que é alimentada por clientes de teste de simulação de carga. Cada evento de entrada é um documento JSON de 1 KB, que traduz facilmente as taxas de ingestão configuradas em taxas de transferência (1 MB/s, 5 MB/s e 10 MB/s). Os eventos simulam um dispositivo IoT enviando os seguintes dados JSON (em um formato abreviado) para até 1.000 dispositivos:

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Nota

As configurações estão sujeitas a alterações devido aos vários componentes utilizados na solução. Para obter uma estimativa mais precisa, personalize as amostras para se adequar ao seu cenário.

Identificação de gargalos

Use o painel Métricas em seu trabalho do Azure Stream Analytics para identificar gargalos em seu pipeline. Analise os Eventos de Entrada/Saída para verificar a taxa de transferência e "Atraso de marca d'água" ou Eventos em atraso para ver se o trabalho está acompanhando a taxa de entrada. Para métricas de Hubs de Eventos, procure Solicitações Limitadas e ajuste as Unidades de Limite de acordo. Para métricas do Azure Cosmos DB, revise RU/s máximo consumido por intervalo de chaves de partição em Taxa de transferência para garantir que seus intervalos de chaves de partição sejam consumidos uniformemente. Para o Banco de Dados SQL do Azure, monitore a E/S de Log e a CPU.

Obter ajuda

Para obter mais assistência, experimente a nossa página de perguntas e respostas da Microsoft para o Azure Stream Analytics.

Próximos passos