Använda Python User Defined Functions (UDF) med Apache Hive och Apache Pig i HDInsight

Lär dig hur du använder användardefinierade Python-funktioner (UDF) med Apache Hive och Apache Pig i Apache Hadoop i Azure HDInsight.

Python på HDInsight

Python2.7 installeras som standard på HDInsight 3.0 och senare. Apache Hive kan användas med den här versionen av Python för dataströmbearbetning. Dataströmbearbetning använder STDOUT och STDIN för att skicka data mellan Hive och UDF.

HDInsight innehåller även Jython, som är en Python-implementering skriven i Java. Jython körs direkt på den virtuella Java-datorn och använder inte strömning. Jython är den rekommenderade Python-tolken när du använder Python med Pig.

Förutsättningar

Anteckning

Lagringskontot som användes i den här artikeln var Azure Storage med säker överföring aktiverat och används därför wasbs i hela artikeln.

Storage-konfiguration

Ingen åtgärd krävs om lagringskontot som används är av typen Storage (general purpose v1) eller StorageV2 (general purpose v2). Processen i den här artikeln ger utdata till minst /tezstaging. En standardkonfiguration för hadoop innehåller i fs.azure.page.blob.dir konfigurationsvariabeln /tezstaging i core-site.xml för tjänsten HDFS. Den här konfigurationen gör att utdata till katalogen är sidblobar, som inte stöds för lagringskontots typ BlobStorage. Om du vill använda BlobStorage för den här artikeln tar du bort /tezstaging från konfigurationsvariabeln fs.azure.page.blob.dir . Konfigurationen kan nås från Ambari-användargränssnittet. Annars får du felmeddelandet: Page blob is not supported for this account type.

Varning

Stegen i det här dokumentet gör följande antaganden:

  • Du skapar Python-skript i din lokala utvecklingsmiljö.
  • Du laddar upp skripten till HDInsight med antingen scp kommandot eller det angivna PowerShell-skriptet.

Om du vill använda Azure Cloud Shell (bash) för att arbeta med HDInsight måste du:

  • Skapa skripten i Cloud Shell-miljön.
  • Använd scp för att ladda upp filerna från Cloud Shell till HDInsight.
  • Använd ssh från Cloud Shell för att ansluta till HDInsight och köra exemplen.

Apache Hive UDF

Python kan användas som UDF från Hive via HiveQL-instruktionen TRANSFORM . Följande HiveQL anropar hiveudf.py till exempel filen som lagras i azure storage-standardkontot för klustret.

add file wasbs:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Det här exemplet gör följande:

  1. - add file instruktionen i början av filen lägger till hiveudf.py filen i den distribuerade cachen, så den är tillgänglig för alla noder i klustret.
  2. Instruktionen SELECT TRANSFORM ... USING väljer data från hivesampletable. Den skickar även värdena clientid, devicemake och devicemodel till skriptet hiveudf.py .
  3. Satsen AS beskriver fälten som returneras från hiveudf.py.

Skapa fil

Skapa en textfil med namnet hiveudf.pyi utvecklingsmiljön. Använd följande kod som innehållet i filen:

#!/usr/bin/env python
import sys
import string
import hashlib

while True:
    line = sys.stdin.readline()
    if not line:
        break

    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

Det här skriptet utför följande åtgärder:

  1. Läser en datarad från STDIN.
  2. Det avslutande newline-tecknet tas bort med hjälp av string.strip(line, "\n ").
  3. När dataströmbearbetning utförs innehåller en enda rad alla värden med ett fliktecken mellan varje värde. Så string.split(line, "\t") kan användas för att dela indata på varje flik och returnera bara fälten.
  4. När bearbetningen är klar måste utdata skrivas till STDOUT som en enda rad, med en flik mellan varje fält. Till exempel print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
  5. Loopen while upprepas tills inget line läsfel har lästs.

Skriptutdata är en sammanlänkning av indatavärdena för devicemake och devicemodel, och en hash för det sammanfogade värdet.

Ladda upp fil (gränssnitt)

Följande kommando ersätter sshuser med det faktiska användarnamnet om det är annorlunda. Ersätt mycluster med det faktiska klusternamnet. Kontrollera att arbetskatalogen är där filen finns.

  1. Använd scp för att kopiera filerna till HDInsight-klustret. Redigera och ange kommandot:

    scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Använd SSH för att ansluta till klustret. Redigera och ange kommandot:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. Från SSH-sessionen lägger du till Python-filerna som laddades upp tidigare till lagringen för klustret.

    hdfs dfs -put hiveudf.py /hiveudf.py
    

Använda Hive UDF (shell)

  1. Om du vill ansluta till Hive använder du följande kommando från den öppna SSH-sessionen:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
    

    Det här kommandot startar Beeline-klienten.

  2. Ange följande fråga i prompten 0: jdbc:hive2://headnodehost:10001/> :

    add file wasbs:///hiveudf.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
        USING 'python hiveudf.py' AS
        (clientid string, phoneLabel string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    
  3. När den sista raden har angetts ska jobbet starta. När jobbet har slutförts returneras utdata som liknar följande exempel:

    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    
  4. Om du vill avsluta Beeline anger du följande kommando:

    !q
    

Ladda upp fil (PowerShell)

PowerShell kan också användas för att fjärrköra Hive-frågor. Kontrollera att arbetskatalogen finns där hiveudf.py . Använd följande PowerShell-skript för att köra en Hive-fråga som använder skriptet hiveudf.py :

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context

Anteckning

Mer information om hur du laddar upp filer finns i dokumentet Ladda upp data för Apache Hadoop-jobb i HDInsight .

Använda Hive UDF

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$HiveQuery = "add file wasbs:///hiveudf.py;" +
                "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
                "USING 'python hiveudf.py' AS " +
                "(clientid string, phoneLabel string, phoneHash string) " +
                "FROM hivesampletable " +
                "ORDER BY clientid LIMIT 50;"

# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
    -Query $HiveQuery

# For status bar updates
$activity="Hive query"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
   -Clustername $clusterName `
   -JobId $job.JobId `
   -HttpCredential $creds `
   -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Utdata för Hive-jobbet bör se ut ungefär som i följande exempel:

100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Apache Pig UDF

Ett Python-skript kan användas som en UDF från Pig via -instruktionen GENERATE . Du kan köra skriptet med antingen Jython eller C Python.

  • Jython körs på JVM och kan anropas internt från Pig.
  • C Python är en extern process, så data från Pig på JVM skickas ut till skriptet som körs i en Python-process. Utdata från Python-skriptet skickas tillbaka till Pig.

Om du vill ange Python-tolken använder du register när du refererar till Python-skriptet. Följande exempel registrerar skript med Pig som myfuncs:

  • Så här använder du Jython: register '/path/to/pigudf.py' using jython as myfuncs;
  • Så här använder du C Python: register '/path/to/pigudf.py' using streaming_python as myfuncs;

Viktigt

När du använder Jython kan sökvägen till pig_jython-filen vara antingen en lokal sökväg eller en WASBS:// sökväg. När du använder C Python måste du dock referera till en fil i det lokala filsystemet för noden som du använder för att skicka Pig-jobbet.

När registreringen har passerat är Pig Latin för det här exemplet detsamma för båda:

LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;

Det här exemplet gör följande:

  1. Den första raden läser in exempeldatafilen sample.log till LOGS. Den definierar även varje post som en chararray.
  2. Nästa rad filtrerar bort alla null-värden och lagrar resultatet av åtgärden i LOG.
  3. Därefter itererar den över posterna i LOG och använder GENERATE för att anropa create_structure metoden som finns i Python/Jython-skriptet som lästs in som myfuncs. LINE används för att skicka den aktuella posten till funktionen.
  4. Slutligen dumpas utdata till STDOUT med kommandot DUMP . Det här kommandot visar resultatet när åtgärden har slutförts.

Skapa fil

Skapa en textfil med namnet pigudf.pyi utvecklingsmiljön. Använd följande kod som innehållet i filen:

# Uncomment the following if using C Python
#from pig_util import outputSchema


@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

I exemplet LINE Pig Latin definieras indata som en chararray eftersom det inte finns något konsekvent schema för indata. Python-skriptet omvandlar data till ett konsekvent schema för utdata.

  1. - @outputSchema instruktionen definierar formatet för de data som returneras till Pig. I det här fallet är det en datapåse, som är en Pig-datatyp. Påsen innehåller följande fält, som alla är chararray (strängar):

    • date – det datum då loggposten skapades
    • time – den tid då loggposten skapades
    • classname – klassnamnet som posten skapades för
    • level – loggnivån
    • detail – utförlig information för loggposten
  2. def create_structure(input) Därefter definierar funktionen som Pig skickar radobjekt till.

  3. Exempeldata, sample.log, överensstämmer främst med schemat date, time, classname, level och detail. Den innehåller dock några rader som börjar med *java.lang.Exception*. Dessa rader måste ändras för att matcha schemat. Instruktionen if söker efter dessa och masserar sedan indata för att flytta strängen *java.lang.Exception* till slutet, vilket gör att data är i linje med det förväntade utdataschemat.

  4. split Därefter används kommandot för att dela upp data med de fyra första blankstegen. Utdata tilldelas till date, time, classname, leveloch detail.

  5. Slutligen returneras värdena till Pig.

När data returneras till Pig har de ett konsekvent schema enligt definitionen i -instruktionen @outputSchema .

Ladda upp fil (gränssnitt)

I kommandona nedan ersätter du sshuser med det faktiska användarnamnet om det är annorlunda. Ersätt mycluster med det faktiska klusternamnet. Kontrollera att arbetskatalogen är där filen finns.

  1. Använd scp för att kopiera filerna till HDInsight-klustret. Redigera och ange kommandot:

    scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Använd SSH för att ansluta till klustret. Redigera och ange kommandot:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. Från SSH-sessionen lägger du till Python-filerna som laddades upp tidigare till lagringen för klustret.

    hdfs dfs -put pigudf.py /pigudf.py
    

Använda Pig UDF (shell)

  1. Anslut till pig genom att använda följande kommando från den öppna SSH-sessionen:

    pig
    
  2. Ange följande instruktioner i prompten grunt> :

    Register wasbs:///pigudf.py using jython as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    
  3. När du har angett följande rad bör jobbet starta. När jobbet har slutförts returneras utdata som liknar följande data:

    ((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
    ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
    ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
    ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
    ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
    
  4. Använd quit för att avsluta Grunt-gränssnittet och använd sedan följande för att redigera pigudf.py-filen i det lokala filsystemet:

    nano pigudf.py
    
  5. När du är i redigeringsprogrammet avkommentarer du följande rad genom att ta bort # tecknet från början av raden:

    #from pig_util import outputSchema
    

    Den här raden ändrar Python-skriptet så att det fungerar med C Python i stället för Jython. När ändringen har gjorts använder du Ctrl+X för att avsluta redigeraren. Välj Y och sedan Retur för att spara ändringarna.

  6. pig Använd kommandot för att starta gränssnittet igen. När du är i prompten grunt> använder du följande för att köra Python-skriptet med hjälp av C Python-tolken.

    Register 'pigudf.py' using streaming_python as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    

    När det här jobbet har slutförts bör du se samma utdata som när du tidigare körde skriptet med Jython.

Ladda upp fil (PowerShell)

PowerShell kan också användas för att fjärrköra Hive-frågor. Kontrollera att arbetskatalogen finns där pigudf.py . Använd följande PowerShell-skript för att köra en Hive-fråga som använder skriptet pigudf.py :

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"


# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context

Använda Pig UDF (PowerShell)

Anteckning

När du skickar ett jobb via fjärranslutning med PowerShell går det inte att använda C Python som tolk.

PowerShell kan också användas för att köra Pig Latin-jobb. Om du vill köra ett Pig Latin-jobb som använder skriptet pigudf.py använder du följande PowerShell-skript:

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"


$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery

# For status bar updates
$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -Job $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Utdata för Pig-jobbet bör se ut ungefär som följande data:

((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))

Felsökning

Fel vid körning av jobb

När du kör hive-jobbet kan det uppstå ett fel som liknar följande text:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.

Det här problemet kan bero på radsluten i Python-filen. Många Windows-redigerare använder som standard CRLF som radslut, men Linux-program förväntar sig vanligtvis LF.

Du kan använda följande PowerShell-instruktioner för att ta bort CR-tecknen innan du laddar upp filen till HDInsight:

Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job

PowerShell-skript

Båda PowerShell-exempelskripten som används för att köra exemplen innehåller en kommenterad rad som visar felutdata för jobbet. Om du inte ser förväntade utdata för jobbet avkommentarer du följande rad och ser om felinformationen indikerar ett problem.

$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

Felinformationen (STDERR) och resultatet av jobbet (STDOUT) loggas också till HDInsight-lagringen.

För det här jobbet... Titta på de här filerna i blobcontainern
Hive /HivePython/stderr

/HivePython/stdout

Pig /PigPython/stderr

/PigPython/stdout

Nästa steg

Om du behöver läsa in Python-moduler som inte tillhandahålls som standard läser du Så här distribuerar du en modul till Azure HDInsight.

Andra sätt att använda Pig, Hive och mer information om hur du använder MapReduce finns i följande dokument: