Java Message Service 1.1 gebruiken met Azure Service Bus Standard en AMQP 1.0

Waarschuwing

Dit artikel biedt beperkte ondersteuning voor de Java Message Service (JMS) 1.1 API en bestaat alleen voor de Azure Service Bus Standard-laag.

Volledige ondersteuning voor de Java Message Service 2.0-API is alleen beschikbaar in de Premium-laag van Azure Service Bus. U wordt aangeraden deze laag te gebruiken.

In dit artikel wordt uitgelegd hoe u Service Bus-berichtenfuncties van Java-toepassingen gebruikt met behulp van de populaire JMS API-standaard. Deze berichtenfuncties omvatten wachtrijen en publiceren of abonneren op onderwerpen. In een aanvullende artikel wordt uitgelegd hoe u hetzelfde kunt doen met behulp van de Azure Service Bus .NET API. U kunt deze twee artikelen samen gebruiken voor meer informatie over platformoverschrijdende berichten met behulp van het Advanced Message Queuing Protocol (AMQP) 1.0.

AMQP 1.0 is een efficiënt, betrouwbaar berichtenprotocol op wire-level dat u kunt gebruiken om robuuste, platformoverschrijdende berichtentoepassingen te bouwen.

Ondersteuning voor AMQP 1.0 in Service Bus betekent dat u de wachtrijen kunt gebruiken en brokered messaging-functies kunt publiceren of abonneren vanaf een scala aan platforms met behulp van een efficiënt binair protocol. U kunt ook toepassingen bouwen die bestaan uit onderdelen die zijn gebouwd met behulp van een combinatie van talen, frameworks en besturingssystemen.

Aan de slag met Service Bus

In dit artikel wordt ervan uitgegaan dat u al een Service Bus-naamruimte hebt die een wachtrij met de naam basicqueuebevat. Als u dat niet doet, kunt u de naamruimte en wachtrij maken met behulp van Azure Portal. Zie Aan de slag met Service Bus-wachtrijen voor meer informatie over het maken van Service Bus-naamruimten en -wachtrijen.

Notitie

Gepartitioneerde wachtrijen en onderwerpen ondersteunen ook AMQP. Zie Gepartitioneerde berichtenentiteiten en AMQP 1.0-ondersteuning voor gepartitioneerde Service Bus-wachtrijen en onderwerpen voor meer informatie.

De AMQP 1.0 JMS-clientbibliotheek downloaden

Zie de Apache Qpid-downloadsite voor informatie over het downloaden van de nieuwste versie van de Apache Qpid JMS AMQP 1.0-clientbibliotheek.

U moet de volgende JAR-bestanden uit het Apache Qpid JMS AMQP 1.0-distributiearchief toevoegen aan de Java CLASSPATH-omgevingsvariabele wanneer u JMS-toepassingen bouwt en uitvoert met Service Bus:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

Notitie

JMS JAR-namen en -versies zijn mogelijk gewijzigd. Zie Qpid JMS AMQP 1.0 voor meer informatie.

Java-toepassingen coderen

Java-naamgeving en directory-interface

JMS maakt gebruik van de Java Naming and Directory Interface (JNDI) om een scheiding tussen logische namen en fysieke namen te maken. Er worden twee typen JMS-objecten opgelost met behulp van JNDI: Verbinding maken ionFactory en Destination. JNDI maakt gebruik van een providermodel waarin u verschillende adreslijstservices kunt aansluiten om taken voor naamomzetting af te handelen. De Apache Qpid JMS AMQP 1.0-bibliotheek wordt geleverd met een eenvoudige JNDI-provider op basis van eigenschappen die is geconfigureerd met behulp van een eigenschappenbestand met de volgende indeling:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

JNDI-context instellen en het Verbinding maken ionFactory-object configureren

De verbindingsreeks waarnaar wordt verwezen, is de verbindingsreeks die beschikbaar is in het beleid voor gedeelde toegang in Azure Portal onder de tekenreeks voor primaire Verbinding maken ion.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Wachtrijen voor producenten- en consumentenbestemmingen configureren

De vermelding die wordt gebruikt om een bestemming in het JNDI-bestand Qpid-eigenschappen te definiëren, heeft de volgende indeling.

Een doelwachtrij maken voor de producent:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Een doelwachtrij maken voor de consument:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

De JMS-toepassing schrijven

Er zijn geen speciale API's of opties vereist wanneer u JMS met Service Bus gebruikt. Er zijn enkele beperkingen die later worden behandeld. Net als bij elke JMS-toepassing is de configuratie van de JNDI-omgeving vereist om een Verbinding maken ionFactory-object en -bestemmingen op te lossen.

Het JNDI InitialContext-object configureren

De JNDI-omgeving wordt geconfigureerd door een hashtabel met configuratiegegevens door te geven aan de constructor van de klasse javax.naming.InitialContext. De twee vereiste elementen in de hashtabel zijn de klassenaam van de Initial Context Factory en de URL van de provider. De volgende code laat zien hoe u de JNDI-omgeving configureert voor het gebruik van de JNDI-provider op basis van Qpid-eigenschappen met een eigenschappenbestand met de naam servicebus.properties.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Een eenvoudige JMS-toepassing die gebruikmaakt van een Service Bus-wachtrij

Met het volgende voorbeeldprogramma worden JMS-sms-berichten verzonden naar een Service Bus-wachtrij met de logische JNDI-naam van QUEUE en worden de berichten terug ontvangen.

U hebt toegang tot alle broncode- en configuratiegegevens uit de quickstart voor JMS-wachtrijvoorbeelden van Azure Service Bus.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

De toepassing uitvoeren

Geef de Verbinding maken ion-tekenreeks door vanuit het beleid voor gedeelde toegang om de toepassing uit te voeren. De volgende uitvoer is van het formulier waarop de toepassing wordt uitgevoerd:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP-verwijdering en Service Bus-bewerkingstoewijzing

Hier ziet u hoe een AMQP-verwijdering wordt omgezet in een Service Bus-bewerking:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

JMS-onderwerpen versus Service Bus-onderwerpen

Het gebruik van Service Bus-onderwerpen en -abonnementen via de JMS-API biedt basismogelijkheden voor verzenden en ontvangen. Het is een handige keuze wanneer u toepassingen van andere berichtbrokers overdraaft met JMS-compatibele API's, zelfs als Service Bus-onderwerpen verschillen van JMS-onderwerpen en enkele aanpassingen vereisen.

Service Bus-onderwerpen routeren berichten naar benoemde, gedeelde en duurzame abonnementen die worden beheerd via de Azure Resource Management-interface, de Azure-opdrachtregelprogramma's of Azure Portal. Elk abonnement biedt maximaal 2000 selectieregels, die elk een filtervoorwaarde kunnen hebben en, voor SQL-filters, ook een actie voor metagegevenstransformatie. Elke overeenkomst met filtervoorwaarde selecteert het invoerbericht dat moet worden gekopieerd naar het abonnement.

Het ontvangen van berichten van abonnementen is identiek aan het ontvangen van berichten van wachtrijen. Elk abonnement heeft een gekoppelde wachtrij met onbestelbare berichten en de mogelijkheid om berichten automatisch door te sturen naar een andere wachtrij of onderwerpen.

Met JMS-onderwerpen kunnen clients dynamisch niet-beveiligbare en duurzame abonnees maken die optioneel filteren van berichten met berichtkiezers toestaan. Deze niet-gedeelde entiteiten worden niet ondersteund door Service Bus. De syntaxis van de SQL-filterregel voor Service Bus is vergelijkbaar met de syntaxis van de berichtkiezer die wordt ondersteund door JMS.

De uitgeverszijde van het JMS-onderwerp is compatibel met Service Bus, zoals wordt weergegeven in dit voorbeeld, maar dynamische abonnees zijn dat niet. De volgende topologiegerelateerde JMS-API's worden niet ondersteund met Service Bus.

Niet-ondersteunde functies en beperkingen

De volgende beperkingen bestaan wanneer u JMS gebruikt via AMQP 1.0 met Service Bus, namelijk:

  • Per sessie is slechts één MessageProducer - of MessageConsumer-object toegestaan. Als u meerdere MessageProducer- of MessageConsumer-objecten in een toepassing wilt maken, maakt u een toegewezen sessie voor elk van deze objecten.
  • Vluchtige onderwerpabonnementen worden momenteel niet ondersteund.
  • MessageSelector-objecten worden momenteel niet ondersteund.
  • Gedistribueerde transacties worden niet ondersteund, maar transacties worden wel ondersteund.

Service Bus splitst het besturingsvlak van het gegevensvlak, zodat het geen ondersteuning biedt voor verschillende dynamische topologiefuncties van JMS.

Niet-ondersteunde methode Replace with
createDurableSubscriber Maak een onderwerpabonnement dat de berichtkiezer aanwijst.
createDurableConsumer Maak een onderwerpabonnement dat de berichtkiezer aanwijst.
createSharedConsumer Service Bus-onderwerpen zijn altijd deelbaar. Zie de sectie 'JMS-onderwerpen versus Service Bus-onderwerpen'.
createSharedDurableConsumer Service Bus-onderwerpen zijn altijd deelbaar. Zie de sectie 'JMS-onderwerpen versus Service Bus-onderwerpen'.
createTemporaryTopic Maak een onderwerp via de beheer-API, hulpprogramma's of de portal waarvoor AutoDeleteOnIdle is ingesteld op een verloopperiode.
createTopic Maak een onderwerp via de beheer-API, hulpprogramma's of de portal.
Uitschrijven Verwijder de API, hulpprogramma's of portal voor onderwerpbeheer.
createBrowser Niet ondersteund. Gebruik de functie Peek() van de Service Bus-API.
createQueue Maak een wachtrij via de beheer-API, hulpprogramma's of de portal.
createTemporaryQueue Maak een wachtrij via de beheer-API, hulpprogramma's of de portal waarvoor AutoDeleteOnIdle is ingesteld op een verloopperiode.
receiveNoWait Gebruik de receive()-methode die wordt geleverd door de Service Bus SDK en geef een zeer lage of nul time-out op.

Samenvatting

In dit artikel hebt u geleerd hoe u Service Bus Brokered Messaging-functies, zoals wachtrijen, kunt gebruiken en onderwerpen kunt publiceren of abonneren in Java met behulp van de populaire JMS-API en AMQP 1.0.

U kunt Service Bus AMQP 1.0 ook gebruiken vanuit andere talen, zoals .NET, C, Python en PHP. Onderdelen die zijn gebouwd met behulp van deze verschillende talen, kunnen berichten betrouwbaar en betrouwbaar uitwisselen met behulp van de AMQP 1.0-ondersteuning in Service Bus.

Volgende stappen