Hızlı Başlangıç: Apache Storm kullanarak Event Hubs'dan olay alma

Apache Storm , sınırsız veri akışlarının güvenilir bir şekilde işlenmesini basitleştiren dağıtılmış bir gerçek zamanlı hesaplama sistemidir. Bu bölümde, Event Hubs'dan olay almak için Azure Event Hubs Storm spout'larının nasıl kullanılacağı gösterilmektedir. Apache Storm'u kullanarak olayları farklı düğümlerde barındırılan birden çok işleme bölebilirsiniz. Storm ile Event Hubs tümleştirmesi, Storm'un Zookeeper yüklemesini kullanarak ilerleme durumunu şeffaf bir şekilde denetleyerek, Event Hubs'dan kalıcı denetim noktalarını ve paralel almaları yöneterek olay tüketimini basitleştirir.

Event Hubs alma desenleri hakkında daha fazla bilgi için bkz. Event Hubs'a genel bakış.

Önkoşullar

Hızlı başlangıç ile başlamadan önce bir Event Hubs ad alanı ve bir olay hub'ı oluşturun. Event Hubs türünde bir ad alanı oluşturmak ve uygulamanızın olay hub'ı ile iletişim kurmak için ihtiyaç duyduğu yönetim kimlik bilgilerini almak için Azure portal kullanın. Ad alanı ve olay hub'ı oluşturmak için bu makaledeki yordamı izleyin.

Proje oluşturma ve kod ekleme

  1. Paketi yerel Maven deposuna yüklemek için aşağıdaki komutu kullanın. Bu, bunu sonraki bir adımda Storm projesine başvuru olarak eklemenize olanak tanır.

    mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jar
    
  2. Eclipse'te yeni bir Maven projesi oluşturun ( Dosya'ya, Yeni'ye ve sonra Proje'ye tıklayın).

    Dosya -> Yeni -> Proje

  3. Varsayılan Çalışma Alanı konumunu kullan'ı seçin ve İleri'ye tıklayın

  4. maven-archetype-quickstart archetype öğesini seçin ve İleri'ye tıklayın

  5. GroupId ve ArtifactId ekleyin, ardından Son'a tıklayın

  6. pom.xmliçinde düğüme aşağıdaki bağımlılıkları <dependency> ekleyin.

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.eventhubs</groupId>
        <artifactId>eventhubs-storm-spout</artifactId>
        <version>0.9</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    
  7. src klasöründe Config.properties adlı bir dosya oluşturun ve ve event hub name değerlerini değiştirerek receive rule key aşağıdaki içeriği kopyalayın:

    eventhubspout.username = ReceiveRule
    eventhubspout.password = {receive rule key}
    eventhubspout.namespace = ioteventhub-ns
    eventhubspout.entitypath = {event hub name}
    eventhubspout.partitions.count = 16
    
    # if not provided, will use storm's zookeeper settings
    # zookeeper.connectionstring=localhost:2181
    
    eventhubspout.checkpoint.interval = 10
    eventhub.receiver.credits = 10
    

    eventhub.receiver.credits değeri, storm işlem hattında yayınlanmadan önce kaç olayın toplu işlendiğini belirler. Kolaylık olması açısından, bu örnek bu değeri 10 olarak ayarlar. Üretimde genellikle daha yüksek değerlere ayarlanmalıdır; örneğin, 1024. 1 . Aşağıdaki kodla LoggerBolt adlı yeni bir sınıf oluşturun:

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    public class LoggerBolt extends BaseRichBolt {
        private OutputCollector collector;
        private static final Logger logger = LoggerFactory
                  .getLogger(LoggerBolt.class);
    
        @Override
        public void execute(Tuple tuple) {
            String value = tuple.getString(0);
            logger.info("Tuple value: " + value);
    
            collector.ack(tuple);
        }
    
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.count = 0;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields
        }
    
    }
    

    Bu Storm cıvatası alınan olayların içeriğini günlüğe kaydeder. Bu, tanımlama listelerini bir depolama hizmetinde depolamak için kolayca genişletilebilir. Olay Hub'ı ile HDInsight Storm örneği, verileri Azure Depolama ve Power BI'da depolamak için de aynı yaklaşımı kullanır.

  8. Aşağıdaki kodla LogTopology adlı bir sınıf oluşturun:

    import java.io.FileReader;
    import java.util.Properties;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import com.microsoft.eventhubs.samples.EventCount;
    import com.microsoft.eventhubs.spout.EventHubSpout;
    import com.microsoft.eventhubs.spout.EventHubSpoutConfig;
    
    public class LogTopology {
        protected EventHubSpoutConfig spoutConfig;
        protected int numWorkers;
    
        protected void readEHConfig(String[] args) throws Exception {
            Properties properties = new Properties();
            if (args.length > 1) {
                properties.load(new FileReader(args[1]));
            } else {
                properties.load(EventCount.class.getClassLoader()
                        .getResourceAsStream("Config.properties"));
            }
    
            String username = properties.getProperty("eventhubspout.username");
            String password = properties.getProperty("eventhubspout.password");
            String namespaceName = properties
                    .getProperty("eventhubspout.namespace");
            String entityPath = properties.getProperty("eventhubspout.entitypath");
            String zkEndpointAddress = properties
                    .getProperty("zookeeper.connectionstring"); // opt
            int partitionCount = Integer.parseInt(properties
                    .getProperty("eventhubspout.partitions.count"));
            int checkpointIntervalInSeconds = Integer.parseInt(properties
                    .getProperty("eventhubspout.checkpoint.interval"));
            int receiverCredits = Integer.parseInt(properties
                    .getProperty("eventhub.receiver.credits")); // prefetch count
                                                                // (opt)
            System.out.println("Eventhub spout config: ");
            System.out.println("  partition count: " + partitionCount);
            System.out.println("  checkpoint interval: "
                    + checkpointIntervalInSeconds);
            System.out.println("  receiver credits: " + receiverCredits);
    
            spoutConfig = new EventHubSpoutConfig(username, password,
                    namespaceName, entityPath, partitionCount, zkEndpointAddress,
                    checkpointIntervalInSeconds, receiverCredits);
    
            // set the number of workers to be the same as partition number.
            // the idea is to have a spout and a logger bolt co-exist in one
            // worker to avoid shuffling messages across workers in storm cluster.
            numWorkers = spoutConfig.getPartitionCount();
    
            if (args.length > 0) {
                // set topology name so that sample Trident topology can use it as
                // stream name.
                spoutConfig.setTopologyName(args[0]);
            }
        }
    
        protected StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
            topologyBuilder.setSpout("EventHubsSpout", eventHubSpout,
                    spoutConfig.getPartitionCount()).setNumTasks(
                    spoutConfig.getPartitionCount());
            topologyBuilder
                    .setBolt("LoggerBolt", new LoggerBolt(),
                            spoutConfig.getPartitionCount())
                    .localOrShuffleGrouping("EventHubsSpout")
                    .setNumTasks(spoutConfig.getPartitionCount());
            return topologyBuilder.createTopology();
        }
    
        protected void runScenario(String[] args) throws Exception {
            boolean runLocal = true;
            readEHConfig(args);
            StormTopology topology = buildTopology();
            Config config = new Config();
            config.setDebug(false);
    
            if (runLocal) {
                config.setMaxTaskParallelism(2);
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("test", config, topology);
                Thread.sleep(5000000);
                localCluster.shutdown();
            } else {
                config.setNumWorkers(numWorkers);
                StormSubmitter.submitTopology(args[0], config, topology);
            }
        }
    
        public static void main(String[] args) throws Exception {
            LogTopology topology = new LogTopology();
            topology.runScenario(args);
        }
    }
    

    Bu sınıf, örneği oluşturmak için yapılandırma dosyasındaki özellikleri kullanarak yeni bir Event Hubs spout oluşturur. Bu örneğin, olay hub'ı tarafından izin verilen en yüksek paralelliği kullanmak için olay hub'ında bölüm sayısı kadar spouts görevi oluşturduğunu unutmayın.

Sonraki adımlar

Aşağıdaki bağlantıları inceleyerek Event Hubs hakkında daha fazla bilgi edinebilirsiniz: