
Java-based example of using the Kafka Consumer, Producer, and Streaming APIs

Last updated: 2016-11-16

The examples in this repository demonstrate how to use the Kafka Consumer, Producer, and Streaming APIs with a Kafka on HDInsight cluster.

There are two projects included in this repository:

  • Producer-Consumer: This contains a producer and consumer that use a Kafka topic named test.

  • Streaming: This contains an application that uses the Kafka Streams API (available in Kafka 0.10.0 or higher) to read data from the test topic, split the data into words, and write a count of each word to the wordcounts topic.

NOTE: Both projects assume Kafka 0.10.0, which is available with Kafka on HDInsight cluster version 3.5.
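As a rough sketch of what the two sides of the Producer-Consumer project configure, assuming string keys and values (the actual client calls, which need kafka-clients on the classpath, appear only as comments):

```java
import java.util.Properties;

public class KafkaConfigSketch {
    // Configuration a producer for the test topic typically needs. The
    // brokers argument stands in for the $KAFKABROKERS value retrieved
    // later in this article.
    static Properties producerProps(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        // The example writes plain-text sentences, so string serializers fit.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // With kafka-clients on the classpath, sending would look like:
        //   new KafkaProducer<String, String>(props)
        //       .send(new ProducerRecord<>("test", "a sentence"));
        return props;
    }

    // Configuration the consumer side typically needs; consumers also need
    // a group id and deserializers matching the serializers above.
    static Properties consumerProps(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", "example-group"); // hypothetical group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("wn0-kafka:9092").getProperty("bootstrap.servers"));
    }
}
```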

Producer and Consumer

To run the consumer and producer example, use the following steps:

  1. Fork/Clone the repository to your development environment.

  2. Install Java JDK 7 or higher. This was tested with Oracle Java 7 and 8, but should also work with OpenJDK.

  3. Install Maven.

  4. Assuming Java and Maven are both in the path and JAVA_HOME is set correctly, use the following commands to build the consumer and producer example:

    cd Producer-Consumer
    mvn clean package
    

    A file named kafka-producer-consumer-1.0-SNAPSHOT.jar is now available in the target directory.

  5. Use SCP to upload the file to the Kafka cluster:

    scp ./target/kafka-producer-consumer-1.0-SNAPSHOT.jar SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    Replace SSHUSER with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. When prompted, enter the password for the SSH user.

  6. Use SSH to connect to the cluster:

    ssh SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net
    
  7. Use the following commands in the SSH session to get the Zookeeper hosts and Kafka brokers for the cluster; you need this information when working with Kafka. The jq utility is installed first, as it makes it easier to parse the JSON returned from Ambari. Replace PASSWORD with the login (admin) password for the cluster. Replace KAFKANAME with the name of the Kafka on HDInsight cluster.

    sudo apt -y install jq
    export KAFKAZKHOSTS=`curl --silent -u admin:PASSWORD -G http://headnodehost:8080/api/v1/clusters/KAFKANAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")'`
    export KAFKABROKERS=`curl --silent -u admin:PASSWORD -G http://headnodehost:8080/api/v1/clusters/KAFKANAME/services/HDFS/components/DATANODE | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")'`
    
  8. Use the following to verify that the environment variables have been correctly populated:

    echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
    echo '$KAFKABROKERS='$KAFKABROKERS
    

    The following is an example of the contents of $KAFKAZKHOSTS:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk3-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
    

    The following is an example of the contents of $KAFKABROKERS:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
    

    NOTE: This information may change as you perform scaling operations on the cluster, which add and remove worker nodes. Always retrieve the Zookeeper and broker information before working with Kafka.

  9. This example uses a topic named test. Use the following to create this topic:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    
  10. Use the producer-consumer example to write records to the topic:

    ./kafka-producer-consumer.jar producer $KAFKABROKERS
    

    If you receive a permission denied error, use the following to make the file executable and then run it again:

    chmod +x kafka-producer-consumer.jar
    

    A counter displays how many records have been written.

  11. Use the producer-consumer example to read the records that were just written:

    ./kafka-producer-consumer.jar consumer $KAFKABROKERS
    

    This returns a list of the random sentences, along with a count of how many were read.
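The jq expressions in step 7 do nothing more than append a port number to each host name that Ambari returns and join the results with commas. A plain-Java sketch of the same transformation, using hypothetical host names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HostListSketch {
    // Equivalent of the jq expression: suffix each host with a port and
    // join the results with commas.
    static String withPort(List<String> hosts, int port) {
        return hosts.stream()
                .map(h -> h + ":" + port)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Hypothetical Zookeeper host names, as Ambari might return them.
        List<String> zkHosts = Arrays.asList("zk0-kafka.internal", "zk2-kafka.internal");
        System.out.println(withPort(zkHosts, 2181));
        // → zk0-kafka.internal:2181,zk2-kafka.internal:2181
    }
}
```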

Streaming

NOTE: The streaming example expects that you have already set up the test topic from the previous section.

  1. On your development environment, change to the Streaming directory and use the following to create a jar for this project:

    mvn clean package
    
  2. Use SCP to copy the kafka-streaming-1.0-SNAPSHOT.jar file to your HDInsight cluster:

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net:kafka-streaming.jar
    

    Replace SSHUSER with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. When prompted, enter the password for the SSH user.

  3. Once the file has been uploaded, return to the SSH connection to your HDInsight cluster and use the following to start the streaming process in the background:

    ./kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS 2>/dev/null &
    
  4. While it is running, use the producer to send messages to the test topic:

    ./kafka-producer-consumer.jar producer $KAFKABROKERS &>/dev/null &
    
  5. Use the following to view the output that is written to the wordcounts topic:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper $KAFKAZKHOSTS --topic wordcounts --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
    

    NOTE: To view the data, you must tell the consumer to print the key (which contains the word) and specify the deserializers to use for the key and value.

    The output is similar to the following:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    a       13805
    snow    13637
    
  6. Use Ctrl + C to exit the consumer, then use the fg command to bring the streaming background task to the foreground. Use Ctrl + C again to exit it.
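The word counts shown in step 5 come from logic along these lines: split each sentence on whitespace and keep a running count per word. The actual Streaming project does this with the Kafka Streams DSL; this is a plain-Java sketch of the core transformation, using hypothetical input sentences:

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {
    // Split each sentence on whitespace, lower-case the words, and keep a
    // running count per word.
    static Map<String, Long> countWords(String[] sentences) {
        Map<String, Long> counts = new TreeMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.toLowerCase().split("\\s+")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] sentences = {
            "snow white and the seven dwarfs", // hypothetical input
            "a long time ago"
        };
        // Print each word with its count, mirroring the console-consumer output.
        countWords(sentences).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

In the real application the counts keep growing as the producer sends more sentences, which is why the numbers in the sample output above are large and repeated words appear with increasing counts.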