Integrieren der Apache Kafka Connect-Unterstützung in Azure Event Hubs
Apache Kafka Connect ist ein Framework zum Verbinden und Importieren/Exportieren von Daten aus einem beliebigen externen System bzw. in ein beliebiges externes System wie MySQL, HDFS und ein Dateisystem über einen Kafka-Cluster. In diesem Tutorial wird die Nutzung eines Kafka Connect-Frameworks mit Event Hubs Schritt für Schritt beschrieben.
In diesem Tutorial wird die Vorgehensweise zum Integrieren von Kafka Connect in einen Event Hub und zum Bereitstellen von einfachen FileStreamSource- und FileStreamSink-Connectors beschrieben. Die Connectors sind zwar nicht für die Verwendung in der Produktion bestimmt, aber sie stellen ein End-to-End-Szenario für Kafka Connect dar, bei dem Azure Event Hubs als Kafka-Broker fungiert.
Hinweis
Dieses Beispiel ist auf GitHub verfügbar.
In diesem Tutorial führen Sie die folgenden Schritte aus:
- Erstellen eines Event Hubs-Namespace
- Klonen des Beispielprojekts
- Konfigurieren von Kafka Connect für Event Hubs
- Ausführen von Kafka Connect
- Erstellen von Connectors
Voraussetzungen
Stellen Sie für das Durcharbeiten dieses Tutorials zur Vorgehensweise sicher, dass die folgenden Voraussetzungen erfüllt sind:
- Azure-Abonnement. Falls Sie nicht über ein Abonnement verfügen, können Sie ein kostenloses Konto erstellen.
- Git-Client
- Linux/macOS
- Die neueste Kafka-Version von kafka.apache.org.
- Lesen Sie den Einführungsartikel Event Hubs für Apache Kafka.
Erstellen eines Event Hubs-Namespace
Ein Event Hubs-Namespace ist erforderlich, um Nachrichten an einen Event Hubs-Dienst zu senden und von diesem zu empfangen. Anweisungen zum Erstellen eines Namespace und eines Event Hub finden Sie unter Erstellen eines Event Hubs. Rufen Sie die Event Hubs-Verbindungszeichenfolge und den vollqualifizierten Domänennamen (Fully Qualified Domain Name, FQDN) zur späteren Verwendung ab. Anweisungen hierzu finden Sie unter Get an Event Hubs connection string (Abrufen einer Event Hubs-Verbindungszeichenfolge).
Klonen des Beispielprojekts
Klonen Sie das Azure Event Hubs-Repository, und navigieren Sie zum Unterordner „tutorials/connect“:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Konfigurieren von Kafka Connect für Event Hubs
Es ist nur eine minimale Neukonfiguration erforderlich, wenn Sie den Kafka Connect-Durchsatz von Kafka an Event Hubs umleiten. Im folgenden Beispiel connect-distributed.properties
ist dargestellt, wie Sie Connect konfigurieren, um die Authentifizierung und Kommunikation mit dem Kafka-Endpunkt unter Event Hubs einzurichten:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Wichtig
Ersetzen Sie {YOUR.EVENTHUBS.CONNECTION.STRING}
durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace. Anweisungen zum Abrufen der Verbindungszeichenfolge finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge. Hier sehen Sie eine Beispielkonfiguration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Ausführen von Kafka Connect
In diesem Schritt wird ein Kafka Connect-Worker lokal im verteilten Modus gestartet, indem Event Hubs zum Warten des Clusterzustands verwendet wird.
- Speichern Sie die obige Datei
connect-distributed.properties
lokal. Achten Sie darauf, alle Werte in geschweiften Klammern zu ersetzen. - Navigieren Sie zum Speicherort des Kafka-Release auf Ihrem Computer.
- Führen Sie
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
aus. Die Connect-Worker-REST-API ist für die Interaktion bereit, wenn'INFO Finished starting connectors and tasks'
angezeigt wird.
Hinweis
Kafka Connect verwendet die Kafka-AdminClient-API, um automatisch Themen mit empfohlenen Konfigurationen, einschließlich Komprimierung, zu erstellen. Eine schnelle Überprüfung des Namespace im Azure-Portal ergibt, dass die internen Themen des Connect-Workers automatisch erstellt wurden.
Interne Kafka Connect-Themen müssen Komprimierung verwenden. Das Event Hubs-Team ist nicht für die Korrektur falscher Konfigurationen zuständig, sollten interne Connect-Themen nicht ordnungsgemäß konfiguriert sein.
Erstellen von Connectors
In diesem Abschnitt wird die Einrichtung der Connectors FileStreamSource und FileStreamSink Schritt für Schritt beschrieben.
Erstellen Sie ein Verzeichnis für Ein- und Ausgabedatendateien.
mkdir ~/connect-quickstart
Erstellen Sie zwei Dateien: eine Datei mit Startdaten, aus der der FileStreamSource-Connector liest, und eine weitere Datei, in die der FileStreamSink-Connector schreibt.
seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Erstellen Sie einen FileStreamSource-Connector. Achten Sie darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Auf Ihrer Event Hubs-Instanz sollte nach dem Ausführen des obigen Befehls der Event Hub
connect-quickstart
angezeigt werden.Überprüfen Sie den Status des Quellconnectors.
curl -s http://localhost:8083/connectors/file-source/status
Optional können Sie den Service Bus-Explorer nutzen, um zu überprüfen, ob Ereignisse im Thema
connect-quickstart
eingegangen sind.Erstellen Sie einen FileStreamSink-Connector. Achten Sie auch hier wieder darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Überprüfen Sie den Status des Senkenconnectors.
curl -s http://localhost:8083/connectors/file-sink/status
Stellen Sie sicher, dass die Daten zwischen den Dateien repliziert wurden und in beiden Dateien identisch sind.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Cleanup
Mit Kafka Connect werden Event Hub-Themen zum Speichern von Konfigurationen, Offsets und des Status erstellt, die auch dann beibehalten werden, nachdem der Connect-Cluster heruntergefahren wurde. Wenn diese Beibehaltung nicht erwünscht ist, sollten diese Themen gelöscht werden. Sie können auch die connect-quickstart
-Event Hubs löschen, der in dieser exemplarischen Vorgehensweise erstellt wurden.
Nächste Schritte
Weitere Informationen zu Event Hubs für Kafka finden Sie in folgenden Artikeln: