Share via


Event Hubs .NET SDK'larını (AMQP) kullanarak olay akışı yaparken Avro şeması kullanarak doğrulama

Bu hızlı başlangıçta, Azure.Messaging.EventHubs .NET kitaplığını kullanarak şema doğrulaması ile olay hub'ına olay göndermeyi ve olay hub'ından olay almayı öğreneceksiniz.

Not

Azure Schema Registry , olay odaklı ve mesajlaşma odaklı uygulamalar için şemalar için merkezi bir depo sağlayan Event Hubs'ın bir özelliğidir. Üretici ve tüketici uygulamalarınızın şemayı yönetmek ve paylaşmak zorunda kalmadan veri alışverişi yapma esnekliği sağlar. Ayrıca, yeniden kullanılabilir şemalar için basit bir idare çerçevesi sağlar ve bir gruplandırma yapısı (şema grupları) aracılığıyla şemalar arasındaki ilişkiyi tanımlar. Daha fazla bilgi için bkz. Event Hubs'ta Azure Schema Registry.

Önkoşullar

Azure Event Hubs yeniyseniz, bu hızlı başlangıcı yapmadan önce bkz. Event Hubs'a genel bakış.

Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara ihtiyacınız vardır:

  • Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
  • Microsoft Visual Studio 2022. Azure Event Hubs istemci kitaplığı, C# 8.0'da tanıtılan yeni özellikleri kullanır. Kitaplığı önceki C# dil sürümleriyle kullanmaya devam edebilirsiniz, ancak yeni söz dizimi kullanılamaz. Tam söz dizimini kullanmak için .NET Core SDK 3.0 veya üzeri ile derlemenizi ve dil sürümünün olarak ayarlanmasını latestöneririz. Visual Studio kullanıyorsanız, Visual Studio 2019'un önceki sürümleri C# 8.0 projeleri oluşturmak için gereken araçlarla uyumlu değildir. Ücretsiz Community sürümü de dahil olmak üzere Visual Studio 2019 buradan indirilebilir.

Olay hub’ı oluşturma

Event Hubs ad alanı ve olay hub'ı oluşturmak için hızlı başlangıçtaki yönergeleri izleyin: Event Hubs ad alanı ve olay hub'ı oluşturma. Ardından, Event Hubs ad alanınıza bağlantı dizesi almak için Bağlantı dizesini alma başlığındaki yönergeleri izleyin.

Geçerli hızlı başlangıçta kullanacağınız aşağıdaki ayarları not edin:

  • Event Hubs ad alanı için bağlantı dizesi
  • Olay hub'ının adı

Şema oluşturma

Şema grubu ve şema oluşturmak için Şema Kayıt Defteri'ni kullanarak şema oluşturma başlığındaki yönergeleri izleyin.

  1. Şema Kayıt Defteri portalını kullanarak contoso-sg adlı bir şema grubu oluşturun. Uyumluluk modu için serileştirme türü olarak Avro ve Yok kullanın.

  2. Bu şema grubunda, aşağıdaki şema içeriğini kullanarak şema adıyla Microsoft.Azure.Data.SchemaRegistry.example.Order yeni bir Avro şeması oluşturun.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Şema Kayıt Defteri Okuyucusu rolüne kullanıcı ekleme

Kullanıcı hesabınızı ad alanı düzeyinde Şema Kayıt Defteri Okuyucusu rolüne ekleyin. Şema Kayıt Defteri Katkıda Bulunanı rolünü de kullanabilirsiniz, ancak bu hızlı başlangıç için gerekli değildir.

  1. Event Hubs Ad Alanı sayfasında, soldaki menüden Erişim denetimi (IAM) öğesini seçin.
  2. Erişim denetimi (IAM) sayfasında menüde + Ekle ->Rol ataması ekle'yi seçin.
  3. Atama türü sayfasında İleri'yi seçin.
  4. Roller sayfasında Şema Kayıt Defteri Okuyucusu (Önizleme) öğesini ve ardından sayfanın alt kısmındaki İleri'yi seçin.
  5. + Üye seç bağlantısını kullanarak kullanıcı hesabınızı role ekleyin ve ardından İleri'yi seçin.
  6. Gözden geçir ve ata sayfasında Gözden geçir ve ata'yı seçin.

Şema doğrulaması ile olay hub'larına olay oluşturma

Olay üreticisi için konsol uygulaması oluşturma

  1. Visual Studio 2019’u başlatın.
  2. Yeni proje oluştur'u seçin.
  3. Yeni proje oluştur iletişim kutusunda aşağıdaki adımları uygulayın: Bu iletişim kutusunu görmüyorsanız, menüden Dosya'yı seçin, Yeni'yi ve ardından Proje'yi seçin.
    1. Programlama dili için C# öğesini seçin.

    2. Uygulamanın türü için Konsol'a tıklayın.

    3. Sonuçlar listesinden Konsol Uygulaması'nı seçin.

    4. Ardından İleri'yi seçin.

      Yeni Proje iletişim kutusunu gösteren resim.

  4. Proje adı olarak OrderProducer , çözüm adı olarak SRQuickStart yazın ve projeyi oluşturmak için Tamam'ı seçin.

Event Hubs NuGet paketini ekleme

  1. Menüden Araçlar>NuGet Paket Yöneticisi Paket Yöneticisi>Konsolu'nu seçin.

  2. Azure.Messaging.EventHubs ve diğer NuGet paketlerini yüklemek için aşağıdaki komutları çalıştırın. Son komutu çalıştırmak için ENTER tuşuna basın.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Burada gösterildiği gibi Visual Studio aracılığıyla Azure'a bağlanmak için üretici uygulamalarının kimliğini doğrulayın.

  4. Ad alanı düzeyinde rolün üyesi Schema Registry Reader olan kullanıcı hesabını kullanarak Azure'da oturum açın. Şema kayıt defteri rolleri hakkında bilgi için bkz. Event Hubs'ta Azure Schema Registry.

Avro şemasını kullanarak kod oluşturma

  1. adlı Order.avscbir dosya oluşturmak için şemayı oluşturmak için kullandığınız içeriği kullanın. Dosyayı proje veya çözüm klasörüne kaydedin.
  2. Ardından bu şema dosyasını kullanarak .NET için kod oluşturabilirsiniz. Kod oluşturma için avrogen gibi herhangi bir dış kod oluşturma aracını kullanabilirsiniz. Örneğin kod oluşturmak için komutunu çalıştırabilirsiniz avrogen -s .\Order.avsc . .
  3. Kod oluşturduktan sonra klasöründe adlı Order.cs\Microsoft\Azure\Data\SchemaRegistry\example dosyayı görürsünüz. Yukarıdaki Avro şeması için ad alanında Microsoft.Azure.Data.SchemaRegistry.example C# türlerini oluşturur.
  4. Order.cs Dosyayı OrderProducer projeye ekleyin.

Olayları seri hale getirmek ve olay hub'ına göndermek için kod yazma

  1. Program.cs dosyasına aşağıdaki kodu ekleyin. Ayrıntılar için kod açıklamalarına bakın. Koddaki üst düzey adımlar şunlardır:

    1. Olay hub'ına olay göndermek için kullanabileceğiniz bir üretici istemcisi oluşturun.
    2. Bir nesnedeki verileri seri hale getirmek ve doğrulamak için kullanabileceğiniz bir Order şema kayıt defteri istemcisi oluşturun.
    3. Oluşturulan Order türü kullanarak yeni Order bir nesne oluşturun.
    4. nesnesini olarak serileştirmek için şema kayıt defteri istemcisini OrderEventDatakullanın.
    5. Bir toplu olay oluşturun.
    6. Olay verilerini olay toplu işlemine ekleyin.
    7. Olay toplu işlemini olay hub'ına göndermek için üretici istemcisini kullanın.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Aşağıdaki yer tutucu değerleri gerçek değerlerle değiştirin.

    • EVENTHUBSNAMESPACECONNECTIONSTRING - Event Hubs ad alanı için bağlantı dizesi
    • EVENTHUBNAME - olay hub'ının adı
    • EVENTHUBSNAMESPACENAME - Event Hubs ad alanının adı
    • SCHEMAGROUPNAME - şema grubunun adı
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Projeyi derleyin ve hata olmadığından emin olun.

  4. Programı çalıştırın ve onay iletisini bekleyin.

    A batch of 1 order has been published.
    
  5. Azure portal olay hub'ında olayları aldığını doğrulayabilirsiniz. Ölçümler bölümünde İletiler görünümüne geçin. Grafiği güncelleştirmek için sayfayı yenileyin. İletilerin alındığını göstermesi birkaç saniye sürebilir.

    Olay hub'ının olayları aldığını doğrulamak için Azure portal sayfasının görüntüsü.

Şema doğrulaması ile olay hub'larından olayları kullanma

Bu bölümde, olay hub'ından olay alan ve olay verilerini seri durumdan kaldırmak için şema kayıt defterini kullanan bir .NET Core konsol uygulamasının nasıl yazıldığını gösterir.

Ek önkoşullar

  • Olay işlemcisini kullanmak için depolama hesabını oluşturun.

Tüketici uygulaması oluşturma

  1. Çözüm Gezgini penceresinde SRQuickStart çözümüne sağ tıklayın, Ekle'nin üzerine gelin ve Yeni Proje'yi seçin.
  2. Konsol uygulaması'nın ardından İleri'yi seçin.
  3. Proje adı olarak OrderConsumer girin ve Oluştur'u seçin.
  4. Çözüm Gezgini penceresinde OrderConsumer'a sağ tıklayın ve Başlangıç Projesi Olarak Ayarla'yı seçin.

Event Hubs NuGet paketini ekleme

  1. Menüden Araçlar>NuGet Paket Yöneticisi Paket Yöneticisi>Konsolu'nu seçin.

  2. Paket Yöneticisi Konsolu penceresinde, Varsayılan proje için OrderConsumer'ın seçildiğini onaylayın. Aksi takdirde, OrderConsumer'ı seçmek için açılan listeyi kullanın.

  3. Gerekli NuGet paketlerini yüklemek için aşağıdaki komutu çalıştırın. Son komutu çalıştırmak için ENTER tuşuna basın.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Burada gösterildiği gibi Visual Studio aracılığıyla Azure'a bağlanmak için üretici uygulamalarının kimliğini doğrulayın.

  5. Ad alanı düzeyinde rolün üyesi Schema Registry Reader olan kullanıcı hesabını kullanarak Azure'da oturum açın. Şema kayıt defteri rolleri hakkında bilgi için bkz. Event Hubs'ta Azure Schema Registry.

  6. Order.cs Üretici uygulamasını oluşturmanın bir parçası olarak oluşturduğunuz dosyayı OrderConsumer projesine ekleyin.

  7. OrderConsumer projesine sağ tıklayın ve Başlangıç projesi olarak ayarla'yı seçin.

Schema Registry kullanarak olayları almak ve seri durumdan çıkarabilmek için kod yazma

  1. Program.cs dosyasına aşağıdaki kodu ekleyin. Ayrıntılar için kod açıklamalarına bakın. Koddaki üst düzey adımlar şunlardır:

    1. Olay hub'ına olay göndermek için kullanabileceğiniz bir tüketici istemcisi oluşturun.
    2. Azure blob depolama alanındaki blob kapsayıcısı için bir blob kapsayıcısı istemcisi oluşturun.
    3. Bir olay işlemcisi istemcisi oluşturun ve olay ve hata işleyicilerini kaydedin.
    4. Olay işleyicisinde, olay verilerini bir nesnede seri durumdan çıkarmak için kullanabileceğiniz bir Order şema kayıt defteri istemcisi oluşturun.
    5. Seri hale getiriciyi kullanarak olay verilerini bir Order nesnede seri durumdan çıkarma.
    6. Alınan sipariş hakkındaki bilgileri yazdırın.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Aşağıdaki yer tutucu değerleri gerçek değerlerle değiştirin.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING - Event Hubs ad alanı için bağlantı dizesi
    • EVENTHUBNAME - olay hub'ının adı
    • EVENTHUBSNAMESPACENAME - Event Hubs ad alanının adı
    • SCHEMAGROUPNAME - şema grubunun adı
    • AZURESTORAGECONNECTIONSTRING - Azure depolama hesabı için bağlantı dizesi
    • BLOBCONTAINERNAME - Blob kapsayıcısının adı
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Projeyi derleyin ve hata olmadığından emin olun.

  4. Alıcı uygulamasını çalıştırın.

  5. Olayların alındığını belirten bir ileti görmeniz gerekir.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Bu olaylar, gönderen programını çalıştırarak daha önce olay hub'ına gönderdiğiniz üç olaydır.

Örnekler

GitHub depomuzdaki Benioku makalesine bakın.

Kaynakları temizleme

Event Hubs ad alanını silin veya ad alanını içeren kaynak grubunu silin.

Sonraki adımlar