Apache Spark 애플리케이션을 Azure Event Hubs에 연결

이 자습서에서는 Spark 애플리케이션을 Event Hubs에 연결하여 실시간으로 스트리밍하는 방법을 안내합니다. 이렇게 통합하면 프로토콜 클라이언트를 변경하거나 사용자 고유의 Kafka 또는 Zookeeper 클러스터를 실행할 필요 없이 스트리밍이 가능합니다. 이 자습서에서는 Apache Spark v2.4 이상과 Apache Kafka v2.0 이상이 필요합니다.

참고 항목

이 샘플은 GitHub에서 사용할 수 있습니다.

이 자습서에서는 다음을 하는 방법을 알아볼 수 있습니다.

  • Event Hubs 네임스페이스 만들기
  • 프로젝트 예제 복제
  • Spark 실행
  • Kafka용 Event Hubs에서 읽기
  • Kafka용 Event Hubs에 쓰기

필수 조건

이 자습서를 시작하기 전에 먼저 다음 사항을 확인해야 합니다.

참고 항목

Spark-Kafka 어댑터는 Spark v2.4부터 Kafka v2.0을 지원하도록 업데이트되었습니다. 이전 Spark 릴리스에서는 어댑터가 Kafka v0.10 이상을 지원했지만 Kafka v0.10 API에 의존했습니다. Kafka용 Event Hubs에서 Kafka v0.10을 지원하지 않기 때문에 Spark v2.4 미만 버전의 Spark-Kafka 어댑터는 Kafka 에코시스템용 Event Hubs에서 지원되지 않습니다.

Event Hubs 네임스페이스 만들기

Event Hubs 서비스와 통신하려면 Event Hubs 네임스페이스가 필요합니다. 네임스페이스 및 이벤트 허브를 만드는 방법에 대한 지침은 이벤트 허브 만들기를 참조하세요. 나중에 사용할 수 있도록 Event Hubs 연결 문자열 및 FQDN(정규화된 도메인 이름)을 가져옵니다. 자세한 지침은 Event Hubs 연결 문자열 가져오기를 참조하세요.

프로젝트 예제 복제

Azure Event Hubs 리포지토리를 복제하고 tutorials/spark 하위 폴더로 이동합니다.

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Kafka용 Event Hubs에서 읽기

구성을 약간 변경하면 Kafka용 Event Hubs에서 데이터 읽기를 시작할 수 있습니다. BOOTSTRAP_SERVERSEH_SASL을 네임스페이스의 세부 정보로 업데이트하면 Kafka와 같은 방법으로 Event Hubs를 사용하여 스트리밍을 시작할 수 있습니다. 전체 샘플 코드는 GitHub의 sparkConsumer.scala 파일을 참조하세요.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

다음 오류와 유사한 오류가 발생하면 .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")spark.readStream 호출에 추가하고 다시 시도합니다.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Kafka용 Event Hubs에 쓰기

또한 Kafka에 쓰는 것과 동일한 방법으로 Event Hubs에 쓸 수도 있습니다. 잊지 말고 BOOTSTRAP_SERVERSEH_SASL을 Event Hubs 네임스페이스의 정보로 업데이트해야 합니다. 전체 샘플 코드는 GitHub의 sparkProducer.scala 파일을 참조하세요.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

다음 단계

Event Hubs 및 Kafka용 Event Hubs에 대해 자세한 내용은 다음 문서를 참조하세요.