Stream Analytics に入力としてデータをストリーム配信する

Stream Analytics は、Azure データ ストリームとの統合にきわめて優れており、次の 3 種類のリソースを入力とすることができます。

これらの入力ソースは、Stream Analytics ジョブと同じ Azure サブスクリプションに存在していても、異なるサブスクリプションに存在していてもかまいません。

圧縮

Stream Analytics では、すべての入力ソースにわたる圧縮がサポートされています。 サポートされている圧縮の種類は、None、Gzip、Deflate です。 参照データの圧縮はサポートされていません。 入力データが Avro データに圧縮されている場合、Stream Analytics によって透過的に処理されます。 Avro のシリアル化に圧縮の種類を指定する必要はありません。

入力の作成、編集、またはテスト

Azure portalVisual Studio、および Visual Studio Code を使用して、ストリーミング ジョブに既存の入力を追加、表示、または編集できます。 また、Azure portal、Visual Studio、および Visual Studio Code のサンプル データから、入力接続をテストして、クエリをテストすることができます。 クエリを記述する場合は、FROM 句に入力の一覧を指定します。 ポータルの [クエリ] ページで、使用できる入力の一覧を取得できます。 複数の入力を使用する場合は、それらを JOIN するか、複数の SELECT クエリを記述します。

Note

最良のローカル開発エクスペリエンスを実現するためには、Stream Analytics Tools for Visual Studio Code を使用することを強くお勧めします。 Stream Analytics Tools for Visual Studio 2019 (バージョン 2.6.3000.0) には既知の機能差があり、それは今後改善されません。

Event Hubs からのデータのストリーム配信

Azure Event Hubs は非常にスケーラブルなパブリッシュ/サブスクライブ イベント インジェスターです。 接続されたデバイスとアプリケーションによって生成される大量のデータを処理および分析できるように、イベント ハブでは 1 秒間に数百万件のイベントを収集できます。 Event Hubs と Stream Analytics を一緒に使用することで、リアルタイム分析用のエンド ツー エンドのソリューションを得ることができます。 Event Hubs で Azure にイベントをリアルタイムでフィードし、Stream Analytics ジョブでこれらのイベントをリアルタイムで処理できます。 たとえば、Web クリック、センサーの読み取り、オンライン ログ イベントを Event Hubs に送信できます。 続いて、リアルタイムのフィルター処理、集計、相関関係に入力データとして Event Hubs を使用する Stream Analytics ジョブを作成できます。

EventEnqueuedUtcTime は、イベントが Event Hubs に到着したときのタイムスタンプであり、Event Hubs から Stream Analytics に送信されるイベントの既定のタイムスタンプです。 イベント ペイロードのタイムスタンプを利用してデータをストリームとして処理するには、TIMESTAMP BY キーワードを使用する必要があります。

Event Hubs コンシューマー グループ

イベント ハブの各入力は、独自のコンシューマー グループを持つように構成する必要があります。 ジョブに自己結合または複数の入力が含まれる場合、一部の入力は複数の閲覧者ダウンストリームによって読み取られる可能性があります。 この状況は 1 つのコンシューマー グループの閲覧者数に影響を与えます。 閲覧者の数を各パーティションのコンシューマー グループ別に 5 人とする Event Hubs の上限を回避するには、Stream Analytics ジョブごとにコンシューマー グループを指定するのが最良事例となります。 1 つの Standard レベルのイベント ハブに対して 20 個のコンシューマー グループという制限もあります。 詳細については、Azure Stream Analytics の入力のトラブルシューティングに関するページを参照してください。

Event Hubs から入力を作成する

次の表に示したのは、Azure Portal の [新しい入力] ページで Event Hubs からのデータ入力をストリーム配信するときに使用される各プロパティの説明です。

プロパティ 説明
入力のエイリアス この入力を参照するジョブのクエリで使用するわかりやすい名前。
サブスクリプション イベント ハブ リソースが存在する Azure サブスクリプションを選択します。
イベント ハブの名前空間 Event Hubs 名前空間はイベント ハブのコンテナーです。 イベント ハブを作成するときは、名前空間も作成します。
イベント ハブ名 入力として使用するイベント ハブの名前。
イベント ハブ コンシューマー グループ (推奨) Stream Analytics ジョブごとに個別のコンシューマー グループを使用することをお勧めします。 イベント ハブからデータを取り込むために使用するコンシューマー グループが、この文字列によって識別されます。 コンシューマー グループが指定されていない場合、Stream Analytics ジョブは $Default コンシューマー グループを使用します。
認証モード イベント ハブへの接続に使用する認証の種類を指定します。 接続文字列またはマネージド ID を使用して、イベント ハブで認証できます。 マネージド ID オプションでは、Stream Analytics ジョブに対するシステム割り当てマネージド ID かユーザー割り当てマネージド ID を作成して、イベント ハブで認証することができます。 マネージド ID を使用する場合、マネージド ID は Azure Event Hubs データ受信者または Azure Event Hubs データ所有者のロールのメンバーである必要があります。
イベント ハブ ポリシー名 Event Hubs にアクセスできるようにする共有アクセス ポリシー。 各共有アクセス ポリシーには、名前、設定したアクセス許可、アクセス キーが含まれています。 このオプションは、Event Hubs 設定を手動で指定するオプションを選択しない限り、自動的に設定されます。
パーティション キー これは、ジョブが 1.2 以上の互換性レベルを使用するように構成されている場合にのみ使用できる省略可能なフィールドです。 入力がプロパティでパーティション分割される場合、ここでこのプロパティの名前を追加できます。 このプロパティに PARTITION BY または GROUP BY 句が含まれている場合に、クエリのパフォーマンスを向上させるために使用されます。 このジョブで互換性レベル 1.2 以上が使用されている場合、このフィールドの既定値は PartitionId. です
イベントのシリアル化の形式 受信データ ストリームのシリアル化形式 (JSON、CSV、Avro、Parquet、またはその他 (Protobuf、XML、専用など))。 JSON 形式が仕様に準拠しており、10 進数の先頭に 0 が含まれていないことを確認します。
[エンコード] 現在のところ、UTF-8 が、唯一サポートされているエンコード形式です。
イベントの圧縮タイプ 受信データ ストリームを読み取るために使用される圧縮の種類 (None (デフォルト)、Gzip、Deflate など)。
スキーマ レジストリ (プレビュー) イベント ハブから受信したイベント データのスキーマを含むスキーマ レジストリを選択できます。

Event Hubs ストリーム入力からデータが取得される場合、Stream Analytics クエリで次のメタデータ フィールドにアクセスできます。

プロパティ 説明
EventProcessedUtcTime Stream Analytics でイベントを処理する日時。
EventEnqueuedUtcTime Event Hubs でイベントを受信する日時。
PartitionId 入力アダプターの 0 から始まるパーティション ID。

たとえば、これらのフィールドを使用して、次の例のようにクエリを記述できます。

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Note

IoT Hub ルートのエンドポイントとして Event Hubs を使用する場合は、GetMetadataPropertyValue 関数を使用して IoT Hub メタデータにアクセスすることができます。

IoT Hub からのデータのストリーム配信

Azure IoT Hub は、IoT シナリオ向けに最適化された高度にスケーラブルな発行/サブスクライブ イベント インジェスターです。

IoT Hub から Stream Analytics に着信するイベントの既定のタイムスタンプは、このイベントが IoT Hub に到達したときのタイムスタンプです。このタイムスタンプが EventEnqueuedUtcTime です。 イベント ペイロードのタイムスタンプを利用してデータをストリームとして処理するには、TIMESTAMP BY キーワードを使用する必要があります。

IoT Hub コンシューマー グループ

Stream Analytics IoT Hub の各入力は、独自のコンシューマー グループを持つように構成する必要があります。 ジョブに自己結合または複数の入力が含まれる場合、一部の入力は複数の閲覧者ダウンストリームによって読み取られる可能性があります。 この状況は 1 つのコンシューマー グループの閲覧者数に影響を与えます。 閲覧者の数を各パーティションのコンシューマー グループ別に 5 人とする Azure IoT Hub の上限を回避するには、Stream Analytics ジョブごとにコンシューマー グループを指定するのが最良事例となります。

データ ストリーム入力として IoT Hub を構成する

次の表に、IoT Hub をストリーム入力として構成する場合に使用する、Azure Portal の [新しい入力] ページにある各プロパティの説明を示しています。

プロパティ 説明
入力のエイリアス この入力を参照するジョブのクエリで使用するわかりやすい名前。
サブスクリプション IoT Hub リソースが存在するサブスクリプションを選択します。
IoT Hub 入力として使用する IoT Hub の名前。
コンシューマー グループ Stream Analytics ジョブごとに個別のコンシューマー グループを使用することをお勧めします。 IoT Hub からのデータを取り込むために使用するコンシューマー グループです。 明示的に指定されない限り、Stream Analytics は $Default コンシューマー グループを使用します。
共有アクセス ポリシー名 IoT Hub へのアクセスを提供する共有アクセス ポリシー。 各共有アクセス ポリシーには、名前、設定したアクセス許可、アクセス キーが含まれています。
共有アクセス ポリシー キー IoT Hub へのアクセスを承認するために使用する共有アクセス キー。 IoT Hub の設定を手動で入力するオプションを選択しない限り、このオプションは自動的に事前設定されます。
エンドポイント IoT Hub のエンドポイント。
パーティション キー これは、ジョブが 1.2 以上の互換性レベルを使用するように構成されている場合にのみ使用できる省略可能なフィールドです。 入力がプロパティでパーティション分割される場合、ここでこのプロパティの名前を追加できます。 これは、このプロパティに PARTITION BY または GROUP BY 句が含まれている場合に、クエリのパフォーマンスを向上させるために使用されます。 このジョブで互換性レベル 1.2 以上が使用されている場合、このフィールドの既定値は "PartitionId" です。
イベントのシリアル化の形式 受信データ ストリームのシリアル化形式 (JSON、CSV、Avro、Parquet、またはその他 (Protobuf、XML、専用など))。 JSON 形式が仕様に準拠しており、10 進数の先頭に 0 が含まれていないことを確認します。
[エンコード] 現在のところ、UTF-8 が、唯一サポートされているエンコード形式です。
イベントの圧縮タイプ 受信データ ストリームを読み取るために使用される圧縮の種類 (None (デフォルト)、Gzip、Deflate など)。

IoT Hub からのストリーム データを使用する場合、Stream Analytics クエリで次のメタデータ フィールドにアクセスできます。

プロパティ 説明
EventProcessedUtcTime イベントが処理された日時。
EventEnqueuedUtcTime IoT Hub でイベントを受信する日時。
PartitionId 入力アダプターの 0 から始まるパーティション ID。
IoTHub.MessageId IoT Hub での双方向通信の関連付けに使用する ID。
IoTHub.CorrelationId IoT Hub でのメッセージ応答とフィードバックに使用する ID。
IoTHub.ConnectionDeviceId このメッセージの送信に使用される認証 ID。 この値は、IoT Hub によりサービス宛てのメッセージにスタンプされます。
IoTHub.ConnectionDeviceGenerationId このメッセージの送信に使用された認証済みデバイスの世代 ID。 この値は、IoT Hub により servicebound メッセージにスタンプされます。
IoTHub.EnqueuedTime IoT Hub でメッセージを受信する時刻。

Azure Blob Storage または Data Lake Storage Gen2 からデータをストリーム配信する

大量の非構造化データをクラウドに保存するシナリオでは、Azure Blob Storage または Azure Data Lake Storage Gen2 によって、コスト効率の高いスケーラブルなソリューションが提供されます。 Blob Storage または Azure Data Lake Storage Gen2 内のデータは、保存データと見なされます。 ただし、このデータは Stream Analytics でデータ ストリームとして処理できます。

ログの処理は、Stream Analytics の入力などを使用する場合によく使用されるシナリオです。 このシナリオでは、利用統計情報ファイルがシステムからキャプチャされます。これを解析および処理して意味のあるデータを抽出する必要があります。

Blob Storage の既定のタイムスタンプまたは Stream Analytics の Azure Data Lake Storage Gen2 イベントは、最後に変更されたタイムスタンプであり、BlobLastModifiedUtcTime です。 13:00 に BLOB がストレージ アカウントにアップロードされ、13:01 にオプション [Now](今すぐ) を使用して Azure Stream Analytics ジョブが開始された場合、変更された時間がジョブの実行期間外であるため、BLOB は選択されません。

13:00 に BLOB がストレージ アカウント コンテナーにアップロードされ、13:00 またはそれ以前に [Custom Time](ユーザー設定時刻) を使用して Azure Stream Analytics ジョブが開始された場合、変更された時間がジョブの実行期間内であるため、BLOB は選択されます。

13:00 に [今すぐ] を使用して Azure Stream Analytics ジョブが開始され、13:01 に BLOB がストレージ アカウント コンテナーにアップロードされた場合、Azure Stream Analytics では BLOB を選択します。 各 BLOB に割り当てられたタイムスタンプは BlobLastModifiedTime のみに基づいています。 BLOB が含まれているフォルダーは、割り当てられたタイムスタンプとは関係がありません。 たとえば、2019/10-01/00/b1.txt という BLOB の BlobLastModifiedTime2019-11-11 である場合、この BLOB に割り当てられるタイムスタンプは 2019-11-11 です。

イベント ペイロードのタイムスタンプを利用してデータをストリームとして処理するには、TIMESTAMP BY キーワードを使用する必要があります。 Stream Analytics ジョブでは、BLOB ファイルが使用可能な場合に、毎秒 Azure Blob Storage または Azure Data Lake Storage Gen2 入力からデータをプルします。 BLOB ファイルが使用不可能な場合は、最大で 90 秒の時間遅延がある指数関数的バックオフがあります。

Note

Stream Analytics では、既存の BLOB ファイルにコンテンツを追加できません。 Stream Analytics は各ファイルを 1 回だけ表示します。ジョブがデータを読み取った後にファイルで発生した変更は処理されません。 ベスト プラクティスとして、すべての BLOB ファイルのデータを一度にアップロードし、追加の新しいイベントを、別の新しい BLOB ファイルに追加することをお勧めします。

多くの BLOB が継続的に追加され、追加されるにつれて BLOB が Stream Analytics で処理されるシナリオでは、BlobLastModifiedTime の細分性のため、まれに一部の BLOB がスキップされる可能性があります。 このケースは、2 秒以上の間隔で BLOB をアップロードすることで軽減できます。 このオプションが実行可能でない場合は、Event Hubs を使用して大量のイベントをストリーミングできます。

ストリーム入力として Blob Storage を構成する

次の表に、Blob Storage をストリーム入力として構成する場合に使用する、Azure Portal の [新しい入力] ページにある各プロパティの説明を示しています。

プロパティ 説明
入力のエイリアス この入力を参照するジョブのクエリで使用するわかりやすい名前。
サブスクリプション ストレージ リソースが存在するサブスクリプションを選択します。
ストレージ アカウント BLOB ファイルが配置されるストレージ アカウントの名前。
ストレージ アカウント キー ストレージ アカウントに関連付けられている秘密キー。 この設定を手動で入力するオプションを選択しない限り、このオプションは自動的に事前設定されます。
コンテナー コンテナーは、BLOB の論理的なグループ化を提供します。 コンテナーには、[既存のものを使用] を選択できるほか、[新規作成] を選択して新しいコンテナーを作成することもできます。
認証モード ストレージ アカウントへの接続に使用する認証の種類を指定します。 接続文字列またはマネージド ID を使用して、ストレージ アカウントで認証できます。 マネージド ID オプションでは、Stream Analytics ジョブに対するシステム割り当てマネージド ID かユーザー割り当てマネージド ID を作成して、ストレージ アカウントで認証することができます。 マネージド ID を使用する場合、マネージド ID は、ストレージ アカウントで適切なロールのメンバーである必要があります。
パス パターン (省略可能) 指定されたコンテナー内に BLOB を配置するために使用されるファイル パス。 コンテナーのルートから BLOB を読み取る場合は、パス パターンを設定しないでください。 このパス内に、3 つの変数 ({date}{time}{partition}) の 1 つ以上のインスタンスを指定できます。

例 1: cluster1/logs/{date}/{time}/{partition}

例 2: cluster1/logs/{date}

* 文字はパス プレフィックスの許容値ではありません。 許容値は、有効な Azure BLOB 文字のみです。 コンテナー名またはファイル名を含めないでください。
日付形式 (省略可能) パスで日付変数を使用する場合は、ファイルを編成する日付形式です。 例: YYYY/MM/DD

BLOB 入力のパスに {date} または {time} が含まれている場合、フォルダーは時間の昇順に検索されます。
時刻形式 (省略可能) パスで時刻変数を使用する場合は、ファイルを編成する時刻形式です。 現在唯一サポートされている値は HH (時) です。
パーティション キー これは、ジョブが 1.2 以上の互換性レベルを使用するように構成されている場合にのみ使用できる省略可能なフィールドです。 入力がプロパティでパーティション分割される場合、ここでこのプロパティの名前を追加できます。 これは、このプロパティに PARTITION BY または GROUP BY 句が含まれている場合に、クエリのパフォーマンスを向上させるために使用されます。 このジョブで互換性レベル 1.2 以上が使用されている場合、このフィールドの既定値は "PartitionId" です。
入力パーティション数 このフィールドは、{partition} がパス パターンに存在する場合にのみ存在します。 このプロパティの値は、1 以上の整数です。 pathPattern に {partition} が表示されている場合は、0 からこのフィールド -1 の値までの範囲の数値が使用されます。
イベントのシリアル化の形式 受信データ ストリームのシリアル化形式 (JSON、CSV、Avro、Parquet、またはその他 (Protobuf、XML、専用など))。 JSON 形式が仕様に準拠しており、10 進数の先頭に 0 が含まれていないことを確認します。
[エンコード] CSV と JSON では、現在のところ、UTF-8 が唯一サポートされているエンコード形式です。
圧縮 受信データ ストリームを読み取るために使用される圧縮の種類 (None (デフォルト)、Gzip、Deflate など)。

データが Blob Storage のソースから送信される場合、Stream Analytics クエリでは次のメタデータ フィールドにアクセスできます。

プロパティ 説明
BlobName イベントに起因する入力 BLOB の名前。
EventProcessedUtcTime Stream Analytics でイベントを処理する日時。
BlobLastModifiedUtcTime BLOB が最後に変更された日時。
PartitionId 入力アダプターの 0 から始まるパーティション ID。

たとえば、これらのフィールドを使用して、次の例のようにクエリを記述できます。

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Apache Kafka からデータをストリーミングする

Azure Stream Analytics では、Apache Kafka クラスターに直接接続してデータを取り込むことができます。 このソリューションはロー コードで、Microsoft の Azure Stream Analytics チームによって完全に管理されているため、ビジネス コンプライアンス基準を満たすことができます。 Kafka 入力には下位互換性があり、バージョン 0.10 以降の最新のクライアント リリースを含むすべてのバージョンをサポートしています。 ユーザーは、構成に応じて、仮想ネットワーク内の Kafka クラスターやパブリック エンドポイントを含む Kafka クラスターに接続できます。 この構成は、既存の Kafka 構成規則に依存しています。 サポートされている圧縮の種類は、None、Gzip、Snappy、LZ4、Zstd です。

詳細については、「Kafka から Azure Stream Analytics へのデータのストリーム (プレビュー)」を参照してください。

次のステップ