Java gebruiken om gebeurtenissen te verzenden of gebeurtenissen te ontvangen uit Azure Event Hubs

In deze quickstart ziet u hoe u gebeurtenissen kunt verzenden naar en ontvangen van een Event Hub met behulp van het azure-messaging-eventHubs Java-pakket.

Tip

Als u met Azure Event Hubs-resources in een Spring-toepassing werkt, raden we u aan Spring Cloud Azure als alternatief te beschouwen. Spring Cloud Azure is een opensource-project dat naadloze Spring-integratie met Azure-services biedt. Zie Spring Cloud Stream met Azure Event Hubs voor meer informatie over Spring Cloud Azure en voor een voorbeeld met Event Hubs.

Vereisten

Als u nog geen ervaring hebt met Azure Event Hubs, raadpleegt u het Event Hubs-overzicht voordat u deze quickstart uitvoert.

Voor het voltooien van deze snelstart moet aan de volgende vereisten worden voldaan:

  • Microsoft Azure-abonnement. Als u Azure-services wilt gebruiken, met inbegrip van Azure Event Hubs, hebt u een abonnement nodig. Als u nog geen Azure-account hebt, kunt u zich aanmelden voor een gratis proefversie of uw voordelen als MSDN-abonnee gebruiken wanneer u een account maakt.
  • Een Java-ontwikkelomgeving. In deze quickstart wordt gebruikgemaakt van Eclipse. Java Development Kit (JDK) met versie 8 of hoger is vereist.
  • Een Event Hubs-naamruimte en een Event Hub maken. In de eerste stap gebruikt u Azure Portal om een naamruimte van het type Event Hubs te maken en de beheerreferenties te verkrijgen die de toepassing nodig heeft om met de Event Hub te communiceren. Volg de procedure in dit artikel om een naamruimte en een Event Hub te maken. Haal vervolgens de verbindingsreeks voor de Event Hubs-naamruimte op door de instructies in het artikel te volgen: Get verbindingsreeks. U gebruikt de verbindingsreeks later in deze quickstart.

Gebeurtenissen verzenden

In deze sectie wordt beschreven hoe u een Java-toepassing maakt voor het verzenden van gebeurtenissen naar een Event Hub.

Verwijzing naar Azure Event Hubs-bibliotheek toevoegen

Maak eerst een nieuw Maven-project voor een console/shell-toepassing in uw favoriete Java-ontwikkelomgeving. Werk het pom.xml bestand als volgt bij. De Java-clientbibliotheek voor Event Hubs is beschikbaar in de Centrale opslagplaats voor Maven.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Notitie

Werk de versie bij naar de nieuwste versie die is gepubliceerd naar de Maven-opslagplaats.

De app verifiëren bij Azure

In deze quickstart ziet u twee manieren om verbinding te maken met Azure Event Hubs: zonder wachtwoord en verbindingsreeks. De eerste optie laat zien hoe u uw beveiligingsprincipaal gebruikt in Microsoft Entra ID en op rollen gebaseerd toegangsbeheer (RBAC) om verbinding te maken met een Event Hubs-naamruimte. U hoeft zich geen zorgen te maken over in code vastgelegde verbindingsreeks s in uw code of in een configuratiebestand of in een beveiligde opslag, zoals Azure Key Vault. De tweede optie laat zien hoe u een verbindingsreeks gebruikt om verbinding te maken met een Event Hubs-naamruimte. Als u nog geen gebruik hebt gemaakt van Azure, kunt u de verbindingsreeks optie gemakkelijker volgen. We raden u aan de optie zonder wachtwoord te gebruiken in echte toepassingen en productieomgevingen. Zie Verificatie en autorisatie voor meer informatie. U kunt ook meer lezen over verificatie zonder wachtwoord op de overzichtspagina.

Rollen toewijzen aan uw Microsoft Entra-gebruiker

Zorg er bij het lokaal ontwikkelen voor dat het gebruikersaccount dat verbinding maakt met Azure Event Hubs over de juiste machtigingen beschikt. U hebt de rol Azure Event Hubs-gegevenseigenaar nodig om berichten te kunnen verzenden en ontvangen. Als u uzelf deze rol wilt toewijzen, hebt u de rol Gebruikerstoegang Beheer istrator of een andere rol nodig die de Microsoft.Authorization/roleAssignments/write actie bevat. U kunt Azure RBAC-rollen toewijzen aan een gebruiker met behulp van Azure Portal, Azure CLI of Azure PowerShell. Meer informatie over de beschikbare bereiken voor roltoewijzingen op de overzichtspagina van het bereik.

In het volgende voorbeeld wordt de Azure Event Hubs Data Owner rol toegewezen aan uw gebruikersaccount, dat volledige toegang biedt tot Azure Event Hubs-resources. Volg in een echt scenario het principe van minimale bevoegdheden om gebruikers alleen de minimale machtigingen te geven die nodig zijn voor een veiligere productieomgeving.

Ingebouwde Azure-rollen voor Azure Event Hubs

Voor Azure Event Hubs is het beheer van naamruimten en alle gerelateerde resources via Azure Portal en de Azure Resource Management-API al beveiligd met behulp van het Azure RBAC-model. Azure biedt de onderstaande ingebouwde Azure-rollen voor het autoriseren van toegang tot een Event Hubs-naamruimte:

Als u een aangepaste rol wilt maken, raadpleegt u Rechten die vereist zijn voor Event Hubs-bewerkingen.

Belangrijk

In de meeste gevallen duurt het een paar minuten voordat de roltoewijzing is doorgegeven in Azure. In zeldzame gevallen kan het maximaal acht minuten duren. Als u verificatiefouten ontvangt wanneer u de code voor het eerst uitvoert, wacht u even en probeert u het opnieuw.

  1. Zoek in Azure Portal uw Event Hubs-naamruimte met behulp van de hoofdzoekbalk of linkernavigatiebalk.

  2. Selecteer op de overzichtspagina toegangsbeheer (IAM) in het linkermenu.

  3. Selecteer op de pagina Toegangsbeheer (IAM) het tabblad Roltoewijzingen .

  4. Selecteer + Toevoegen in het bovenste menu en voeg vervolgens roltoewijzing toe in de resulterende vervolgkeuzelijst.

    A screenshot showing how to assign a role.

  5. Gebruik het zoekvak om de resultaten te filteren op de gewenste rol. In dit voorbeeld zoekt Azure Event Hubs Data Owner en selecteert u het overeenkomende resultaat. Kies vervolgens Volgende.

  6. Selecteer onder Toegang toewijzen de optie Gebruiker, groep of service-principal en kies vervolgens + Leden selecteren.

  7. Zoek in het dialoogvenster naar uw Microsoft Entra-gebruikersnaam (meestal uw user@domain e-mailadres) en kies Vervolgens onderaan het dialoogvenster Selecteren .

  8. Selecteer Beoordelen + toewijzen om naar de laatste pagina te gaan en vervolgens opnieuw beoordelen en toewijzen om het proces te voltooien.

Code schrijven om berichten te verzenden naar de event hub

Voeg een klasse genaamd Sender toe en voeg de volgende code aan de klasse toe:

Belangrijk

  • Werk <NAMESPACE NAME> bij met de naam van uw Event Hubs-naamruimte.
  • Werk <EVENT HUB NAME> bij met de naam van uw Event Hub.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Bouw het programma en controleer of er geen fouten zijn. U voert dit programma uit nadat u het ontvangersprogramma hebt uitgevoerd.

Gebeurtenissen ontvangen

De code in deze zelfstudie is gebaseerd op het EventProcessorClient-voorbeeld op GitHub, dat u kunt bekijken om de volledig werkende toepassing weer te geven.

Volg deze aanbevelingen wanneer u Azure Blob Storage gebruikt als controlepuntarchief:

  • Gebruik een afzonderlijke container voor elke consumentengroep. U kunt hetzelfde opslagaccount gebruiken, maar één container per groep gebruiken.
  • Gebruik de container niet voor iets anders en gebruik het opslagaccount niet voor iets anders.
  • Het opslagaccount moet zich in dezelfde regio bevinden als waarin de geïmplementeerde toepassing zich bevindt. Als de toepassing on-premises is, probeert u de dichtstbijzijnde regio te kiezen.

Controleer op de pagina Opslagaccount in Azure Portal in de sectie Blob-service of de volgende instellingen zijn uitgeschakeld.

  • Hiërarchische naamruimte
  • Blob voorlopig verwijderen
  • Versiebeheer

Een Azure Storage en een blobcontainer maken

In deze quickstart gebruikt u Azure Storage (bijvoorbeeld Blob Storage) als controlepuntopslag. Controlepunten is een proces waarbij een gebeurtenisprocessor de positie van de laatste uitgevoerde gebeurtenis binnen een partitie markeert of doorvoert. Het markeren van een controlepunt wordt doorgaans uitgevoerd binnen de functie waarmee de gebeurtenissen worden verwerkt. Zie Gebeurtenisprocessor voor meer informatie over controle punten.

Volg deze stappen om een Azure Storage-account te maken.

  1. Een Azure Storage-account maken
  2. Een blobcontainer maken
  3. Verifiëren bij de blobcontainer

Zorg er bij het lokaal ontwikkelen voor dat het gebruikersaccount dat toegang heeft tot blobgegevens over de juiste machtigingen beschikt. U hebt Inzender voor Opslagblobgegevens nodig om blobgegevens te lezen en schrijven. Als u uzelf deze rol wilt toewijzen, moet u de rol Gebruikerstoegang Beheer istrator of een andere rol met de actie Microsoft.Authorization/roleAssignments/write toegewezen krijgen. U kunt Azure RBAC-rollen toewijzen aan een gebruiker met behulp van Azure Portal, Azure CLI of Azure PowerShell. Meer informatie over de beschikbare bereiken voor roltoewijzingen vindt u op de overzichtspagina van het bereik.

In dit scenario wijst u machtigingen toe aan uw gebruikersaccount, dat is afgestemd op het opslagaccount, om het principe van minimale bevoegdheden te volgen. Deze procedure biedt gebruikers alleen de minimale machtigingen die nodig zijn en maakt veiligere productieomgevingen.

In het volgende voorbeeld wordt de rol Inzender voor opslagblobgegevens toegewezen aan uw gebruikersaccount, dat zowel lees- als schrijftoegang biedt tot blobgegevens in uw opslagaccount.

Belangrijk

In de meeste gevallen duurt het een of twee minuten voordat de roltoewijzing is doorgegeven in Azure, maar in zeldzame gevallen kan het maximaal acht minuten duren. Als u verificatiefouten ontvangt wanneer u de code voor het eerst uitvoert, wacht u even en probeert u het opnieuw.

  1. Zoek uw opslagaccount in Azure Portal met behulp van de hoofdzoekbalk of linkernavigatiebalk.

  2. Selecteer op de overzichtspagina van het opslagaccount toegangsbeheer (IAM) in het menu aan de linkerkant.

  3. Selecteer op de pagina Toegangsbeheer (IAM) het tabblad Roltoewijzingen .

  4. Selecteer + Toevoegen in het bovenste menu en voeg vervolgens roltoewijzing toe in de resulterende vervolgkeuzelijst.

    A screenshot showing how to assign a storage account role.

  5. Gebruik het zoekvak om de resultaten te filteren op de gewenste rol. Zoek in dit voorbeeld naar Inzender voor Opslagblobgegevens en selecteer het overeenkomende resultaat en kies vervolgens Volgende.

  6. Selecteer onder Toegang toewijzen de optie Gebruiker, groep of service-principal en kies vervolgens + Leden selecteren.

  7. Zoek in het dialoogvenster naar uw Microsoft Entra-gebruikersnaam (meestal uw user@domain e-mailadres) en kies Vervolgens onderaan het dialoogvenster Selecteren .

  8. Selecteer Beoordelen + toewijzen om naar de laatste pagina te gaan en vervolgens opnieuw beoordelen en toewijzen om het proces te voltooien.

Event Hubs-bibliotheken toevoegen aan uw Java-project

Voeg de volgende afhankelijkheden toe in het pom.xml-bestand.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Voeg de volgende import instructies toe boven aan het Java-bestand.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Maak een klasse genaamd Receiver toe en voeg de reeks variabelen aan de klasse toe. Vervang de plaatsaanduidingen door de correcte waarden.

    Belangrijk

    Vervang de plaatsaanduidingen door de correcte waarden.

    • <NAMESPACE NAME> met de naam van uw Event Hubs-naamruimte.
    • <EVENT HUB NAME> met de naam van uw Event Hub in de naamruimte.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Voeg de volgende methode main toe aan de klasse.

    Belangrijk

    Vervang de plaatsaanduidingen door de correcte waarden.

    • <STORAGE ACCOUNT NAME> met de naam van uw Azure Storage-account.
    • <CONTAINER NAME> met de naam van de blobcontainer in het opslagaccount
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Voeg de twee helper-methoden (PARTITION_PROCESSOR en ERROR_HANDLER) toe waarmee gebeurtenissen en fouten naar de klasse Receiver worden verwerkt.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Bouw het programma en controleer of er geen fouten zijn.

De toepassingen uitvoeren

  1. Voer eerst de ontvangertoepassing uit.

  2. Voer vervolgens de toepassing Afzender uit.

  3. Controleer in het venster Ontvangertoepassing of u de gebeurtenissen ziet die zijn gepubliceerd door de toepassing Afzender.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Druk op ENTER in het ontvangerstoepassingsvenster om de toepassing te stoppen.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Volgende stappen

Bekijk de volgende voorbeelden op GitHub: