Руководство. Использование структурированной потоковой передачи Apache Spark с Apache Kafka в HDInsight

В этом руководстве показано, как использовать структурированную потоковую передачу Apache Spark, чтобы считывать и записывать данные с использованием Apache Kafka в Azure HDInsight.

Структурированная потоковая передача Spark — это механизм обработки потока, встроенный в Spark SQL. Он позволяет выражать потоковые вычисления так же, как пакетные вычисления статических данных.

В этом руководстве описано следующее:

  • Использование шаблона Azure Resource Manager для создания кластеров.
  • Использование структурированной потоковой передачи Spark с Kafka.

Выполнив инструкции, не забудьте удалить кластеры, чтобы избежать ненужных расходов.

Необходимые компоненты

Внимание

Чтобы выполнить действия, описанные в этом документе, необходимо иметь группу ресурсов Azure, которая содержит кластеры Spark и Kafka в HDInsight. Оба этих кластера находятся в виртуальной сети Azure, что позволяет кластеру Spark напрямую обмениваться данными с кластером Kafka.

Для удобства в этом документе есть ссылка на шаблон, с помощью которого можно создать все необходимые ресурсы Azure.

Дополнительные сведения об использовании HDInsight в виртуальной сети см. в статье опланирование виртуальной сети для Azure HDInsight.

Структурированная потоковая передача с помощью Apache Kafka

Структурированная потоковая передача Spark — это механизм обработки потока, встроенный в ядро Spark SQL. При использовании структурированной потоковой передачи можно писать запросы потоковой передачи так же, как и пакетные запросы.

В следующих фрагментах кода демонстрируется чтение данных из Kafka и их сохранение в файле. Первый фрагмент кода — это пакетная операция, а второй — операция потоковой передачи.

// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .write
                .format("parquet")
                .option("path","/example/batchtripdata")
                .option("checkpointLocation", "/batchcheckpoint")
                .save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .writeStream
                .format("parquet")
                .option("path","/example/streamingtripdata")
                .option("checkpointLocation", "/streamcheckpoint")
                .start.awaitTermination(30000)

В обоих фрагментах кода данные считываются из Kafka и записываются в файл. Ниже показаны различия в примерах.

Пакетная служба Потоковая передача
read readStream
write writeStream
save start

Операция потоковой передачи также использует awaitTermination(30000), что останавливает потоковую передачу спустя 30 000 мс.

Чтобы использовать структурированную потоковую передачу с помощью Kafka, ваш проект должен иметь зависимость в пакете org.apache.spark : spark-sql-kafka-0-10_2.11. Версия этого пакета должна соответствовать версии Spark в HDInsight. Для Spark 2.4 (доступно в HDInsight 4.0) можно найти сведения о зависимости для различных типов проектов по адресу https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.

Для приложения Jupyter Notebook, используемого в рамках этого руководства, следующая ячейка передает зависимость пакета:

%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}

Создание кластеров

Apache Kafka в HDInsight не предоставляет доступ к брокерам Kafka через общедоступный сегмент Интернета. Все ресурсы, которые использует Kafka, должны находиться в одной виртуальной сети Azure. В рамках этого руководства кластеры Kafka и Spark расположены в одной и той же виртуальной сети Azure.

На следующей схеме показано, как происходит обмен данными между Spark и Kafka:

Diagram of Spark and Kafka clusters in an Azure virtual network.

Примечание.

Служба Kafka ограничена обменом данными в пределах виртуальной сети. Другие службы в кластере, например SSH и Ambari, могут быть доступны через Интернет. Дополнительные сведения об общих портах, доступных в HDInsight, см. в статье Порты и универсальные коды ресурсов (URI), используемые кластерами HDInsight.

Чтобы создать виртуальную сеть Azure, а затем создать кластеры Kafka и Spark в ее пределах, выполните такие действия:

  1. Нажмите эту кнопку, чтобы войти в Azure и открыть шаблон на портале Azure.

    Deploy to Azure button for new cluster

    Шаблон Azure Resource Manager доступен по адресу https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.

    Этот шаблон создает следующие ресурсы:

    • Кластер Kafka в HDInsight 4.0 или 5.0.

    • Кластер SPARK 2.4 или 3.1 в HDInsight 4.0 или 5.0.

    • виртуальную сеть Azure, содержащую кластеры HDInsight.

      Внимание

      Структурированная записная книжка потоковой передачи, используемая в этом руководстве, требует Spark 2.4 или 3.1 в HDInsight 4.0 или 5.0. Если используется более ранняя версия Spark в HDInsight, возникнут ошибки при использовании этой записной книжки.

  2. Заполните раздел Настроенный шаблон, используя следующие сведения:

    Параметр Значение
    Отток подписок Ваша подписка Azure
    Группа ресурсов Группа ресурсов, в которой содержатся ресурсы.
    Расположение Регион Azure, в котором создаются ресурсы.
    Spark Cluster Name (Имя кластера Spark) Имя кластера Spark. Первые шесть символов должны отличаться от имени кластера Kafka.
    Kafka Cluster Name (Имя кластера Kafka) Имя кластера Kafka. Первые шесть символов должны отличаться от имени кластера Spark.
    Имя пользователя для входа в кластер Имя администратора кластеров.
    Пароль для входа в кластер Пароль администратора кластеров.
    Имя пользователя SSH Пользователь SSH, который создается для кластеров.
    Пароль SSH Пароль пользователя SSH.

    Screenshot of the customized template.

  3. Прочтите условия использования и установите флажок Я принимаю указанные выше условия.

  4. Щелкните Приобрести.

Примечание.

Создание кластеров может занять до 20 минут.

Использование структурированной потоковой передачи Spark

В этом примере показано, как использовать структурированную потоковую передачу Spark с Kafka в HDInsight. Для этого примера используются данные о поездках в такси, предоставленные сайтом города Нью-Йорк. Набор данных, используемых в этой записной книжке, взят на странице 2016 Green Taxi Trip Data.

  1. Выполните сбор информации об узле. Воспользуйтесь приведенными ниже командами curl и jq, чтобы получить сведения об узлах Kafka ZooKeeper и брокера. Эти команды предназначены для командной строки Windows. Команды для других сред будут немного отличаться. Замените KafkaCluster именем кластера Kafka, а KafkaPassword — паролем для входа в кластер. Также замените C:\HDI\jq-win64.exe фактическим путем к папке установки jq. Введите команды в командной строке Windows и сохраните выходные данные для использования в следующих шагах.

    REM Enter cluster name in lowercase
    
    set CLUSTERNAME=KafkaCluster
    set PASSWORD=KafkaPassword
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")"
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
    
  2. В веб-браузере перейдите на страницу https://CLUSTERNAME.azurehdinsight.net/jupyter, где CLUSTERNAME — это имя вашего кластера. При появлении запроса введите имя администратора кластера и пароль, которые использовались при создании кластера.

  3. Выберите Создать > Spark, чтобы создать записную книжку.

  4. При потоковой передаче данных Spark использует микропакеты, то есть данные передаются в пакетах, после чего такие пакеты обрабатываются исполнителями. Если время ожидания простоя исполнителя меньше времени, затрачиваемого на обработку пакета, исполнители будут постоянно добавляться и удаляться. Если время ожидания простоя исполнителя превышает длительность передачи пакета, такой исполнитель не будет удаляться. Поэтому мы рекомендуем отключить динамическое распределение, задав для параметра spark.dynamicAllocation.enabled значение false, при запуске приложений с потоковой передачей данных.

    Загрузите пакеты, используемые Notebook, указав следующие сведения в ячейке Notebook. Выполните команду, нажав клавиши CTRL+ВВОД.

    %%configure -f
    {
        "conf": {
            "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
            "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
            "spark.dynamicAllocation.enabled": false
        }
    }
    
  5. Создайте раздел Kafka. Измените указанную ниже команду, заменив YOUR_ZOOKEEPER_HOSTS сведениями об узле Zookeeper, которые вы извлекли на первом шаге. Введите измененную команду в Jupyter Notebook, чтобы создать раздел tripdata.

    %%bash
    export KafkaZookeepers="YOUR_ZOOKEEPER_HOSTS"
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
    
  6. Получите данные о поездках в такси. Введите команду в следующей ячейке, чтобы загрузить данные о поездках в такси в Нью-Йорке. Данные загружаются в таблицу данных, которая потом отображается в виде выходных данных ячейки.

    import spark.implicits._
    
    // Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
    val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
    val result = scala.io.Source.fromURL(url).mkString
    
    // Create a dataframe from the JSON data
    val taxiDF = spark.read.json(Seq(result).toDS)
    
    // Display the dataframe containing trip data
    taxiDF.show()
    
  7. Укажите сведения об узлах брокера Kafka. Замените YOUR_KAFKA_BROKER_HOSTS информацией об узлах брокера, которую вы извлекли на первом шаге. Введите измененную команду в следующей ячейке Jupyter Notebook.

    // The Kafka broker hosts and topic used to write to Kafka
    val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS"
    val kafkaTopic="tripdata"
    
    println("Finished setting Kafka broker and topic configuration.")
    
  8. Отправьте данные в Kafka. В приведенной ниже команде поле vendorid используется в качестве значения ключа для сообщения Kafka. Kafka использует ключ при секционировании данных. Все поля хранятся в сообщении Kafka в виде строковых значений JSON. Введите следующую команду в Jupyter, чтобы сохранить данные в Kafka с помощью пакетного запроса.

    // Select the vendorid as the key and save the JSON string as the value.
    val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()
    
    println("Data sent to Kafka")
    
  9. Объявите схему. В приведенной ниже команде показано, как использовать схему при считывании данных JSON из Kafka. Введите команду в следующую ячейку Jupyter.

    // Import bits useed for declaring schemas and working with JSON data
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    // Define a schema for the data
    val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
    // Reproduced here for readability
    //val schema = (new StructType)
    //   .add("dropoff_latitude", StringType)
    //   .add("dropoff_longitude", StringType)
    //   .add("extra", StringType)
    //   .add("fare_amount", StringType)
    //   .add("improvement_surcharge", StringType)
    //   .add("lpep_dropoff_datetime", StringType)
    //   .add("lpep_pickup_datetime", StringType)
    //   .add("mta_tax", StringType)
    //   .add("passenger_count", StringType)
    //   .add("payment_type", StringType)
    //   .add("pickup_latitude", StringType)
    //   .add("pickup_longitude", StringType)
    //   .add("ratecodeid", StringType)
    //   .add("store_and_fwd_flag", StringType)
    //   .add("tip_amount", StringType)
    //   .add("tolls_amount", StringType)
    //   .add("total_amount", StringType)
    //   .add("trip_distance", StringType)
    //   .add("trip_type", StringType)
    //   .add("vendorid", StringType)
    
    println("Schema declared")
    
  10. Выберите данные и начните потоковую передачу. Следующая команда извлекает данные из Kafka с помощью пакетного запроса. Затем запишите результаты в HDFS в кластере Spark. В этом примере select получает сообщение (значение поля) из Kafka и применяет схему к нему. Затем данные записываются в HDFS (WASB или ADL) в формате Parquet. Введите команду в следующую ячейку Jupyter.

    // Read a batch from Kafka
    val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data and write to file
    val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()
    
    println("Wrote data to file")
    
  11. Чтобы проверить, созданы ли файлы, введите команду в следующую ячейку Jupyter. Отобразится список файлов из каталога /example/batchtripdata.

    %%bash
    hdfs dfs -ls /example/batchtripdata
    
  12. В предыдущем примере использовался пакетный запрос. В следующей команде показано, как сделать то же самое, используя запрос потоковой передачи. Введите команду в следующую ячейку Jupyter.

    // Stream from Kafka
    val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data from the stream and write to file
    kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
    println("Wrote data to file")
    
  13. Выполните следующую ячейку, чтобы проверить, записаны ли файлы в ответ на запрос потоковой передачи.

    %%bash
    hdfs dfs -ls /example/streamingtripdata
    

Очистка ресурсов

Чтобы очистить ресурсы, созданные при работе с этим руководством, удалите группу ресурсов. При удалении группы ресурсов также удаляются связанные с ней кластер HDInsight и другие ресурсы.

Чтобы удалить группу ресурсов с помощью портала Azure, сделайте следующее:

  1. На портале Azure разверните меню слева, чтобы открыть меню служб, а затем выберите Группы ресурсов, чтобы просмотреть список групп ресурсов.
  2. Найдите группу ресурсов, которую нужно удалить, и щелкните правой кнопкой мыши кнопку Дополнительно (…) справа от списка.
  3. Выберите Удалить группу ресурсов и подтвердите выбор.

Предупреждение

Начисление оплаты начинается после создания кластера HDInsight и прекращается только после его удаления. Кластеры оплачиваются поминутно, поэтому всегда следует удалять кластер, когда он больше не нужен.

При удалении кластера Kafka в HDInsight удаляются все данные, хранящиеся в Kafka.