HDInsight üzerinde Apache Hive ve Apache Hadoop kullanarak Twitter verilerini analiz etme

Twitter verilerini işlemek için Apache Hive kullanmayı öğrenin. Sonuç, belirli bir sözcüğü içeren en çok tweet gönderen Twitter kullanıcılarının listesidir.

Önemli

Bu belgedeki adımlar HDInsight 3.6 üzerinde test edilmiştir.

Verileri alma

Twitter, her tweetin verilerini bir REST API aracılığıyla JavaScript Nesne Gösterimi (JSON) belgesi olarak almanıza olanak tanır. API'de kimlik doğrulaması için OAuth gereklidir.

Twitter uygulaması oluşturma

  1. Bir web tarayıcısından adresinde https://developer.twitter.comoturum açın. Twitter hesabınız yoksa Şimdi kaydolun bağlantısını seçin.

  2. Yeni Uygulama Oluştur'u seçin.

  3. Ad, Açıklama, Web sitesi girin. Web Sitesi alanı için bir URL oluşturabilirsiniz. Aşağıdaki tabloda, kullanılacak bazı örnek değerler gösterilmektedir:

    Alan Değer
    Ad MyHDInsightApp
    Açıklama MyHDInsightApp
    Web Sitesi https://www.myhdinsightapp.com
  4. Evet, kabul ediyorum'u ve ardından Twitter uygulamanızı oluşturun'u seçin.

  5. İzinler sekmesini seçin. Varsayılan izin Salt okunurdur.

  6. Anahtarlar ve Erişim Belirteçleri sekmesini seçin.

  7. Erişim belirtecimi oluştur'u seçin.

  8. Sayfanın sağ üst köşesindeki Test OAuth'ı seçin.

  9. Tüketici anahtarını, Tüketici gizli dizisini, Erişim belirtecini ve Erişim belirteci gizli dizisini not edin.

Tweet'leri indirme

Aşağıdaki Python kodu Twitter'dan 10.000 tweet indirir ve bunları tweets.txtadlı bir dosyaya kaydeder.

Not

Python zaten yüklü olduğundan HDInsight kümesinde aşağıdaki adımlar gerçekleştirilir.

  1. Kümenize bağlanmak için ssh komutunu kullanın. CLUSTERNAME değerini kümenizin adıyla değiştirerek aşağıdaki komutu düzenleyin ve komutu girin:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Tweepy, İlerleme çubuğu ve diğer gerekli paketleri yüklemek için aşağıdaki komutları kullanın:

    sudo apt install python-dev libffi-dev libssl-dev
    sudo apt remove python-openssl
    python -m pip install virtualenv
    mkdir gettweets
    cd gettweets
    virtualenv gettweets
    source gettweets/bin/activate
    pip install tweepy progressbar pyOpenSSL requests[security]
    
  3. gettweets.py adlı bir dosya oluşturmak için aşağıdaki komutu kullanın:

    nano gettweets.py
    
  4. , , Your consumer keyYour access tokenve Your access token secret yerine Your consumer secrettwitter uygulamanızdaki ilgili bilgileri yazarak aşağıdaki kodu düzenleyin. Ardından, düzenlenen kodu gettweets.py dosyasının içeriği olarak yapıştırın.

    #!/usr/bin/python
    
    from tweepy import Stream, OAuthHandler
    from tweepy.streaming import StreamListener
    from progressbar import ProgressBar, Percentage, Bar
    import json
    import sys
    
    #Twitter app information
    consumer_secret='Your consumer secret'
    consumer_key='Your consumer key'
    access_token='Your access token'
    access_token_secret='Your access token secret'
    
    #The number of tweets we want to get
    max_tweets=100
    
    #Create the listener class that receives and saves tweets
    class listener(StreamListener):
        #On init, set the counter to zero and create a progress bar
        def __init__(self, api=None):
            self.num_tweets = 0
            self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()
    
        #When data is received, do this
        def on_data(self, data):
            #Append the tweet to the 'tweets.txt' file
            with open('tweets.txt', 'a') as tweet_file:
                tweet_file.write(data)
                #Increment the number of tweets
                self.num_tweets += 1
                #Check to see if we have hit max_tweets and exit if so
                if self.num_tweets >= max_tweets:
                    self.pbar.finish()
                    sys.exit(0)
                else:
                    #increment the progress bar
                    self.pbar.update(self.num_tweets)
            return True
    
        #Handle any errors that may occur
        def on_error(self, status):
            print status
    
    #Get the OAuth token
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    #Use the listener class for stream processing
    twitterStream = Stream(auth, listener())
    #Filter for these topics
    twitterStream.filter(track=["azure","cloud","hdinsight"])
    

    İpucu

    Popüler anahtar sözcükleri izlemek için son satırdaki konu filtresini ayarlayın. Betiği çalıştırdığınız sırada popüler anahtar sözcüklerin kullanılması verilerin daha hızlı yakalanmasını sağlar.

  5. Dosyayı kaydetmek için Ctrl + X ve ardından Y tuşlarını kullanın.

  6. Dosyayı çalıştırmak ve tweet'leri indirmek için aşağıdaki komutu kullanın:

    python gettweets.py
    

    İlerleme göstergesi görüntülenir. Tweetler indirildikçe %100'e kadar sayar.

    Not

    İlerleme çubuğunun ilerlemesi uzun sürüyorsa, filtreyi popüler konuları izleyecek şekilde değiştirmeniz gerekir. Filtrenizde konu hakkında birçok tweet olduğunda, gereken 100 tweet'i hızla alabilirsiniz.

Verileri karşıya yükleme

Verileri HDInsight depolama alanına yüklemek için aşağıdaki komutları kullanın:

hdfs dfs -mkdir -p /tutorials/twitter/data
hdfs dfs -put tweets.txt /tutorials/twitter/data/tweets.txt

Bu komutlar verileri kümedeki tüm düğümlerin erişebileceği bir konumda depolar.

HiveQL işini çalıştırma

  1. HiveQL deyimlerini içeren bir dosya oluşturmak için aşağıdaki komutu kullanın:

    nano twitter.hql
    

    Dosyanın içeriği olarak aşağıdaki metni kullanın:

    set hive.exec.dynamic.partition = true;
    set hive.exec.dynamic.partition.mode = nonstrict;
    -- Drop table, if it exists
    DROP TABLE tweets_raw;
    -- Create it, pointing toward the tweets logged from Twitter
    CREATE EXTERNAL TABLE tweets_raw (
        json_response STRING
    )
    STORED AS TEXTFILE LOCATION '/tutorials/twitter/data';
    -- Drop and recreate the destination table
    DROP TABLE tweets;
    CREATE TABLE tweets
    (
        id BIGINT,
        created_at STRING,
        created_at_date STRING,
        created_at_year STRING,
        created_at_month STRING,
        created_at_day STRING,
        created_at_time STRING,
        in_reply_to_user_id_str STRING,
        text STRING,
        contributors STRING,
        retweeted STRING,
        truncated STRING,
        coordinates STRING,
        source STRING,
        retweet_count INT,
        url STRING,
        hashtags array<STRING>,
        user_mentions array<STRING>,
        first_hashtag STRING,
        first_user_mention STRING,
        screen_name STRING,
        name STRING,
        followers_count INT,
        listed_count INT,
        friends_count INT,
        lang STRING,
        user_location STRING,
        time_zone STRING,
        profile_image_url STRING,
        json_response STRING
    );
    -- Select tweets from the imported data, parse the JSON,
    -- and insert into the tweets table
    FROM tweets_raw
    INSERT OVERWRITE TABLE tweets
    SELECT
        cast(get_json_object(json_response, '$.id_str') as BIGINT),
        get_json_object(json_response, '$.created_at'),
        concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
        substr (get_json_object(json_response, '$.created_at'),27,4)),
        substr (get_json_object(json_response, '$.created_at'),27,4),
        case substr (get_json_object(json_response,    '$.created_at'),5,3)
            when "Jan" then "01"
            when "Feb" then "02"
            when "Mar" then "03"
            when "Apr" then "04"
            when "May" then "05"
            when "Jun" then "06"
            when "Jul" then "07"
            when "Aug" then "08"
            when "Sep" then "09"
            when "Oct" then "10"
            when "Nov" then "11"
            when "Dec" then "12" end,
        substr (get_json_object(json_response, '$.created_at'),9,2),
        substr (get_json_object(json_response, '$.created_at'),12,8),
        get_json_object(json_response, '$.in_reply_to_user_id_str'),
        get_json_object(json_response, '$.text'),
        get_json_object(json_response, '$.contributors'),
        get_json_object(json_response, '$.retweeted'),
        get_json_object(json_response, '$.truncated'),
        get_json_object(json_response, '$.coordinates'),
        get_json_object(json_response, '$.source'),
        cast (get_json_object(json_response, '$.retweet_count') as INT),
        get_json_object(json_response, '$.entities.display_url'),
        array(
            trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
            trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
        array(
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
            trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        get_json_object(json_response, '$.user.screen_name'),
        get_json_object(json_response, '$.user.name'),
        cast (get_json_object(json_response, '$.user.followers_count') as INT),
        cast (get_json_object(json_response, '$.user.listed_count') as INT),
        cast (get_json_object(json_response, '$.user.friends_count') as INT),
        get_json_object(json_response, '$.user.lang'),
        get_json_object(json_response, '$.user.location'),
        get_json_object(json_response, '$.user.time_zone'),
        get_json_object(json_response, '$.user.profile_image_url'),
        json_response
    WHERE (length(json_response) > 500);
    
  2. Dosyayı kaydetmek için Ctrl + X tuşlarına basın ve ardından Y tuşuna basın.

  3. Dosyada bulunan HiveQL'i çalıştırmak için aşağıdaki komutu kullanın:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i twitter.hql
    

    Bu komut twitter.hql dosyasını çalıştırır. Sorgu tamamlandıktan sonra bir jdbc:hive2//localhost:10001/> istem görürsünüz.

  4. Beeline isteminde, verilerin içeri aktarıldığını doğrulamak için aşağıdaki sorguyu kullanın:

    SELECT name, screen_name, count(1) as cc
    FROM tweets
    WHERE text like "%Azure%"
    GROUP BY name,screen_name
    ORDER BY cc DESC LIMIT 10;
    

    Bu sorgu, ileti metninde Azure sözcüğünü içeren en fazla 10 tweet döndürür.

    Not

    Betikteki filtreyi değiştirdiyseniz Azure'ıgettweets.py kullandığınız filtrelerden biriyle değiştirin.

Sonraki adımlar

Yapılandırılmamış bir JSON veri kümesini yapılandırılmış bir Apache Hive tablosuna dönüştürmeyi öğrendin. HDInsight'ta Hive hakkında daha fazla bilgi edinmek için aşağıdaki belgelere bakın: