Apache HBase verilerini okuyup yazmak için Apache Spark kullanma

Apache HBase genellikle alt düzey API'si (tarar, alır ve yerleştirir) veya Apache Phoenix kullanılarak sql söz dizimi ile sorgulanır. Apache ayrıca Apache Spark HBase Bağlan or sağlar. Bağlan or, HBase tarafından depolanan verileri sorgulamak ve değiştirmek için kullanışlı ve verimli bir alternatiftir.

Önkoşullar

  • Aynı sanal ağda dağıtılan iki ayrı HDInsight kümesi. Bir HBase ve en az Spark 2.1 (HDInsight 3.6) yüklü olan bir Spark. Daha fazla bilgi için bkz . Azure portalını kullanarak HDInsight'ta Linux tabanlı kümeler oluşturma.

  • Kümelerinizin birincil depolama alanı için URI şeması. Bu düzen Azure Blob Depolama, abfs:// Azure Data Lake Storage 1. Nesil için Azure Data Lake Storage 2. Nesil veya adl:// için wasb:// olacaktır. Blob Depolama için güvenli aktarım etkinleştirilirse, URI olacaktırwasbs://. Ayrıca bkz. güvenli aktarım.

Genel işlem

Spark kümenizin HBase kümenizi sorgulamasını etkinleştirmeye yönelik üst düzey işlem aşağıdaki gibidir:

  1. HBase'de bazı örnek veriler hazırlayın.
  2. HBase küme yapılandırma klasörünüzden (/etc/hbase/conf) hbase-site.xml dosyasını alın ve Spark 2 yapılandırma klasörünüzde (/etc/spark2/conf) hbase-site.xml bir kopyasını yerleştirin. (İsteğE BAĞLI: Bu işlemi otomatikleştirmek için HDInsight ekibi tarafından sağlanan betiği kullanın)
  3. Seçeneğinde Spark HBase Bağlan veya Maven koordinatlarına packages başvurarak komutunu çalıştırınspark-shell.
  4. Şemayı Spark'tan HBase'e eşleyen bir katalog tanımlayın.
  5. RDD veya DataFrame API'lerini kullanarak HBase verileriyle etkileşime geçin.

Apache HBase'de örnek verileri hazırlama

Bu adımda, Apache HBase'de spark kullanarak sorgulayabileceğiniz bir tablo oluşturup doldurursunuz.

  1. ssh HBase kümenize bağlanmak için komutunu kullanın. komutunu HBase kümenizin adıyla değiştirerek HBASECLUSTER düzenleyin ve ardından şu komutu girin:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. hbase shell HBase etkileşimli kabuğunu başlatmak için komutunu kullanın. SSH bağlantınıza aşağıdaki komutu girin:

    hbase shell
    
  3. create komutunu kullanarak iki sütunlu ailelerle bir HBase tablosu oluşturun. Aşağıdaki komutu girin:

    create 'Contacts', 'Personal', 'Office'
    
  4. Belirli bir tablodaki put belirtilen satırda belirtilen sütuna değer eklemek için komutunu kullanın. Aşağıdaki komutu girin:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. exit HBase etkileşimli kabuğunu durdurmak için komutunu kullanın. Aşağıdaki komutu girin:

    exit
    

Kümeler arasında bağlantı kurmak için betikleri çalıştırma

Kümeler arasındaki iletişimi ayarlamak için, kümelerinizde iki betik çalıştırma adımlarını izleyin. Bu betikler, 'İletişimi el ile ayarlama' bölümünde açıklanan dosya kopyalama işlemini otomatikleştirir.

  • HBase kümesinden çalıştırdığınız betik, Spark kümenize bağlı varsayılan depolama alanına ve HBase IP eşleme bilgilerini karşıya yükler hbase-site.xml .
  • Spark kümesinden çalıştırdığınız betik, düzenli aralıklarla iki yardımcı betik çalıştırmak için iki cron işi ayarlar:
    1. HBase cron işi – Spark varsayılan depolama hesabından yerel düğüme yeni hbase-site.xml dosyaları ve HBase IP eşlemesini indirin
    2. Spark cron işi – Spark ölçeklendirmesi olup olmadığını ve kümenin güvenli olup olmadığını denetler. Öyleyse, yerel olarak depolanan HBase IP eşlemesini içerecek şekilde düzenleyin /etc/hosts

NOT: Devam etmeden önce Spark kümesinin depolama hesabını HBase kümenize ikincil depolama hesabı olarak eklediğinizden emin olun. Betikleri belirtildiği gibi sırasıyla kullandığınızdan emin olun.

  1. Değişiklikleri uygulamak için HBase kümenizde Betik Eylemi'ni kullanın ve aşağıdaki noktaları dikkate alın:

    Özellik Değer
    Bash betik URI'si https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Düğüm türleri Bölge
    Parametreler -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Kalıcı evet
    • SECONDARYS_STORAGE_URL Spark tarafı varsayılan depolama alanının URL'sidir. Parametre Örneği: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. Değişiklikleri uygulamak için Spark kümenizde Betik Eylemi'ni kullanın ve aşağıdaki noktaları dikkate alın:

    Özellik Değer
    Bash betik URI'si https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Düğüm türleri Baş, Çalışan, Zookeeper
    Parametreler -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Kalıcı evet
    • Bu kümenin güncelleştirme olup olmadığını otomatik olarak denetlemesini istediğiniz sıklıkta belirtebilirsiniz. Varsayılan: -s "*/1 * * * * " -h 0 (Bu örnekte Spark cron dakikada bir çalıştırılırken, HBase cron çalışmaz)
    • HBase cron varsayılan olarak ayarlanmadığından, HBase kümenize ölçeklendirme yaparken bu betiği yeniden çalıştırmanız gerekir. HBase kümeniz sık ölçeklendiriliyorsa HBase cron işini otomatik olarak ayarlamayı seçebilirsiniz. Örneğin: -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" betiği her 30 dakikada bir denetim gerçekleştirecek şekilde yapılandırılır. Bu, ortak depolama hesabındaki yeni HBase bilgilerinin yerel düğüme indirilmesini otomatikleştirmek için HBase cron zamanlamasını düzenli aralıklarla çalıştırır.

Not

Bu betikler yalnızca HDI 5.0 ve HDI 5.1 kümelerinde çalışır.

İletişimi el ile ayarlama (Yukarıdaki adımda sağlanan betik başarısız olursa isteğe bağlı)

NOT: Kümelerden biri ölçeklendirme etkinliğine her geçtiğinde bu adımların gerçekleştirilmesi gerekir.

  1. hbase-site.xml yerel depolamadan Spark kümenizin varsayılan depolama alanının köküne kopyalayın. Komutunu yapılandırmanızı yansıtacak şekilde düzenleyin. Ardından, açık SSH oturumunuzdan HBase kümesine komutunu girin:

    Söz dizimi değeri Yeni değer
    URI düzeni Depolama alanınızı yansıtacak şekilde değiştirin. Söz dizimi, güvenli aktarım etkin blob depolama içindir.
    SPARK_STORAGE_CONTAINER değerini Spark kümesi için kullanılan varsayılan depolama kapsayıcısı adıyla değiştirin.
    SPARK_STORAGE_ACCOUNT değerini Spark kümesi için kullanılan varsayılan depolama hesabı adıyla değiştirin.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Ardından HBase kümenize ssh bağlantınızdan çıkın.

    exit
    
  3. SSH kullanarak Spark kümenizin baş düğümüne Bağlan. komutunu Spark kümenizin adıyla değiştirerek SPARKCLUSTER düzenleyin ve ardından şu komutu girin:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Spark kümenizin varsayılan depolama alanından kümenin yerel depolamasındaki Spark 2 yapılandırma klasörüne kopyalamak hbase-site.xml için komutunu girin:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Spark HBase Bağlan veya başvuran Spark Shell'i çalıştırın

Önceki adımı tamamladıktan sonra, Uygun Spark HBase Bağlan or sürümüne başvurarak Spark kabuğunu çalıştırabilmeniz gerekir.

Örnek olarak, aşağıdaki tabloda iki sürüm ve HDInsight ekibinin şu anda kullandığı ilgili komutlar listelenmiştir. HBase ve Spark sürümleri tabloda belirtilenlerle aynıysa kümeleriniz için aynı sürümleri kullanabilirsiniz.

  1. Spark kümesine açık SSH oturumunuzda bir Spark kabuğu başlatmak için aşağıdaki komutu girin:

    Spark sürümü HDI HBase sürümü SHC sürümü Command
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Bu Spark kabuğu örneğini açık tutun ve Katalog ve sorgu tanımlamaya devam edin. SHC Core deposunda sürümlerinize karşılık gelen jar'ları bulamazsanız okumaya devam edin.

Spark ve HBase sürümlerinin sonraki birleşimleri için bu yapıtlar artık yukarıdaki depoda yayımlanmaz. Jar'ları doğrudan spark-hbase-connector GitHub dalından oluşturabilirsiniz. Örneğin, Spark 2.4 ve HBase 2.1 ile çalışıyorsanız şu adımları tamamlayın:

  1. Depoyu kopyalayın:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Branch-2.4'e gidin:

    git checkout branch-2.4
    
  3. Daldan derleme (.jar dosyası oluşturur):

    mvn clean package -DskipTests
    
  4. Aşağıdaki komutu çalıştırın (oluşturduğunuz .jar dosyasına karşılık gelen .jar adını değiştirdiğinizden emin olun):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Bu Spark kabuğu örneğini açık tutun ve sonraki bölüme geçin.

Katalog ve sorgu tanımlama

Bu adımda, şemayı Apache Spark'tan Apache HBase'e eşleyen bir katalog nesnesi tanımlarsınız.

  1. Açık Spark Shell'inize aşağıdaki import deyimleri girin:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. HBase'de oluşturduğunuz Kişiler tablosu için bir katalog tanımlamak için aşağıdaki komutu girin:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    Kod:

    1. adlı ContactsHBase tablosu için bir katalog şeması tanımlar.
    2. Satır tuşlarını olarak keytanımlar ve Spark'ta kullanılan sütun adlarını HBase'de kullanılan sütun ailesi, sütun adı ve sütun türüyle eşler.
    3. Satır tuşlarını ayrıntılı olarak, belirli bir sütun ailesi cfrowkeyolan adlandırılmış sütun ()rowkey olarak tanımlar.
  3. HBase'de tablonuzun Contacts çevresinde DataFrame sağlayan bir yöntem tanımlamak için komutunu girin:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. DataFrame'in bir örneğini oluşturun:

    val df = withCatalog(catalog)
    
  5. DataFrame'i sorgulama:

    df.show()
    

    İki veri satırı görmeniz gerekir:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Spark SQL kullanarak HBase tablosunu sorgulayabileceğiniz geçici bir tablo kaydedin:

    df.createTempView("contacts")
    
  7. Tabloda bir SQL sorgusu oluşturun contacts :

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Aşağıdaki gibi sonuçlar görmeniz gerekir:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Yeni veri ekleme

  1. Yeni bir Kişi kaydı eklemek için bir ContactRecord sınıf tanımlayın:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. örneğini ContactRecord oluşturun ve bir diziye yerleştirin:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Yeni veri dizisini HBase'e kaydedin:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Sonuçları inceleyin:

    df.show()
    

    Şunun gibi bir çıktı görmeniz gerekir:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Aşağıdaki komutu girerek spark kabuğunu kapatın:

    :q
    

Sonraki adımlar