Share via


Azure Functions の Apache Kafka トリガー

Azure Functions で Apache Kafka トリガーを使用して、Kafka トピックのメッセージに応答して関数コードを実行できます。 Kafka 出力バインドを使用して、関数からトピックに書き込むこともできます。 セットアップと構成の詳細については、「Azure Functions における Apache Kafka バインドの概要」を参照してください。

重要

Kafka バインドは、エラスティック Premium プランおよび 専用 (App Service) プランの Functions でのみ使用できます。 これらは、バージョン 3.x 以降のバージョンの Functions ランタイムでのみサポートされます。

トリガーの使用方法は、拡張機能パッケージのバージョンと、関数アプリで使用される C# のモダリティによって異なり、次のモードのいずれかになります。

分離ワーカー プロセス クラス ライブラリでコンパイルされた C# 関数は、ランタイムから分離されたプロセスで実行されます。

使用する属性は、個別のイベント プロバイダーによって異なります。

次の例は、Kafka メッセージを Kafka イベントとして読み取り、ログする C# 関数を示しています。

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

バッチでイベントを受信するには、次の例に示すように、文字列配列を入力として使用します。

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

次の関数により、Kafka イベントのメッセージとヘッダーがログされます。

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

動作する .NET の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

注意

同等の TypeScript の例については、Kafka 拡張機能リポジトリを参照してください

function.json ファイルの個別のプロパティは、イベント プロバイダー (以下の例では、Confluent または Azure Event Hubs のいずれかです) によって異なります。 次の例は、Kafka メッセージを読み取ってログする関数の Kafka トリガーを示しています。

次の function.json では、特定のプロバイダーのトリガーを定義しています。

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

その後、関数がトリガーされたときに、次のコードが実行されます。

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

バッチでイベントを受信するには、次の例に示すように、function.json ファイルで cardinality の値を many に設定します。

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

次のコードにより、イベントの配列が解析され、イベント データがログされます。

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

また、次のコードにより、ヘッダー データがログされます。

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の function.json では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

その後、関数がトリガーされたときに、次のコードが実行されます。

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

動作する JavaScript の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

function.json ファイルの個別のプロパティは、イベント プロバイダー (以下の例では、Confluent または Azure Event Hubs のいずれかです) によって異なります。 次の例は、Kafka メッセージを読み取ってログする関数の Kafka トリガーを示しています。

次の function.json では、特定のプロバイダーのトリガーを定義しています。

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

その後、関数がトリガーされたときに、次のコードが実行されます。

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

バッチでイベントを受信するには、次の例に示すように、function.json ファイルで cardinality の値を many に設定します。

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

次のコードにより、イベントの配列が解析され、イベント データがログされます。

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

また、次のコードにより、ヘッダー データがログされます。

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の function.json では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

その後、関数がトリガーされたときに、次のコードが実行されます。

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

動作する PowerShell の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

function.json ファイルの個別のプロパティは、イベント プロバイダー (以下の例では、Confluent または Azure Event Hubs のいずれかです) によって異なります。 次の例は、Kafka メッセージを読み取ってログする関数の Kafka トリガーを示しています。

次の function.json では、特定のプロバイダーのトリガーを定義しています。

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

その後、関数がトリガーされたときに、次のコードが実行されます。

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

バッチでイベントを受信するには、次の例に示すように、function.json ファイルで cardinality の値を many に設定します。

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

次のコードにより、イベントの配列が解析され、イベント データがログされます。

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

また、次のコードにより、ヘッダー データがログされます。

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の function.json では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

その後、関数がトリガーされたときに、次のコードが実行されます。

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

動作する Python の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

トリガーを構成するために使用する注釈は、個別のイベント プロバイダーによって異なります。

次の例は、Kafka イベントの内容を読み取り、ログする Java 関数を示しています。

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

バッチでイベントを受信するには、次の例に示すように、入力文字列を配列として使用します。

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

次の関数により、Kafka イベントのメッセージとヘッダーがログされます。

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の関数では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Confluent で動作する Java の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

属性

インプロセス分離ワーカー プロセスの C# ライブラリはどちらも、KafkaTriggerAttribute を使用して関数トリガーを定義します。

次の表では、このトリガー属性を使用して設定できるプロパティについて説明します。

パラメーター 説明
BrokerList (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。
トピック (必須) トリガーによって監視されるトピック。
ConsumerGroup (省略可能) トリガーで使用される Kafka コンシューマー グループ。
AvroSchema (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。
AuthenticationMode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされる値は、GssapiPlain (既定値)、ScramSha256ScramSha512 です。
ユーザー名 (省略可能) SASL 認証のユーザー名。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
パスワード (省略可能) SASL 認証のパスワード。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
プロトコル (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされる値は、plaintext (既定値)、sslsasl_plaintextsasl_ssl です。
SslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
SslCertificateLocation (省略可能) クライアントの証明書へのパス。
SslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
SslKeyPassword (省略可能) クライアントの証明書のパスワード。

注釈

KafkaTrigger 注釈を使用すると、トピックが受信されたときに実行する関数を作成できます。 サポートされるオプションには、次の要素が含まれます。

要素 説明
name (必須) 関数コード内のキューまたはトピック メッセージを表す変数の名前。
brokerList (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。
topic (必須) トリガーによって監視されるトピック。
cardinality (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE、入力がメッセージの配列である場合は MANY を使用します。 MANY を使用する場合は、dataType も設定する必要があります。
dataType Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。 string の場合、入力は単なる文字列として扱われます。 binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。
consumerGroup (省略可能) トリガーで使用される Kafka コンシューマー グループ。
avroSchema (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。
authenticationMode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされる値は、GssapiPlain (既定値)、ScramSha256ScramSha512 です。
username (省略可能) SASL 認証のユーザー名。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
password (省略可能) SASL 認証のパスワード。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
protocol (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされる値は、plaintext (既定値)、sslsasl_plaintextsasl_ssl です。
sslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
sslCertificateLocation (省略可能) クライアントの証明書へのパス。
sslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
sslKeyPassword (省略可能) クライアントの証明書のパスワード。

構成

次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。

function.json のプロパティ 説明
type (必須) kafkaTrigger に設定する必要があります。
direction (必須) in に設定する必要があります。
name (必須) 関数コード内のブローカー データを表す変数の名前。
brokerList (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。
topic (必須) トリガーによって監視されるトピック。
cardinality (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE、入力がメッセージの配列である場合は MANY を使用します。 MANY を使用する場合は、dataType も設定する必要があります。
dataType Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。 string の場合、入力は単なる文字列として扱われます。 binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。
consumerGroup (省略可能) トリガーで使用される Kafka コンシューマー グループ。
avroSchema (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。
authenticationMode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされる値は、GssapiPlain (既定値)、ScramSha256ScramSha512 です。
username (省略可能) SASL 認証のユーザー名。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
password (省略可能) SASL 認証のパスワード。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
protocol (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされる値は、plaintext (既定値)、sslsasl_plaintextsasl_ssl です。
sslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
sslCertificateLocation (省略可能) クライアントの証明書へのパス。
sslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
sslKeyPassword (省略可能) クライアントの証明書のパスワード。

使用方法

Kafka イベントは現在、JSON ペイロードである文字列および文字列配列としてサポートされています。

Kafka メッセージは、JSON ペイロードである文字列および文字列配列として関数に渡されます。

Premium プランでは、Kafka 出力のランタイム スケール監視を有効にして、複数のインスタンスにスケール アウトできるようにする必要があります。 詳細については、「ランタイム スケールを有効にする」を参照してください。

Azure Portal の [コードとテスト] ページの [テストと実行] 機能を使って Kafka トリガーを操作することはできません。 代わりに、トリガーによって監視されているトピックにテスト イベントを直接送信する必要があります。

Kafka トリガーでサポートされている host.json 設定の完全なセットについては、「host.json 設定」を参照してください。

接続

トリガーとバインドに必要なすべての接続情報は、コード内のバインド定義ではなく、アプリケーション設定に保持する必要があります。 これは、コードに格納してはならない資格情報にも言えることです。

重要

資格情報の設定はアプリケーション設定を参照する必要があります。 コードや構成のファイル内に資格情報をハードコーディングしないでください。 ローカルで実行する場合は、資格情報に local.settings.json ファイルを使用します。local.settings.json ファイルは公開しないでください。

Azure の Confluent によって提供されるマネージド Kafka クラスターに接続する場合は、Confluent Cloud 環境の次の認証資格情報がトリガーまたはバインドに設定されていることを確認します。

設定 推奨値 説明
BrokerList BootstrapServer BootstrapServer という名前のアプリ設定には、Confluent Cloud の設定ページで検出されたブートストラップ サーバーの値が含まれています。 値は xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 のようになります。
ユーザー名 ConfluentCloudUsername ConfluentCloudUsername という名前のアプリ設定には、Confluent Cloud Web サイトからの API アクセス キーが含まれています。
パスワード ConfluentCloudPassword ConfluentCloudPassword という名前のアプリ設定には、Confluent Cloud Web サイトから取得した API シークレットが含まれています。

これらの設定に使用する文字列値は、Azure のアプリケーション設定として、またはローカル開発中に local.settings.json ファイル内の Values コレクションに存在する必要があります。

また、バインド定義で ProtocolAuthenticationModeSslCaLocation を設定する必要があります。

次のステップ