Использование параллелизации запросов в Azure Stream Analytics

В этой статье показано, как воспользоваться преимуществами параллелизма в Azure Stream Analytics. Узнайте, как масштабировать задания Stream Analytics с помощью настройки входных разделов, настройки аналитики определения запроса.

В качестве предварительных требований вам может потребоваться ознакомиться с понятием единицы потоковой передачи, описанной в разделе "Общие сведения и настройка единиц потоковой передачи".

Из каких частей состоит задание службы Stream Analytics?

Определение задания Stream Analytics включает как минимум один запрос, а также входы и выходы для потоковой передачи. Входные данные — это точки, откуда задания считывают данные из потока. Запрос используется для преобразования потока входных данных, а выходные данные являются точками, куда направляются результаты задания.

Секции во входах и выходах

Секционирование позволяет разделить данные на подмножества на основе ключа секции. Если входные данные (например, Центры событий) секционируются по ключу, рекомендуется указать ключ секции при добавлении входных данных в задание Stream Analytics. Масштабирование задания Stream Analytics реализует преимущества использования секций во входах или выходах. Задание Stream Analytics может получать и записывать различные секции параллельно, тем самым повышая пропускную способность.

Входные данные

Все входные данные потоковой передачи Azure Stream Analytics могут воспользоваться преимуществами секционирования: Центры событий, Центр Интернета вещей, хранилище BLOB-объектов, Data Lake Storage 2-го поколения.

Примечание.

Для уровня совместимости 1.2 и более поздних версий ключ секции должен быть задан как свойство входа, и тогда в запросе не нужно использовать ключевое слово PARTITION BY. Для уровня совместимости 1.1 и более ранних версий ключ секции нужно указывать в запросе с ключевым словом PARTITION BY.

Выходные данные

При работе со Stream Analytics можно воспользоваться преимуществами секционирования в концентраторах событий и выходных данных:

  • Azure Data Lake Storage
  • Функции Azure
  • Таблица Azure
  • Хранилище BLOB-объектов (требуется явно задать ключ раздела).
  • Azure Cosmos DB (необходимо явно задать ключ раздела)
  • Центры событий (требуется явно задать ключ раздела).
  • Центр Интернета вещей (требуется явно задать ключ раздела).
  • Cлужебная шина
  • SQL и Azure Synapse Analytics с необязательным секционированием: дополнительные сведения см. на странице Вывод в базу данных SQL Azure.

Power BI не поддерживает секционирование. Однако можно по-прежнему секционирование входных данных, как описано в этом разделе.

Дополнительные сведения об этих секциях см. в следующих статьях:

Query

Чтобы задание было параллельным, ключи секций должны быть согласованы между всеми входными данными, всеми шагами логики запроса и всеми выходными данными. В логике запроса секционирование определяется ключами, которые используются для соединений и агрегатов (GROUP BY). Последнее требование можно игнорировать, если логика запроса не содержит ключей (например, в проекциях, фильтрах, ссылочные соединениях).

  • Если входные данные и выходные данные секционируются WarehouseIdпо группам запросов ProductId без WarehouseIdних, задание не выполняется параллельно.
  • Если два входных данных для соединения секционируются по разным ключам секционирования (WarehouseId и ProductId), задание не параллельно.
  • Если в одном задании есть два или более независимых потока данных, каждый из которых имеет собственный ключ секции, задание выполняется без использования параллельного режима.

Только если все входные данные, выходные данные и шаги запроса используют один и тот же ключ, задание выполняется параллельно.

Задания с усложненным параллелизмом

Задание с усложненным параллелизмом — это самый масштабируемый сценарий в Azure Stream Analytics. Он соединяет один раздел входных данных с одним экземпляром запроса и одним разделом выходных данных. Такой параллелизм имеет следующие требования:

  • Если в логике запроса применяется ключ, который обрабатывается тем же экземпляром запроса, необходимо только проследить за тем, чтобы события попадали в ту же секцию входных данных. Для Центров событий или Центр Интернета вещей это означает, что данные события должны иметь набор значений PartitionKey. Кроме того, можно использовать секционированные отправители. Для хранилища BLOB-объектов это означает, что события отправляются в папку той же секции. Примером может служить экземпляр запроса, который агрегирует данные по userID, где входной Центр событий секционирован с использованием userID в качестве ключа секции. Но если логика запроса не требует обрабатывать ключ в том же экземпляре запроса, это требование можно игнорировать. В качестве примера такой логики можно привести простой запрос select, project или filter.

  • Следующий шаг заключается в секционировании раздела. Для заданий с уровнем совместимости 1.2 или выше (рекомендуется), настраиваемый столбец можно указать в качестве ключа секции в параметрах входных данных, а задание будет параллельно выполняться автоматически. Для заданий с уровнем совместимости 1.0 или 1.1 необходимо использовать PARTITION BY PartitionId во всех шагах запроса. Этапов может быть несколько, но на каждом из них должен использоваться один и тот же ключ.

  • Большинство выходов, поддерживаемых в Stream Analytics, могут использовать преимущества секционирования. При использовании типа выхода, который не поддерживает секционирование, задание не будет считаться заданием с усложненным параллелизмом. Если выход находится в Центрах событий, обязательно укажите в столбце ключа секции тот же ключ секции, который используется в запросе. Дополнительные сведения см . в разделе выходных данных.

  • Число секций входных данных должно совпадать с числом секций выходных данных. Выходные данные хранилища BLOB-объектов могут поддерживать секции и наследуют схему секционирования вышестоящего запроса. Если для хранилища BLOB-объектов указан ключ секции, то данные секционируются по входным секциям, поэтому результат по-прежнему вычисляется параллельно. Примеры значений секций, позволяющие выполнять задания с полной параллельной обработкой:

    • восемь секций входных данных в концентраторах событий и восемь секций выходных данных в концентраторах событий;
    • восемь секций входных данных в концентраторах событий и выход в хранилище BLOB-объектов;
    • восемь секций входных данных в концентраторах событий и выход в хранилище BLOB-объектов, с секционированием по пользовательскому полю с произвольной кратностью;
    • восемь секций входных данных в хранилище BLOB-объектов и выход в хранилище BLOB-объектов;
    • восемь секций входных данных в хранилище BLOB-объектов и восемь секций выходных данных в концентраторах событий.

Далее рассмотрим примеры сценариев с усложненным параллелизмом.

Простой запрос

  • Входные данные: концентратор событий с восемью секциями
  • Выходные данные: концентратор событий с восемью секциями (необходимо задать для использования PartitionIdключевой столбец секции)

Запрос:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Этот запрос является простым фильтром. Поэтому нам не нужно беспокоиться о секционировании входных данных, которые передаются в концентратор событий. Обратите внимание, что для удовлетворения требованию 2 задания с уровнем совместимости, предшествующим 1.2, должны содержать предложение PARTITION BY PartitionId. Выходные данные концентраторов событий необходимо настроить, указав значение PartitionId в качестве ключа секции. Последняя проверка: число секций входных данных должно быть равно числу секций выходных данных.

Запрос с ключом группирования

  • Входные данные: восемь секций в концентраторе событий.
  • Выходные данные — хранилище BLOB-объектов.

Запрос:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Этот запрос содержит ключ группирования. Поэтому сгруппированные события нужно отправлять в одну секцию концентратора событий. Так как в этом примере мы группируем по TollBoothID, мы должны быть уверены, что TollBoothID используется в качестве ключа секции при отправке событий в Центры событий. Затем в Azure Stream Analytics можно использовать PARTITION BY PartitionId , чтобы наследовать от этой схемы секционирования и включить полную параллелизацию. Так как выходными данными является хранилище BLOB-объектов, не нужно беспокоиться о настройке значения ключа секции, как описано в требовании 4.

Пример сценариев, которые не* смущаются параллельно

В предыдущем разделе в статье рассматриваются некоторые неловко параллельные сценарии. В этом разделе вы узнаете о сценариях, которые не соответствуют всем требованиям, чтобы быть неловко параллельными.

Несоответствие в числе секций

  • Входные данные: концентратор событий с восемью секциями
  • Выходные данные: концентратор событий с 32 секциями

Если число секций входа не совпадает с числом секций выхода, топология не является топологией с усложненным параллелизмом независимо от запроса. Но и в этом случае мы можем получить определенный уровень параллелизации.

Запрос с использованием несекционированных выходных данных

  • Входные данные: концентратор событий с восемью секциями
  • Выходные данные: Power BI

В настоящее время выходные данные Power BI не поддерживают секционирование. Таким образом этот сценарий не будет чрезвычайно параллельным.

Многоэтапный запрос с разными значениями параметра PARTITION BY

  • Входные данные: восемь секций в концентраторе событий.
  • Выходные данные: восемь секций в концентраторе событий.
  • Уровень совместимости: 1.0 или 1.1

Запрос:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Как видите, на втором этапе в качестве ключа секционирования используется TollBoothId . Он отличается от значения в первом шаге, а значит, потребуется перетасовка.

Многоэтапный запрос с разными значениями параметра PARTITION BY

  • Входные данные: восемь секций в концентраторе событий (параметр "Столбец ключа секции" не задан, то есть используется значение по умолчанию PartitionId)
  • Выходные данные: восемь секций в концентраторе событий (параметр "Столбец ключа секции" должен иметь значение "TollBoothId").
  • Уровень совместимости — 1.2 или выше

Запрос:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Уровень совместимости 1.2 и выше обеспечивает параллельное выполнение запросов по умолчанию. Например, запрос из предыдущего раздела будет секционирован до тех пор, пока в качестве входного ключа секции задан столбец "TollBoothId". Предложение PARTITION BY PartitionId не требуется.

Расчет максимального количества единиц потоковой передачи для задания

Общее число единиц потоковой передачи, которое можно использовать заданием Stream Analytics, зависит от числа шагов в запросе, определенных для задания, и количества разделов для каждого шага.

Шаги в запросе

Запрос может иметь один или несколько шагов. Каждый шаг — это вложенный запрос, определенный с помощью ключевого слова WITH. Запрос за рамками ключевого слова WITH (только один запрос) также учитывается в качестве шага (например, инструкция SELECT в следующем запросе).

Запрос:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Этот запрос включает 2 шага.

Примечание.

Этот запрос будет описан далее в этой статье.

Разделы шага

Разделение шага требует наличия следующих условий.

  • Источник входных данных должен быть секционирован.
  • Инструкция SELECT запроса должна читаться из разделенного источника входных данных.
  • Запрос внутри шага должен включать ключевое слово PARTITION BY.

Если запрос разделен, входные данные событий будут обработаны и объединены в отдельные группы секции, а выходные данные событий будут сгенерированы для каждой из групп. Если требуется объединенная агрегирование, необходимо создать второй непартиментированный шаг для статистической обработки.

Рассчитайте максимальное количество единиц потоковой передачи для задания

Все непартиментные шаги вместе могут масштабироваться до одной единицы потоковой передачи (SU V2s) для задания Stream Analytics. Кроме того, можно добавить один SU V2 для каждой секции в секционированного шага. В следующей таблице приведены некоторые примеры .

Query Максимальное количество единиц потоковой передачи для задания
  • Запрос содержит один шаг.
  • Этот шаг не секционирован.
1 SU V2
  • Поток входных данных секционирован по 16.
  • Запрос содержит один шаг.
  • Шаг является секционированным.
16 SU V2 (1 * 16 секций)
  • Запрос состоит из двух шагов.
  • Ни один из шагов не секционирован.
1 SU V2
  • Поток входных данных секционирован по 3.
  • Запрос состоит из двух шагов. Входной шаг секционирован, а второй — нет.
  • Инструкция SELECT считывает из секционированных входных данных.
4 SU V2s (3 для секционированных шагов + 1 для непартионированных шагов

Примеры масштабирования

Следующий запрос вычисляет количество машин, проходящих через пропускной пункт с тремя пунктами для оплаты и пропускной способности три минуты для каждого пункта. Этот запрос можно масштабировать до одного su версии 2.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Чтобы использовать дополнительные единицы потоковой передачи для запроса, входной поток данных и запрос должны быть секционированы. Так как для секции потока данных задано значение 3, следующий измененный запрос можно масштабировать до 3 SU V2s:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Если запрос секционирован, входные данные событий будут обработаны и объединены в отдельные группы секций. Кроме того, для каждой из групп будут сформированы выходные данные событий. Секционирование может привести к непредвиденным результатам, если во входном потоке данных поле GROUP BY не является ключом секции. Для примера предположим, что поле TollBoothId в предыдущем запросе не является ключом секции в Input1. В результате данные из пункта 1 можно распределить между несколькими секциями.

Каждая из секций Input1 будет обрабатываться отдельно с помощью Stream Analytics. В результате будет создаваться несколько записей для автомобиля, проходящего через один и тот же пункт. Если входной ключ секции не может быть изменен, эта проблема может быть исправлена путем добавления шага nonpartition для агрегирования значений между секциями, как показано в следующем примере:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Этот запрос можно масштабировать до 4 su V2s.

Примечание.

При объединении двух потоков убедитесь, что потоки разделены с помощью ключа секции столбца, используемого для объединения. Также убедитесь, что количество секций в обоих потоках одинаковое.

Достижение более высокой пропускной способности в большом масштабе

Задание с усложненным параллелизмом является необходимым, но недостаточным для обеспечения высокой пропускной способности в большом масштабе. В каждой системе хранения и в соответствующем ей выходе Stream Analytics есть разные возможности, позволяющие добиться наилучшей пропускной способности записи. Как и в любом сценарии в масштабе, существуют некоторые проблемы, которые можно решить с помощью правильных конфигураций. В этом разделе рассматриваются конфигурации для нескольких распространенных выходных данных и приведены примеры для поддержания скорости приема 1 K, 5 K и 10 K событий в секунду.

В следующих наблюдениях используется задание Stream Analytics с запросом без отслеживания состояния (сквозным), базовым UDF JavaScript, который записывается в Центры событий, SQL Azure или Azure Cosmos DB.

Event Hubs

Скорость приема (событий в секунду) Единицы потоковой передачи Выходные ресурсы
1 K 1/3 2 единицы пропускной способности
5000 1 6 единиц пропускной способности
10 тыс. 2 10 единиц пропускной способности

Решение на основе службы Центры событий позволяет линейно масштабировать единицы потоковой передачи (SU) и пропускную способность, что делает его наиболее эффективным и производительным способом для анализа данных и потоковой передачи данных из Stream Analytics. Задания можно масштабировать до 66 su V2s, что примерно преобразуется в обработку до 400 МБ/с или 38 трлн событий в день.

Azure SQL

Скорость приема (событий в секунду) Единицы потоковой передачи Выходные ресурсы
1 K 2/3 S3
5000 3 P4
10 тыс. 6 P6

Azure SQL поддерживает параллельную запись, называемую наследованием секционирования, но по умолчанию она отключена. Однако включение секционирования наследование вместе с полностью параллельным запросом может быть недостаточно для достижения более высокой пропускной способности. Пропускная способность записи SQL в значительной степени зависит от конфигурации базы данных и схемы таблицы. Дополнительные сведения о параметрах, позволяющих максимально увеличить пропускную способность записи, см. в статье о выводе в базу данных SQL. Как отмечалось в выходных данных Azure Stream Analytics в База данных SQL Azure статье, это решение не масштабируется линейно как полностью параллельный конвейер за пределами 8 секций и может потребоваться повторное разделение перед выходными данными SQL (см. раздел INTO). Для обеспечения высоких скоростей ввода-вывода, а также минимизации издержек резервного копирования журналов, происходящего каждые несколько минут, необходимы номера SKU ценовой категории "Премиум".

Azure Cosmos DB

Скорость приема (событий в секунду) Единицы потоковой передачи Выходные ресурсы
1 K 2/3 20 K ЕЗ
5000 4 60 K ЕЗ
10 тыс. 8 120 K ЕЗ

Выходные данные Azure Cosmos DB из Stream Analytics обновлены, чтобы использовать встроенную интеграцию на уровне совместимости 1.2. Уровень совместимости 1.2 обеспечивает значительно более высокую пропускную способность и сокращает потребление ЕЗ по сравнению с уровнем 1.1, который является уровнем совместимости по умолчанию для новых заданий. Решение использует контейнеры Azure Cosmos DB, секционированные на /deviceId, и остальные решения настроены одинаково.

Во всех примерах потоковой передачи в большом масштабе для Azure в качестве входного ресурса используется Центр событий, в который поступает нагрузка от эмулированных тестовых клиентов. Каждое входное событие — это документ JSON 1 КБ, который легко преобразует настроенные частоты приема в пропускную способность (1 МБ/с, 5 МБ/с и 10 МБ/с). События имитируют устройство Интернета вещей, отправляющее следующие данные JSON (в сокращенной форме) для до 1000 устройств:

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Примечание.

Поскольку в решениях могут использоваться различные компоненты, конфигурации могут быть изменены. Чтобы получить более точную оценку, настройте примеры в соответствии с имеющимся сценарием.

Выявление узких мест

Для выявления узких мест в конвейере можно воспользоваться панелью "Метрики" в задании Azure Stream Analytics. Просмотрите параметры События ввода и вывода с данными о пропускной способности и Предельная задержка или Отложенные события, чтобы узнать, выполняется ли задание в соответствии с входной скоростью. Чтобы просмотреть метрики Центров событий, проверьте раздел Регулируемые запросы, а затем соответствующим образом скорректируйте единицы порогового значения. Сведения о метриках Azure Cosmos DB см. в разделе "Пропускная способность", чтобы обеспечить равномерное использование диапазонов ключей секции. Чтобы просмотреть метрики для базы данных SQL Azure, см. разделы Операции ввода-вывода журнала и ЦП.

Получить помощь

За дополнительной информацией перейдите на страницу вопросов и ответов об Azure Stream Analytics.

Следующие шаги