チュートリアル:Stream Analytics で不正な通話のデータを分析し、Power BI ダッシュボードで結果を視覚化する

このチュートリアルでは、Azure Stream Analytics を使用して通話データを分析する方法について説明します。 クライアント アプリケーションによって生成された通話データには、Stream Analytics ジョブによって検出される不正な通話が含まれています。 このチュートリアルの技法は、クレジット カード詐欺やなりすましなど、他の種類の不正行為検出にも使用できます。

このチュートリアルでは、以下のタスクを実行します。

  • サンプル通話データを生成して Azure Event Hubs に送信します。
  • Stream Analytics ジョブを作成します。
  • ジョブの入力と出力を構成します。
  • 不正な通話をフィルター処理するようクエリを定義します。
  • ジョブをテストして開始します。
  • Power BI で結果を視覚化します。

前提条件

始める前に、次の手順が完了していることを確認してください。

  • Azure サブスクリプションをお持ちでない場合は、無料アカウントを作成してください。
  • Microsoft ダウンロード センターから通話イベント ジェネレーター アプリTelcoGenerator.zip をダウンロードします。または、GitHub からソース コードを入手します。
  • Power BI アカウントが必要です。

Azure へのサインイン

Azure portal にサインインします。

イベント ハブの作成

Stream Analytics でデータ ストリームにおける不正な呼び出しを分析する前に、サンプル データをイベント ハブに送信する必要があります。 このチュートリアルでは、Azure Event Hubs を使用して Azure にデータを送信します。

イベント ハブを作成して呼び出しデータをそのイベント ハブに送信するには、次の手順を使用します。

  1. Azure portal にサインインします。

  2. 左側のメニュー [すべてのサービス] を選択し、[モノのインターネット] を選択して、[Event Hubs] 上にマウス ポインターを置いて [+ (追加)] ボタンを選択します。

    Event Hubs の作成ページを示すスクリーンショット。

  3. [名前空間の作成] ページで、これらの手順を実行します。

    1. イベント ハブを作成する Azure サブスクリプションを選びます。

    2. [リソース グループ] で、[新規作成] を選び、リソース グループの名前を入力します。 Event Hubs 名前空間は、このリソース グループに作成されます。

    3. [名前空間名] に、Event Hubs 名前空間の一意の名前を入力します。

    4. [場所] で、名前空間を作成するリージョンを選びます。

    5. [価格レベル] で、[Standard] を選びます。

    6. ページ下部にある [確認と作成] を選択します。

      [名前空間の作成] ページを示すスクリーンショット。

    7. 名前空間の作成ウィザードの [確認と作成] ページで、すべての設定を確認した後、ページの下部にある [作成] を選びます。

  4. 名前空間が正常にデプロイされたら、[リソースに移動] を選択して [Event Hubs 名前空間] ページに移動します。

  5. [Event Hubs 名前空間] ページで、コマンド バーの [+ イベント ハブ] を選択します。

    [Event Hubs 名前空間] ページの [イベント ハブの追加] ボタンを示すスクリーンショット。

  6. [イベント ハブの作成] ページで、イベント ハブの名前を入力します。 [パーティション数] を 2 に設定します。 残りの設定で既定のオプションを使用し、[確認と作成] を選びます。

    [イベント ハブの作成] ページを示すスクリーンショット。

  7. [確認と作成] ページで、ページの下部にある [作成] ボタンをクリックします。 デプロイが成功するまで待ちます。

イベント ハブへのアクセスを許可し、接続文字列を取得する

アプリケーションから Azure Event Hubs にデータを送信できるようにするには、アクセスを許可するポリシーがイベント ハブに必要です。 アクセス ポリシーにより、承認情報を含む接続文字列が生成されます。

  1. [Event Hubs 名前空間] ページで、左側のメニューの [共有アクセス ポリシー] を選択します。

  2. ポリシーの一覧から RootManageSharedAccessKey を選択します。

  3. 次に、[接続文字列 - 主キー] の隣にあるコピー ボタンを選択します。

  4. 接続文字列をテキスト エディターに貼り付けます。 この接続文字列は、次のセクションで必要になります。

    接続文字列は次のようになります。

    Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    接続文字列には、EndpointSharedAccessKeyNameSharedAccessKey という複数のキーと値のペアが含まれ、セミコロンで区切られていることに注目してください。

イベント ジェネレーター アプリケーションを起動する

TelcoGenerator アプリを起動する前に、以前に作成した Azure Event Hubs にデータを送信するよう構成する必要があります。

  1. TelcoGenerator.zip ファイルの内容を抽出します。

  2. 任意のテキスト エディターで TelcoGenerator\TelcoGenerator\telcodatagen.exe.config ファイルを開きます。複数の .config ファイルがあるので、正しいものを開くように注意してください。

  3. 構成ファイル内の <appSettings> 要素を次の詳細で更新します。

    • EventHubName キーの値を、接続文字列の末尾にある EntityPath の値に設定します。
    • Microsoft.ServiceBus.ConnectionString キーの値を、名前空間への接続文字列に設定します。 名前空間ではなくイベント ハブへの接続文字列を使用する場合は、末尾の EntityPath 値 (;EntityPath=myeventhub) を削除します。 EntityPath 値の前にあるセミコロンを必ず削除してください。
  4. ファイルを保存します。

  5. 次に、コマンド ウィンドウを開き、TelcoGenerator アプリケーションを解凍したフォルダーに変更します。 次のコマンドを入力します。

    .\telcodatagen.exe 1000 0.2 2
    

    このコマンドは、次のパラメーターを受け取ります。

    • 1 時間あたりの呼び出しデータ レコードの数。
    • 不正の確率のパーセンテージ。これは、アプリが不正な呼び出しをシミュレートする頻度です。 値 0.2 は、呼び出しレコードの約 20% が不正に見えることを意味します。
    • 継続時間。これはアプリを実行する時間数です。 また、コマンド ラインでプロセスを終了する (Ctrl + C) ことで、いつでもアプリを停止できます。

    数秒後に、アプリはイベント ハブに送信する呼び出しレコードを画面に表示し始めます。 通話データには、次のフィールドが含まれています。

    レコード 定義
    CallrecTime 通話開始時刻のタイムスタンプ。
    SwitchNum 通話の接続に使われた電話交換機。 この例では、交換機は発信国/地域を表す文字列です (US、China、UK、Germany、Australia)。
    CallingNum 発信元の電話番号。
    CallingIMSI IMSI (International Mobile Subscriber Identity: 国際携帯機器加入者識別番号)。 発信元の一意識別子。
    CalledNum 通話受信者の電話番号。
    CalledIMSI IMSI (International Mobile Subscriber Identity: 国際携帯機器加入者識別番号)。 通話受信者の一意識別子。

Stream Analytics のジョブの作成

通話イベントのストリームを準備できたら、イベント ハブからデータを読み取る Stream Analytics ジョブを作成できます。

  1. Stream Analytics ジョブを作成するには、Azure portal に移動します。
  2. [リソースの作成] を選択し、 [Stream Analytics ジョブ] を検索します。 [Stream Analytics ジョブ] タイルを選択し、 [作成] を選択します。
  3. [新しい Stream Analytics ジョブ] ページで、次の手順のようにします。
    1. [サブスクリプション] で、Event Hubs 名前空間を含むサブスクリプションを選択します。

    2. [リソース グループ] で、前に作成したリソース グループを選びます。

    3. [インスタンスの詳細] セクションの [名前] に、Stream Analytics ジョブの一意の名前を入力します。

    4. [リージョン] で、Stream Analytics ジョブを作成するリージョンを選びます。 最適なパフォーマンスを実現し、リージョン間でのデータ転送の料金がかからないように、ジョブとイベント ハブを同じリージョンに配置することをお勧めします。

    5. [ホスティング環境] で、まだ選んでいない場合は [クラウド] を選びます。 Stream Analytics ジョブは、クラウドまたはエッジにデプロイすることができます。 クラウドでは Azure Cloud にデプロイすることができ、エッジでは IoT Edge デバイスにデプロイすることができます。

    6. [ストリーミング ユニット] で、[1] を選びます。 ストリーミング ユニットとは、ジョブの実行に必要なコンピューティング リソースのことです。 既定では、この値は 1 に設定されています。 ストリーミング ユニットのスケーリングについては、ストリーミング ユニットの理解と調整に関する記事を参照してください。

    7. ページ下部にある [確認と作成] を選択します。

      [Azure Stream Analytics ジョブの作成] ページを示すスクリーンショット。

  4. [確認と作成] ページで設定を確認し、[作成] を選んで Stream Analytics ページを作成します。
  5. ジョブがデプロイされたら、[リソースに移動] を選んで [Stream Analytics ジョブ] ページに移動します。

ジョブの入力を構成する

次の手順では、前のセクションで作成したイベント ハブを使用してデータを読み取るためにジョブの入力ソースを定義します。

  1. [Stream Analytics ジョブ] ページの左側のメニューの [ジョブ トポロジ] セクションで、[入力] を選択します。

  2. [入力] ページで、[+ 入力の追加][イベント ハブ] を選択します。

    Stream Analytics ジョブの [入力] ページを示すスクリーンショット。

  3. [イベント ハブ] ページで、次の手順のようにします。

    1. [入力のエイリアス] に「CallStream」と入力します。 [入力のエイリアス] には、入力を識別するためのわかりやすい名前を入力します。 入力のエイリアスに含めることのできる文字は、英数字、ハイフン、アンダースコアのみであり、長さは 3 文字以上 63 文字以下でなければなりません。

    2. [Subscription] で、イベント ハブを作成した Azure サブスクリプションを選択します。 イベント ハブは、Stream Analytics ジョブと同じサブスクリプションにも別のサブスクリプションにも含めることができます。

    3. [イベント ハブの名前空間] には、前のセクションで作成したイベント ハブの名前空間を選択します。 現在のサブスクリプションで利用可能な名前空間がすべて、ドロップダウンに表示されます。

    4. [イベント ハブの名前] には、前のセクションで作成したイベント ハブを選びます。 選択した名前空間で利用可能なイベント ハブがすべて、ドロップダウンに表示されます。

    5. [イベント ハブ コンシューマー グループ] で、[新規作成] オプションをオンのままにして、イベント ハブに新しいコンシューマー グループが作成されるようにします。 Stream Analytics ジョブごとに個別のコンシューマー グループを使用することをお勧めします。 コンシューマー グループが指定されていない場合、Stream Analytics ジョブは $Default コンシューマー グループを使用します。 ジョブに自己結合または複数の入力が含まれる場合、一部の入力は複数の閲覧者ダウンストリームによって読み取られる可能性があります。 この状況は 1 つのコンシューマー グループの閲覧者数に影響を与えます。

    6. [認証モード] で、[接続文字列] を選びます。 このオプションを使用してチュートリアルをテストした方が簡単です。

    7. [イベント ハブ ポリシー名] で、[既存のものを使用] を選び、以前に作成したポリシーを選択します。

    8. ページの下部にある [保存] を選択します。

      入力用の [Event Hubs 構成] ページを示すスクリーンショット。

ジョブの出力を構成する

最後の手順では、変換後のデータを書き込むことができる場所として、ジョブの出力シンクを定義します。 このチュートリアルでは、Power BI を使用してデータの出力と視覚化を行います。

  1. Azure portal から [すべてのリソース] を開き、ASATutorial Stream Analytics ジョブを選択します。

  2. [Stream Analytics ジョブ] の [ジョブ トポロジ] セクションで、 [出力] オプションを選択します。

  3. [+ 追加]>[Power BI] の順に選択します。

  4. 出力フォームに、次の詳細を入力します。

    設定 推奨値
    出力エイリアス MyPBIoutput
    グループ ワークスペース マイ ワークスペース
    データセットの名前 ASAdataset
    テーブル名 ASATable
    認証モード ユーザー トークン
  5. [認可] を選択し、プロンプトに従って Power BI を認証します。

    Stream Analytics の出力を構成する

  6. [Power BI] ページの下部にある [保存] を選択します。

    このチュートリアルでは、 [ユーザー トークン] 認証モードを使用します。 マネージド ID を使用するには、「マネージド ID を使用して、Power BI 出力に対して Azure Stream Analytics ジョブを認証する」を参照してください。

リアルタイム データを変換するクエリを作成する

ここまでで、着信データ ストリームを読み取る Stream Analytics ジョブを設定しました。 次に、リアルタイムでデータを分析するクエリを作成します。 このクエリは、Stream Analytics に固有のいくつかの拡張を含む SQL に似た言語を使います。

チュートリアルのこのセクションでは、複数のクエリを作成してテストし、分析のために入力ストリームを変換するいくつかの方法を学習します。

ここで作成するクエリは、変換されたデータを画面に表示するだけです。 後のセクションでは、変換されたデータを Power BI に書き込みます。

言語に関する詳細については、 Azure Stream Analytics クエリ言語リファレンスを参照してください。

パススルー クエリを使ってテストする

すべてのイベントをアーカイブする場合は、パススルー クエリを使って、イベントのペイロード内のすべてのフィールドを読み取ることができます。

  1. Azure portal で Stream Analytics ジョブに移動し、左側のメニューの [ジョブ トポロジ][クエリ] を選択します。

  2. クエリ ウィンドウに次のクエリを入力します。

    SELECT 
        *
    FROM 
        CallStream
    

    Note

    SQL と同じように、キーワードに大文字と小文字の区別はなく、空白は重要ではありません。

    このクエリで、CallStream は入力を作成するときに指定した別名です。 別の別名を使った場合は、代わりにその名前を使います。

  3. [Test query](クエリのテスト) を選択します。

    Stream Analytics ジョブは、入力からのサンプル データに対してクエリを実行し、ウィンドウの下部に出力を表示します。 結果は、イベント ハブと Streaming Analytics ジョブが正しく構成されていることを示します

    クエリのテストからのサンプル出力。

    表示される正確なレコード数は、サンプルでキャプチャされたレコードの数によって異なります。

列のプロジェクションを使ってフィールドの数を減らす

多くの場合、分析に入力ストリームのすべての列は必要ありません。 クエリを使って、パススルー クエリより少ない数のフィールドを返すことができます。

次のクエリを実行し、出力を確認します。

SELECT CallRecTime, SwitchNum, CallingIMSI, CallingNum, CalledNum 
INTO
    [MyPBIoutput]
FROM 
    CallStream

リージョン別に着信通話の数をカウントする:集計を含むタンブリング ウィンドウ

リージョンごとに受信通話の数をカウントしたいものとします。 ストリーミング データでは、カウントのような集計関数を実行する場合、ストリームをテンポラル ユニットに分割する必要があります。これは、データ ストリーム自体が事実上エンドレスであるためです。 これを行うには、Streaming Analytics のウィンドウ関数を使います。 その後、そのウィンドウ内のデータを単位として処理できます。

この変換では、重ならないテンポラル ウィンドウのシーケンスを使います。各ウィンドウの個別のデータ セットをグループ化して集計できます。 この種のウィンドウは、"タンブリング ウィンドウ" と呼ばれます。 タンブリング ウィンドウでは、発信元の国/地域を表す SwitchNum でグループ化された受信通話の数を取得できます。

  1. 次のクエリをクエリ エディターに貼り付けます。

    SELECT 
        System.Timestamp as WindowEnd, SwitchNum, COUNT(*) as CallCount 
    FROM
        CallStream TIMESTAMP BY CallRecTime 
    GROUP BY TUMBLINGWINDOW(s, 5), SwitchNum
    

    このクエリでは、FROM 句で Timestamp By キーワードを使って、タンブリング ウィンドウの定義に使う入力ストリームのタイムスタンプ フィールドを指定します。 この場合、ウィンドウは各レコードの CallRecTime フィールドによってデータをセグメントに分割します。 フィールドを指定しないと、各イベントがイベント ハブに到着した時刻がウィンドウ化操作に使われます。 「Stream Analytics クエリ言語リファレンス」の 「到着時刻とアプリケーション時刻」 をご覧ください。

    プロジェクションに含まれる System.Timestamp は、各ウィンドウの終わりのタイムスタンプを返します。

    タンブリング ウィンドウを使うことを指定するには、GROUP BY 句で TUMBLINGWINDOW 関数を使います。 この関数では、時間単位 (1 マイクロ秒から 1 日まで) とウィンドウ サイズ (単位数) を指定します。 この例では、タンブリング ウィンドウは 5 秒間隔で構成されるので、5 秒間の国/地域別の通話数が表示されます。

  2. [Test query](クエリのテスト) を選択します。 結果では、WindowEnd のタイムスタンプが 5 秒刻みになっていることに注目してください。

自己結合を使って SIM 不正を検出する

この例では、不正な使用を、5 秒以内に別の場所で同じユーザーから発信された呼び出しと考えます。 たとえば、合法的に同じユーザーが米国とオーストラリアで同時に呼び出しを行うことはできません。

このようなケースをチェックするには、ストリーミング データの自己結合を使って、CallRecTime の値に基づいてストリームをそれ自体に結合します。 その後、CallingIMSI の値 (発信番号) が同じなのに SwitchNum の値 (発信国/地域) が同じではない呼び出しレコードを探すことができます。

ストリーミング データで結合を使うときは、一致する行と見なす最大時間差を結合で制限する必要があります 前に説明したように、ストリーミング データは事実上エンドレスです。 リレーションシップの時間限界は、結合の ON 句で DATEDIFF 関数を使って指定します。 この例では、結合は 5 秒間隔の通話データに基づきます。

  1. 次のクエリをクエリ エディターに貼り付けます。

    SELECT System.Timestamp AS WindowEnd, COUNT(*) AS FraudulentCalls
        INTO "MyPBIoutput"
        FROM "CallStream" CS1 TIMESTAMP BY CallRecTime
        JOIN "CallStream" CS2 TIMESTAMP BY CallRecTime
        ON CS1.CallingIMSI = CS2.CallingIMSI
        AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
        WHERE CS1.SwitchNum != CS2.SwitchNum
        GROUP BY TumblingWindow(Duration(second, 1))
    

    このクエリは、DATEDIFF 関数が結合に含まれることを除けば、他の SQL 結合と似ています。 DATEDIFF のこのバージョンは Streaming Analytics に固有であり、ON...BETWEEN 句で使う必要があります。 パラメーターは、時間単位 (この例では秒) と、結合の 2 つのソースの別名です この機能は、SQL の標準的な DATEDIFF 関数と異なります。

    WHERE 句には、不正な通話に対して、発信元の交換機が同じではないというフラグを設定する条件が含まれます。

  2. [Test query](クエリのテスト) を選択します。 出力を確認し、 [クエリの保存] を選択します。

ジョブを開始して出力を視覚化する

  1. ジョブを開始するには、ジョブの [概要] に移動し、 [開始] を選択します。

  2. ジョブ出力の開始時刻に [現在] を選択し、 [開始] を選択します。 通知バーでジョブの状態を確認できます。

  3. ジョブが成功したら Power BI に移動し、職場または学校アカウントを使用してサインインします。 Stream Analytics ジョブ クエリによる結果の出力が進行中の場合、作成した ASAdataset データセットは [データセット] タブにあります。

  4. Power BI ワークスペースで [+ 作成] を選択し、Fraudulent Calls という名前の新しいダッシュボードを作成します。

  5. ウィンドウの上部にある [編集][タイルの追加] を選択します。 次に、 [カスタム ストリーミング データ][次に] を選択します。 [データセット]ASAdataset を選択します。 [視覚化タイプ] ドロップダウンで [カード] を選択し、[フィールド]不正な呼び出しを追加します。 [次へ] を選択してタイルに名前を入力し、 [適用] を選択してタイルを作成します。

    Power BI ダッシュボードのタイルを作成する

  6. 次のオプションを使用して、もう一度手順 5. を実行します。

    • [視覚化タイプ] では、[折れ線グラフ] を選択します。
    • 軸を追加し、 [windowend] を選びます。
    • 値を追加し、[fraudulentcalls] を選択します。
    • [表示する時間枠] で、過去 10 分間を選びます。
  7. 2 つのタイルが追加されると、ダッシュボードは次の例のようになります。 イベント ハブの送信側アプリケーションと Streaming Analytics アプリケーションが実行されている場合に新しいデータが到着すると Power BI ダッシュボードが定期的に更新されることに注目してください。

    Power BI ダッシュボードでの結果のスクリーンショット。

Web アプリケーションに Power BI ダッシュボードを埋め込む

チュートリアルのこの部分では、Power BI チームが作成したサンプルの ASP.NET Web アプリケーションを使用して、ダッシュボードを埋め込みます。 ダッシュボードの埋め込みの詳細については、記事「Power BI で埋め込み」を参照してください。

アプリケーションを設定するには、PowerBI-Developer-Samples GitHub リポジトリに移動し、User Owns Data セクションの指示に従います (integrate-web-app サブセクションのリダイレクト URL およびホーム ページ URL を使用します)。 ダッシュボードの例を使用しているため、GitHub リポジトリにある integrate-web-app サンプル コードを使用します。 ブラウザーでアプリケーションが実行されるようになったら、次の手順に従って、先ほど作成したダッシュボードを Web ページに埋め込みます。

  1. [Power BI にサインイン] を選択します。これにより、Power BI アカウントのダッシュボードへのアクセスがアプリケーションに許可されます。

  2. [ダッシュボードの取得] ボタンを選択します。これにより、アカウントのダッシュボードがテーブルに表示されます。 前に作成したダッシュボードの名前 powerbi-embedded-dashboar を見つけて、対応する EmbedUrl をコピーします。

  3. 最後に、その EmbedUrl を対応するテキスト フィールドに貼り付け、[Embed Dashboard](ダッシュボードの埋め込み) を選択します。 これで、Web アプリケーション内に埋め込まれた同じダッシュボードを表示できるようになりました。

次のステップ

このチュートリアルでは、簡単な Stream Analytics ジョブを作成し、着信データを分析して、結果を Power BI ダッシュボードに表示しました。 Stream Analytics ジョブの詳細については、次のチュートリアルに進んでください。