Integrieren der Apache Kafka Connect-Unterstützung in Azure Event Hubs

Apache Kafka Connect ist ein Framework zum Verbinden und Importieren/Exportieren von Daten aus einem beliebigen externen System bzw. in ein beliebiges externes System wie MySQL, HDFS und ein Dateisystem über einen Kafka-Cluster. In diesem Tutorial wird die Nutzung eines Kafka Connect-Frameworks mit Event Hubs Schritt für Schritt beschrieben.

In diesem Tutorial wird die Vorgehensweise zum Integrieren von Kafka Connect in einen Event Hub und zum Bereitstellen von einfachen FileStreamSource- und FileStreamSink-Connectors beschrieben. Die Connectors sind zwar nicht für die Verwendung in der Produktion bestimmt, aber sie stellen ein End-to-End-Szenario für Kafka Connect dar, bei dem Azure Event Hubs als Kafka-Broker fungiert.

Hinweis

Dieses Beispiel ist auf GitHub verfügbar.

In diesem Tutorial führen Sie die folgenden Schritte aus:

  • Erstellen eines Event Hubs-Namespace
  • Klonen des Beispielprojekts
  • Konfigurieren von Kafka Connect für Event Hubs
  • Ausführen von Kafka Connect
  • Erstellen von Connectors

Voraussetzungen

Stellen Sie für das Durcharbeiten dieses Tutorials zur Vorgehensweise sicher, dass die folgenden Voraussetzungen erfüllt sind:

Erstellen eines Event Hubs-Namespace

Ein Event Hubs-Namespace ist erforderlich, um Nachrichten an einen Event Hubs-Dienst zu senden und von diesem zu empfangen. Anweisungen zum Erstellen eines Namespace und eines Event Hub finden Sie unter Erstellen eines Event Hubs. Rufen Sie die Event Hubs-Verbindungszeichenfolge und den vollqualifizierten Domänennamen (Fully Qualified Domain Name, FQDN) zur späteren Verwendung ab. Anweisungen hierzu finden Sie unter Get an Event Hubs connection string (Abrufen einer Event Hubs-Verbindungszeichenfolge).

Klonen des Beispielprojekts

Klonen Sie das Azure Event Hubs-Repository, und navigieren Sie zum Unterordner „tutorials/connect“:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Konfigurieren von Kafka Connect für Event Hubs

Es ist nur eine minimale Neukonfiguration erforderlich, wenn Sie den Kafka Connect-Durchsatz von Kafka an Event Hubs umleiten. Im folgenden Beispiel connect-distributed.properties ist dargestellt, wie Sie Connect konfigurieren, um die Authentifizierung und Kommunikation mit dem Kafka-Endpunkt unter Event Hubs einzurichten:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Wichtig

Ersetzen Sie {YOUR.EVENTHUBS.CONNECTION.STRING} durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace. Anweisungen zum Abrufen der Verbindungszeichenfolge finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge. Hier sehen Sie eine Beispielkonfiguration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Ausführen von Kafka Connect

In diesem Schritt wird ein Kafka Connect-Worker lokal im verteilten Modus gestartet, indem Event Hubs zum Warten des Clusterzustands verwendet wird.

  1. Speichern Sie die obige Datei connect-distributed.properties lokal. Achten Sie darauf, alle Werte in geschweiften Klammern zu ersetzen.
  2. Navigieren Sie zum Speicherort des Kafka-Release auf Ihrem Computer.
  3. Führen Sie ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties aus. Die Connect-Worker-REST-API ist für die Interaktion bereit, wenn 'INFO Finished starting connectors and tasks' angezeigt wird.

Hinweis

Kafka Connect verwendet die Kafka-AdminClient-API, um automatisch Themen mit empfohlenen Konfigurationen, einschließlich Komprimierung, zu erstellen. Eine schnelle Überprüfung des Namespace im Azure-Portal ergibt, dass die internen Themen des Connect-Workers automatisch erstellt wurden.

Interne Kafka Connect-Themen müssen Komprimierung verwenden. Das Event Hubs-Team ist nicht für die Korrektur falscher Konfigurationen zuständig, sollten interne Connect-Themen nicht ordnungsgemäß konfiguriert sein.

Erstellen von Connectors

In diesem Abschnitt wird die Einrichtung der Connectors FileStreamSource und FileStreamSink Schritt für Schritt beschrieben.

  1. Erstellen Sie ein Verzeichnis für Ein- und Ausgabedatendateien.

    mkdir ~/connect-quickstart
    
  2. Erstellen Sie zwei Dateien: eine Datei mit Startdaten, aus der der FileStreamSource-Connector liest, und eine weitere Datei, in die der FileStreamSink-Connector schreibt.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Erstellen Sie einen FileStreamSource-Connector. Achten Sie darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Auf Ihrer Event Hubs-Instanz sollte nach dem Ausführen des obigen Befehls der Event Hub connect-quickstart angezeigt werden.

  4. Überprüfen Sie den Status des Quellconnectors.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Optional können Sie den Service Bus-Explorer nutzen, um zu überprüfen, ob Ereignisse im Thema connect-quickstart eingegangen sind.

  5. Erstellen Sie einen FileStreamSink-Connector. Achten Sie auch hier wieder darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Überprüfen Sie den Status des Senkenconnectors.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Stellen Sie sicher, dass die Daten zwischen den Dateien repliziert wurden und in beiden Dateien identisch sind.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Cleanup

Mit Kafka Connect werden Event Hub-Themen zum Speichern von Konfigurationen, Offsets und des Status erstellt, die auch dann beibehalten werden, nachdem der Connect-Cluster heruntergefahren wurde. Wenn diese Beibehaltung nicht erwünscht ist, sollten diese Themen gelöscht werden. Sie können auch die connect-quickstart-Event Hubs löschen, der in dieser exemplarischen Vorgehensweise erstellt wurden.

Nächste Schritte

Weitere Informationen zu Event Hubs für Kafka finden Sie in folgenden Artikeln: