Öğretici: Stream analytics kullanarak Event Hubs için Apache Kafka olaylarını işleme

Bu makalede Verileri Event Hubs'a aktarma ve Azure Stream Analytics ile işleme adımları gösterilmektedir. Aşağıdaki adımlarda size yol gösterilir:

  1. Bir Event Hubs ad alanı oluşturun.
  2. Olay hub'ına ileti gönderen bir Kafka istemcisi oluşturun.
  3. Olay hub'ından Azure blob depolama alanına veri kopyalayan bir Stream Analytics işi oluşturun.

Bir olay hub'ı tarafından kullanıma sunulan Kafka uç noktasını kullanırken protokol istemcilerinizi değiştirmeniz veya kendi kümelerinizi çalıştırmanız gerekmez. Azure Event Hubs Apache Kafka sürüm 1.0 ve üzerini destekler.

Önkoşullar

Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşulların karşılandığından emin olun:

Event Hubs ad alanı oluşturma

Bir Event Hubs ad alanı oluşturduğunuzda, ad alanının Kafka uç noktası otomatik olarak etkinleştirilir. Kafka protokollerini kullanan uygulamalarınızdaki olayları olay hub'larına akışla aktarabilirsiniz. Event Hubs ad alanı oluşturmak için Azure portal kullanarak olay hub'ı oluşturma başlığındaki adım adım yönergeleri izleyin. Ayrılmış küme kullanıyorsanız bkz. Ayrılmış kümede ad alanı ve olay hub'ı oluşturma.

Not

Kafka için Event Hubs temel katmanda desteklenmez.

Event Hubs'ta Kafka ile ileti gönderme

  1. Kafka deposu için Azure Event Hubs makinenize kopyalayın.

  2. Klasörüne gidin: azure-event-hubs-for-kafka/quickstart/java/producer.

  3. içindeki src/main/resources/producer.configüreticinin yapılandırma ayrıntılarını güncelleştirin. Olay hub'ı ad alanı için adı ve bağlantı dizesini belirtin.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. adresine azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/gidin ve TestDataReporter.java dosyasını istediğiniz bir düzenleyicide açın.

  5. Aşağıdaki kod satırını açıklama satırı yapın:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. Açıklamalı kodun yerine aşağıdaki kod satırını ekleyin:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    Bu kod olay verilerini JSON biçiminde gönderir. Stream Analytics işi için girişi yapılandırırken, giriş verilerinin biçimi olarak JSON'u belirtirsiniz.

  7. Üreticiyi çalıştırın ve Event Hubs'a akışla aktarın. Windows makinesinde ,Node.js komut istemi kullanırken bu komutları çalıştırmadan önce klasörüne azure-event-hubs-for-kafka/quickstart/java/producer geçin.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

Olay hub'sının verileri aldığını doğrulayın

  1. VARLıKLAR'ın altında Event Hubs'ı seçin. test adlı bir olay hub'ı gördüğünüzden emin olun.

    Olay hub'ı - test

  2. Olay hub'ına gelen iletileri gördüğünüzden emin olun.

    Olay hub'ı - iletiler

Stream Analytics işi kullanarak olay verilerini işleme

Bu bölümde bir Azure Stream Analytics işi oluşturacaksınız. Kafka istemcisi olayları olay hub'ına gönderir. Olay verilerini giriş olarak alan ve azure blob depolama alanına aktaran bir Stream Analytics işi oluşturursunuz. Azure Depolama hesabınız yoksa bir hesap oluşturun.

Stream Analytics işindeki sorgu, herhangi bir analiz gerçekleştirmeden verilerden geçer. Çıkış verilerini farklı bir biçimde veya elde edilen içgörülerle üretmek için giriş verilerini dönüştüren bir sorgu oluşturabilirsiniz.

Akış Analizi işi oluşturma

  1. Azure portal+ Kaynak oluştur'u seçin.
  2. Azure Market menüsünde Analiz'i ve ardından Stream Analytics işi'ni seçin.
  3. Yeni Stream Analytics sayfasında aşağıdaki eylemleri gerçekleştirin:
    1. İş için bir ad girin.

    2. Aboneliğinizi seçin.

    3. Kaynak grubu için Yeni oluştur'u seçin ve adı girin. Var olan bir kaynak grubunu da kullanabilirsiniz.

    4. İş için bir konum seçin.

    5. İşi oluşturmak için Oluştur'u seçin.

      Yeni Stream Analytics işi

İş girişi yapılandırma

  1. Stream Analytics iş sayfasını görmek için bildirim iletisinde Kaynağa git'i seçin.

  2. Soldaki menünün İş TOPOLOJİSİ bölümünde Girişler'i seçin.

  3. Akış girişi ekle'yi ve ardından Olay Hub'ı'na tıklayın.

    Giriş olarak olay hub'ı ekleme

  4. Olay Hub'ı giriş yapılandırması sayfasında aşağıdaki eylemleri gerçekleştirin:

    1. Giriş için bir diğer ad belirtin.

    2. Azure aboneliğinizi seçin.

    3. Daha önce oluşturduğunuz olay hub'ı ad alanını seçin.

    4. Olay hub'ı için test et'i seçin.

    5. Kaydet’i seçin.

      Olay hub'ı giriş yapılandırması

İş çıkışını yapılandırma

  1. Menünün İş TOPOLOJİSİ bölümünde Çıkışlar'ı seçin.
  2. Araç çubuğunda + Ekle'yi ve ardından Blob depolama'yı seçin
  3. Blob depolama çıkış ayarları sayfasında aşağıdaki eylemleri gerçekleştirin:
    1. Çıkış için bir diğer ad belirtin.

    2. Azure aboneliğinizi seçin.

    3. Azure Depolama hesabınızı seçin.

    4. Stream Analytics sorgusundaki çıktı verilerini depolayan kapsayıcı için bir ad girin.

    5. Kaydet’i seçin.

      Blob Depolama çıkış yapılandırması

Sorgu tanımlama

Gelen bir veri akışını okumak için bir Stream Analytics işi ayarladıktan sonraki adım, verileri gerçek zamanlı olarak analiz eden bir dönüştürme oluşturmaktır. Dönüştürme sorgusunu Stream Analytics sorgu dilini kullanarak tanımlarsınız. Bu kılavuzda, herhangi bir dönüştürme gerçekleştirmeden verilerden geçen bir sorgu tanımlarsınız.

  1. Sorgu'yu seçin.

  2. Sorgu penceresinde değerini daha önce oluşturduğunuz çıkış diğer adıyla değiştirin [YourOutputAlias] .

  3. değerini daha önce oluşturduğunuz giriş diğer adıyla değiştirin [YourInputAlias] .

  4. Araç çubuğunda Kaydet’i seçin.

    Ekran yakalama, giriş ve çıkış değişkenleri değerlerini içeren sorgu penceresini gösterir.

Stream Analytics işini çalıştırma

  1. Soldaki menüden Genel Bakış'ı seçin.

  2. Başlat'ı seçin.

    Başlat menüsü

  3. İşi başlat sayfasında Başlat'ı seçin.

    başlangıç işi sayfası

  4. İşin durumu Başlangıç olandan çalışmaya değişene kadar bekleyin.

    İş durumu - çalışıyor

Senaryoyu test etme

  1. Olay hub'ına olay göndermek için Kafka üreticisini yeniden çalıştırın.

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. Çıkış verilerininAzure blob depolama alanında oluşturulduğunu gördüğünüzden emin olmanız gerekir. Kapsayıcıda aşağıdaki örnek satırlara benzeyen 100 satırı olan bir JSON dosyası görürsünüz:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    Azure Stream Analytics işi, olay hub'ından giriş verilerini aldı ve bu senaryoda Azure blob depolama alanında depoladı.

Sonraki adımlar

Bu makalede, protokol istemcilerinizi değiştirmeden veya kendi kümelerinizi çalıştırmadan Event Hubs'a akış yapmayı öğrendiniz. Apache Kafka için Event Hubs hakkında daha fazla bilgi edinmek için bkz. Azure Event Hubs için Apache Kafka geliştirici kılavuzu.