Руководство. Запуск Функции Azure из заданий Azure Stream Analytics

В этом руководстве вы создадите задание Azure Stream Analytics, которое считывает события из Центры событий Azure, выполняет запрос к данным события, а затем вызывает функцию Azure, которая записывается в экземпляр Кэш Azure для Redis.

Снимок экрана: связь между службами Azure в решении.

Примечание.

  • Вы можете запустить Функции Azure из Azure Stream Analytics, настроив Функции в качестве одного из приемников (выходных данных) в задание Stream Analytics. Служба "Функции" — это ориентированная на события среда вычислений по требованию, которая позволяет реализовывать код, активируемый по событиям, возникающим в Azure или в сторонних службах. Эта возможность службы "Функции" реагировать на триггеры упрощает вывод данных в задания Stream Analytics.
  • Stream Analytics вызывает службу "Функции" с помощью триггеров HTTP. Выходной адаптер службы "Функции" позволяет пользователям подключать службу "Функции" к Stream Analytics таким образом, что события могут запускаться на основе запросов Stream Analytics.
  • Подключение к Функциям Azure в виртуальной сети из задания Stream Analytics, которое выполняется в многотенантном кластере, не поддерживается.

В этом руководстве описано следующее:

  • Создание экземпляра Центры событий Azure
  • Создание экземпляра кэша Azure для Redis
  • Создание функции Azure
  • Создание задания Stream Analytics
  • Настройка концентратора событий в качестве входных и функций в качестве выходных данных
  • Выполнение задания Stream Analytics
  • проверить кэш Azure для Redis на наличие результатов.

Если у вас еще нет подписки Azure, создайте бесплатную учетную запись, прежде чем начинать работу.

Необходимые компоненты

Прежде чем начать работу, нужно сделать следующее:

  • Если у вас еще нет подписки Azure, создайте бесплатную учетную запись.
  • Загрузите приложение генератора событий телефонных вызововTelcoGenerator.zip из Центра загрузки Майкрософт или получите исходный код на сайте GitHub.

Вход в Azure

Войдите на портал Azure.

Создание концентратора событий

Прежде чем Stream Analytics сможет проанализировать поток мошеннических вызовов, необходимо отправить некоторые примеры данных в концентратор событий. В этом руководстве вы отправляете данные в Azure с помощью Центры событий Azure.

Следуйте инструкциям ниже, чтобы создать концентратор событий и отправить в него данные вызовов:

  1. Войдите на портал Azure.

  2. Выберите все службы в меню слева, выберите Интернет вещей, наведите указатель мыши на центры событий и нажмите кнопку +(Добавить).

    Снимок экрана: страница создания центров событий.

  3. На странице "Создание пространства имен" выполните следующие действия.

    1. Выберите подписку Azure, в которой нужно создать концентратор событий.

    2. Для группы ресурсов выберите "Создать" и введите имя группы ресурсов. Пространство имен Центров событий создается в этой группе ресурсов.

    3. Для имени пространства имен введите уникальное имя пространства имен Центров событий.

    4. В поле "Расположение" выберите регион, в котором нужно создать пространство имен.

    5. Для ценовой категории выберите "Стандартный".

    6. В нижней части страницы выберите Review + create (Проверить и создать).

      Снимок экрана: страница

    7. На странице "Проверка и создание" мастера создания пространства имен выберите "Создать" в нижней части страницы после просмотра всех параметров.

  4. После успешного развертывания пространства имен выберите Перейти к ресурсу, чтобы перейти на страницу Пространство имен Центров событий.

  5. На странице Пространство имен Центров событий выберите +Центр событий в строке команд.

    Снимок экрана: кнопка

  6. На странице Создание центра событий введите имя концентратора событий. Установите для параметра Количество разделов значение 2. Используйте параметры по умолчанию в оставшихся параметрах и выберите "Просмотр и создание".

    Снимок экрана на котором отображается страница создания центра событий.

  7. На странице Просмотр и создание выберите Создать в нижней части страницы. Дождитесь успешного завершения развертывания.

Предоставление доступа к концентратору событий и получение строки подключения

Чтобы приложение могло отправлять данные в Центры событий Azure, в концентраторе событий должна быть политика, обеспечивающая доступ. Политика доступа создает строку подключения, которая включает сведения об авторизации.

  1. На странице пространства имен Центров событий выберите политики общего доступа в меню слева.

  2. Выберите RootManageSharedAccessKey из списка политик.

  3. Затем нажмите кнопку копирования рядом со строкой Подключение ion — первичный ключ.

  4. Вставьте строку подключения в текстовый редактор. Эта строка подключения вам понадобится в следующем разделе.

    Строка подключения выглядит следующим образом:

    Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    Обратите внимание, что строка подключения содержит несколько пар "ключ-значение", разделенных точкой с запятой: Endpoint, SharedAccessKeyName и SharedAccessKey.

Запуск приложения генератора событий

Перед запуском приложения TelcoGenerator необходимо настроить его для отправки данных в созданный ранее концентратор событий Azure.

  1. Извлеките содержимое файла TelcoGenerator.zip.

  2. Откройте файл TelcoGenerator\TelcoGenerator\telcodatagen.exe.config в текстовом редакторе по своему выбору. Файлов с расширением .config несколько, поэтому убедитесь, что открываете нужный файл.

  3. Обновите элемент <appSettings> в файле конфигурации, указав следующие сведения:

    • Задайте для ключа EventHubName значение EntityPath в конце строки подключения.
    • Задайте значение Microsoft.ServiceBus.ПодключениеКлюч ionString к строка подключения пространству имен. Если в концентраторе событий используется строка подключения, а не пространство имен, удалите EntityPath значение (;EntityPath=myeventhub) в конце. Обязательно удалите точку с запятой перед значением EntityPath.
  4. Сохраните файл.

  5. Далее откройте командное окно и перейдите в папку, в которой распаковано приложение TelcoGenerator. Затем введите следующую команду:

    .\telcodatagen.exe 1000 0.2 2
    

    Эта команда принимает следующие параметры:

    • Число записей данных звонков в час.
    • Вероятность мошенничества, которая представляет частоту моделирования приложением мошеннических вызовов. Значение 0,2 означает, что около 20% записей звонков выглядят мошенническими.
    • Продолжительность в часах, которая соответствует количеству часов, на протяжении которых должно выполняться приложение. Вы также можете в любое время остановить выполнение приложения, завершив процесс (CTRL+C) в командной строке.

    Через несколько секунд приложение запустит отображение записей вызовов на экране, так как будет отправлять их в концентратор событий. Данные телефонных звонков содержат следующие поля:

    Запись Определение
    CallrecTime Метка времени начала вызова.
    SwitchNum Телефонный переключатель, используемый для совершения вызова. В этом примере переключатели выражены строками, представляющими страну или регион происхождения (США, Китай, Соединенное Королевство, Германия или Австралия).
    CallingNum Номер телефона звонящего.
    CallingIMSI Идентификатор абонента международной мобильной связи (IMSI). Это уникальный идентификатор звонящего.
    CalledNum Номер телефона получателя.
    CalledIMSI Идентификатор IMSI. Это уникальный идентификатор получателя звонка.

Создание задания Stream Analytics

Теперь, когда у вас есть поток событий звонков, можно создать задание Stream Analytics, которое считывает данные из концентратора событий.

  1. Чтобы создать задание Stream Analytics, перейдите на портал Azure.
  2. Выберите Создать ресурс и выполните поиск по фразе Задание Stream Analytics. Выберите плитку Задание Stream Analytics и щелкните Создать.
  3. На странице задания New Stream Analytics выполните следующие действия.
    1. Для подписки выберите подписку, содержащую пространство имен Центров событий.

    2. Для группы ресурсов выберите созданную ранее группу ресурсов.

    3. В разделе сведений об экземпляре введите уникальное имя задания Stream Analytics.

    4. Для региона выберите регион, в котором нужно создать задание Stream Analytics. Рекомендуется разместить задание и концентратор событий в одном регионе, чтобы обеспечить лучшую производительность и не платить за передачу данных между регионами.

    5. Для среды< размещения выберите Cloud , если она еще не выбрана. Задания Stream Analytics можно развернуть в облаке или на граничных устройствах. Значение Cloud позволяет выполнять развертывание в облаке Azure, а значение Edge — на устройстве IoT Edge.

    6. Для единиц потоковой передачи выберите 1. Единица потоковой передачи предоставляет вычислительные ресурсы, которые необходимы для выполнения задания. По умолчанию установлено значение 1. Чтобы узнать о масштабировании единиц потоковой передачи, ознакомьтесь со статьей Обзор и настройка единиц потоковой передачи.

    7. В нижней части страницы выберите Review + create (Проверить и создать).

      Снимок экрана: страница создания задания Azure Stream Analytics.

  4. На странице "Просмотр и создание" проверьте параметры и нажмите кнопку "Создать", чтобы создать задание Stream Analytics.
  5. После развертывания задания на странице развертывания выберите Перейти к ресурсу, чтобы перейти на страницу Задание Stream Analytics.

Настройка входных данных для задания

Следующий шаг — определить источник входных данных, откуда задание будет считывать данные с помощью концентратора событий, созданного в предыдущем разделе.

  1. На странице Задание Stream Analytics в разделе Топология заданий в меню слева выберите Входные данные.

  2. На странице "Входные данные" выберите +Добавить входные данные и концентратор событий.

    Снимок экрана: страница

  3. На странице концентратора событий выполните следующие действия.

    1. Для псевдонима входных данных введите CallStream. Псевдоним ввода — это понятное имя для идентификации входных данных. Входной псевдоним может содержать только буквенно-цифровые символы, дефисы и знаки подчеркивания. Длина должна составлять от 3 до 63 символов.

    2. Для подписки выберите подписку Azure, в которой вы создали концентратор событий. Концентратор событий может находиться в той же подписке, что и задание Stream Analytics, или в другой.

    3. Для пространства имен Центров событий выберите пространство имен Центров событий, созданное в предыдущем разделе. Все пространства имен, доступные в текущей подписке, перечислены в раскрывающемся списке.

    4. Для имени концентратора событий выберите концентратор событий, созданный в предыдущем разделе. Все центры событий, доступные в выбранном пространстве имен, перечислены в раскрывающемся списке.

    5. Для группы потребителей концентратора событий следует выбрать параметр "Создать", чтобы создать новую группу потребителей в концентраторе событий. Для каждого задания Stream Analytics рекомендуется использовать отдельную группу получателей. Если группа потребителей не указана, задание Stream Analytics использует группу потребителей $Default . Если задание содержит самосоединение или несколько источников входных данных, некоторые входные данные могут позднее считываться несколькими читателями. Эта ситуация влияет на количество читателей в группе потребителей.

    6. В качестве режима аутентификации выберите вариант Строка подключения. Проще протестировать учебник с помощью этого параметра.

    7. Для имени политики концентратора событий выберите "Использовать существующую", а затем выберите созданную ранее политику.

    8. В нижней части страницы нажмите кнопку Сохранить.

      Снимок экрана: страница конфигурации Центров событий для входных данных.

Создание экземпляра кэша Azure для Redis

  1. Создайте кэш в кэше Azure для Redis, выполнив действия, описанные в этом разделе.

  2. После создания кэша в разделе Параметры выберите Ключи доступа. Запишите основную строку подключения.

    Снимок экрана: выбор элемента меню

Создание функции в службе "Функции Azure", которая может записывать данные в кэш Azure для Redis

  1. Сведения см. в разделе Создание приложения-функции документации по службе "Функции". Этот пример был создан с помощью:

  2. Создайте приложение-функцию HttpTrigger по умолчанию в Visual Studio Code, следуя этому руководству. Используются следующие сведения: язык: среда выполнения .NET 6 : C#(под функцией версии 4), шаблон: HTTP trigger

  3. Установите клиентскую библиотеку Redis, выполнив следующую команду в терминале, расположенном в папке проекта:

    dotnet add package StackExchange.Redis --version 2.2.88
    
  4. Добавьте элементы RedisConnectionString и RedisDatabaseIndex в раздел Values файла local.settings.json, заполнив строку подключения целевого сервера:

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "RedisConnectionString": "Your Redis Connection String",
            "RedisDatabaseIndex":"0"
        }
    }
    

    Индекс базы данных Redis — это число от 0 до 15, с помощью которого можно идентифицировать базу данных в экземпляре.

  5. Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Замените пространство имен, имя класса и имя функции на собственные:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    
    using StackExchange.Redis;
    
    namespace Company.Function
    {
        public static class HttpTrigger1{
            [FunctionName("HttpTrigger1")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
                ILogger log)
            {
                // Extract the body from the request
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
    
                dynamic data = JsonConvert.DeserializeObject(requestBody);
    
                // Reject if too large, as per the doc
                if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
    
                string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
                int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
    
                using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
                {
                    // Connection refers to a property that returns a ConnectionMultiplexer
                    IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
    
                    // Parse items and send to binding
                    for (var i = 0; i < data.Count; i++)
                    {
                        string key = data[i].Time + " - " + data[i].CallingNum1;
    
                        db.StringSet(key, data[i].ToString());
                        log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
    
                        // Simple get of data types from the cache
                        string value = db.StringGet(key);
                        log.LogInformation($"Database got: {key} => {value}");
    
                    }
                }
                return new OkResult(); // 200
            }
        }
    }
    

    Когда Stream Analytics получает исключение "Сущность запроса HTTP слишком большая" из функции, размер пакетов, отправляемых в службу "Функции", уменьшается. Следующий код гарантирует, что Stream Analytics не отправляет пакеты слишком большого размера. Убедитесь, что максимальное количество пакетов и размеры значений, используемые в функции, соответствуют значениям, введенным на портале Stream Analytics.

  6. Теперь функцию можно опубликовать в Azure.

  7. Откройте функцию в портал Azure и задайте параметры приложения для RedisConnectionString иRedisDatabaseIndex.

Обновление задания Stream Analytics с использованием функции в качестве выходных данных

  1. Откройте задание Stream Analytics на портале Azure.

  2. Перейдите к функции и выберите Обзор>Выходные данные>Добавить. Чтобы добавить новые выходные данные, выберите Функция Azure в качестве приемника. Новый выходной адаптер службы "Функции" поддерживает следующие свойства:

    Имя свойства Description
    Псевдоним выходных данных Понятное имя, используемое в запросах задания для ссылки на эти выходные данные.
    Вариант импорта Вы можете использовать эту функцию из текущей подписки или указать параметры вручную, если функция находится в другой подписке.
    Приложение-функция Имя приложения службы "Функции".
    Function Имя функции в приложении службы "Функции" (имя функции run.csx).
    Максимальный размер пакета Задает максимальный размер каждого пакета выходных данных в байтах, который передается в функцию. По умолчанию установлено значение 262 144 байт (256 КБ).
    Максимальное количество пакетов Указывает максимальное число событий в каждом пакете, который отправляется в функцию. Значение по умолчанию — 100. Это необязательное свойство.
    Ключ Позволяет использовать функцию из другой подписки. Укажите значение ключа для доступа к функции. Это необязательное свойство.
  3. Введите имя для выходного псевдонима. В этом руководстве оно называется saop1, но вы можете использовать любое имя вашего выбора. Заполните другие сведения.

  4. Откройте задание Stream Analytics и обновите запрос следующим образом.

    Внимание

    В следующем примере скрипта предполагается, что для входного имени и saop1 используется callStream для имени вывода. Если вы использовали разные имена, не забудьте обновить запрос.

     SELECT
             System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
             CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
         INTO saop1
         FROM CallStream CS1 TIMESTAMP BY CallRecTime
            JOIN CallStream CS2 TIMESTAMP BY CallRecTime
             ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
         WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. Запустите приложение telcodatagen.exe, выполнив следующую команду в командной строке. Команда использует следующий формат: telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours].

    telcodatagen.exe 1000 0.2 2
    
  6. Запустите задание Stream Analytics.

  7. На странице "Монитор" для функции Azure вы увидите, что эта функция вызывается.

    Снимок экрана: страница

  8. На странице Кэш Azure для Redis кэш выберите метрики в меню слева, добавьте метрику записи кэша и задайте длительность последнего часа. Вы увидите диаграмму, аналогичную следующему изображению.

    Снимок экрана: страница метрик для Кэш Azure для Redis.

проверить кэш Azure для Redis на наличие результатов.

Получение ключа из журналов Функции Azure

Сначала получите ключ для записи, вставленной в Кэш Azure для Redis. В коде ключ вычисляется в функции Azure, как показано в следующем фрагменте кода:

string key = data[i].Time + " - " + data[i].CallingNum1;

db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
  1. Перейдите к портал Azure и найдите приложение Функции Azure.

  2. Выберите Функции в меню слева.

  3. Выберите HTTPTrigger1 из списка функций.

  4. Выберите "Монитор" в меню слева.

  5. Перейдите на вкладку "Журналы ".

  6. Запишите ключ из информационного сообщения, как показано на следующем снимке экрана. Этот ключ используется для поиска значения в Кэш Azure для Redis.

    Снимок экрана: страница

Используйте ключ для поиска записи в Кэш Azure для Redis

  1. Перейдите на портал Azure и найдите кэш Azure для Redis. Выберите Консоль.

  2. Используйте команды кэша Azure для Redis, чтобы проверить наличие данных в кэше Azure для Redis. (Команда принимает формат Get {key}.) Используйте ключ, скопированный из журналов Монитора для функции Azure (в предыдущем разделе).

    Получение "KEY-FROM-THE-PREVIOUS-SECTION"

    Эта команда должна вывести значение для указанного ключа:

    Снимок экрана: консоль кэша Redis с выходными данными команды Get.

Обработка ошибок и повторные попытки

Если при отправке событий в Функции Azure происходит сбой, Stream Analytics повторяет большинство операций. Все исключения http извлекаются до успешного выполнения, за исключением ошибки HTTP 413 (слишком большая сущность). Ошибка "Сущность слишком большая" рассматривается как ошибка данных, к которой применяется политика повтора или удаления.

Примечание.

Время ожидания для запросов HTTP от Stream Analytics к Функциям Azure устанавливается на 100 секунд. Если обработка пакета приложением "Функции Azure" занимает более 100 секунд, Stream Analytics выдает сообщение об ошибке и повторяет попытку обработки пакета.

Повторная попытка времени ожидания может привести к повторяющимся событиям, записанным в приемник выходных данных. Когда Stream Analytics пытается повторно обработать пакет с ошибкой, он повторяет попытки для всех событий в этом пакете. Рассмотрим, например, пакет из 20 событий, которые отправляются в Функции Azure из Stream Analytics. Предположим, что приложению "Функции Azure" требуется 100 секунд для обработки первых 10 событий в этом пакете. Через 100 секунд Stream Analytics приостанавливает запрос, так как он не получил положительный ответ от Функции Azure, а другой запрос отправляется для того же пакета. Первые 10 событий в пакете обрабатываются приложением "Функции Azure" повторно, что приводит к дублированию.

Известные проблемы

На портале Azure при попытке сброса значения максимального размера пакета или максимального числа пакетов до нуля (значения по умолчанию) после сохранения значение возвращается к ранее введенному. В этом случае введите значения по умолчанию для этих полей вручную.

Служба Stream Analytics в настоящее время не поддерживает использование маршрутизации HTTP в Функциях Azure.

Поддержка подключения к Функции Azure, размещенной в виртуальной сети, не включена.

Очистка ресурсов

Ставшие ненужными группу ресурсов, задание потоковой передачи и все связанные ресурсы можно удалить. При удалении задания будет прекращена тарификация за единицы потоковой передачи, потребляемые заданием. Если вы планируете использовать это задание в будущем, вы можете остановить и перезапустить его позже. Если вы не собираетесь использовать это задание дальше, удалите все ресурсы, созданные в ходе работы с этим руководством, сделав следующее:

  1. В меню слева на портале Azure выберите Группы ресурсов, а затем выберите имя созданного ресурса.
  2. На странице группы ресурсов выберите Удалить, в текстовом поле введите имя ресурса для удаления и щелкните Удалить.

Следующие шаги

В этом руководстве вы создали простое задание Stream Analytics, которое запускает функцию Azure. Чтобы узнать больше о заданиях Stream Analytics, перейдите к следующему руководству: