IoT シナリオとデータのストリームによるデータ作成のペースの加速に伴い、分析対象データへのより高速なアクセスに対するニーズがますます高まっています。このようなほぼリアルタイムの分析に対するニーズは、リアルタイムに価格を変更する巨大小売店から、組立ラインで発生する前に潜在的な問題を判別するために異常検出を使用する製造工場、事象の発生時に地下数百フィートで何が起こっているかを正確に把握するためにハイテク ドリル センサー測定値を使用する炭鉱およびガス会社にいたるまで、あらゆる業界分野全体で見られます。すべてのお客様にとって、ほぼリアルタイムの分析による利点は非常に大きなものであることは認識しています。
本日、Azure SQL Data Warehouse のほぼリアルタイムの分析機能を発表いたします。このアーキテクチャは、Azure Databricks Streaming Dataframes から SQL DW へのストリーミング インジェストのパブリック プレビューで可能になります。
Azure Databricks の構造化ストリーミング
Azure Databricks は、Databricks Runtime を実行する Microsoft のフル マネージド クラウド サービスです。このサービスでは、Azure でのエンタープライズ レベルの Apache Spark 実装が提供されます。Apache Spark での構造化ストリーミングを使用することで、ユーザーはスケーラブルかつフォールト トレラントな方法でデータ ストリームに対するクエリを定義できます。
構造化ストリーミングは、データ ストリームに対してクエリを実行するためのスケーラブルかつフォールト トレラントな方法です。ストリーミング データフレームは非バインド テーブルであり、ストリームからの新しいデータはそのテーブルに追加されます。クエリは、追加されたセクションに対して、またテーブル全体で実行できます。
出力シンクとしての Azure SQL Data Warehouse
ストリーミング クエリは経時的な平均値、最小値、最大値のようなデータ ストリームに関する簡単な質問に答えるのに優れていますが、ダウンストリーム分析でほぼリアルタイムのデータにアクセスすることはできません。これを実現するには、できるだけ早く SQL Data Warehouse にデータを取り込む必要があります。そうすることで、アナリストは PowerBI などのツールを使用してほぼリアルタイムのデータをクエリ、可視化、および解釈することができます。
例: SQL DW へのレート ストリームの例
これは、機能のしくみを説明するためのシンプルな例です。構造化ストリームには、タイムスタンプと、特定のレート (秒単位) の値を生成するデータ ジェネレーターがあります。ここではこのメカニズムを使用して、シンプルなストリーミングの例を作成します。
まず、SQL Data Warehouse でシンク テーブルを作成する必要があります。上記のとおり、レート ストリームではタイムスタンプと値が生成されるため、それをテーブルのスキーマとして使用できます。そのためには、任意のデータベース管理ツールを開き、以下のテーブルを作成します。
```sql CREATE TABLE [dbo].[Stream_CI] ( [timestamp] DATETIME NULL, [Value] BIGINT NULL ) WITH ( DISTRIBUTION = ROUND_ROBIN, CLUSTERED INDEX ([timestamp]) ) ``` ディストリビューションが ROUND_ROBIN でタイムスタンプが CLUSTERED INDEX のテーブルは、SQL Data Warehouse でのストリーミング データに関するインジェストの速度とクエリのパフォーマンスの両方に最適です。
これで、SQL DW のシンク テーブルが用意できました。次は、Azure Databricks の部分を見てみましょう。以下のコードはすべて Python で記述しましたが、R、Python、および Scala でも同じ機能を使用できます。
まず、先ほどテーブルと Azure Storage アカウントを作成した、SQL DW への接続を設定する必要があります。
```python # SQL DW に関連する設定 (SQL DW インスタンスへの接続を設定するために使用) dwDatabase = <databaseName> dwServer = <servername> dwUser = <sqlUser> dwPass = <sqlUserPassword> dwJdbcPort = "1433" dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;loginTimeout=30;" sqlDwUrl = "jdbc:sqlserver://" + dwServer + ".database.windows.net:" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";"+dwJdbcExtraOptions # Blob Storage に関する設定 (一時ストレージで使用) # 値は、<accountName>.blob.core.windows.net という形式のストレージ アカウント URL です blobStorage = <blobStorageAccount> blobContainer = <storageContainer> blobAccessKey = <accessKey> # ノートブック セッション構成で BLOB ストレージ アカウントのアクセス キーを設定します spark.conf.set( "fs.azure.account.key."+blobStorage , blobAccessKey) ```
このコード ブロックでは、わかりやすくするため、また配信の高速化のためにユーザー名とパスワードを使用しますが、これはベスト プラクティスではありません。シークレット スコープを使用して、Azure Key Vault でシークレットを安全に格納することをお勧めします。
これで、Azure Databricks から SQL DW とストレージ アカウントの両方に接続できるようになりました。次は、読み取りストリームを作成して、SQL DW に出力を書き込みます。
レート ストリームで最も重要なパラメーターは、rowsPerSecond と the numPartitions です。rowsPerSecond では、システムで作成が試行される 1 秒あたりのイベント数が指定されます。numPartitions では、行の作成に割り当てられるパーティションの数が指定されます。rowsPerSecond の数が大きく、システムで十分なデータが生成されない場合は、パーティションを増やして使用してみてください。
```python # ストリーミング ソースを準備します df = spark.readStream .format("rate") .option("rowsPerSecond", "10000") .option("numPartitions", "5") .load() ```
readStream を SQL DW に書き込むには、"com.databricks.spark.sqldw" という形式を使用する必要があります。この形式のオプションは DataBricks ランタイムに組み込まれており、Databricks 4.3 以上を実行するすべてのクラスターで利用できます。
```python # SQL DW のテーブルにデータを継続的に書き込むための構造化ストリーミング API。 df.writeStream .format("com.databricks.spark.sqldw") .option("url", sqlDwUrl) .option("tempDir", "wasbs://"+blobContainer+ "@" + blobStorage + "/tmpdir/stream") .option("forwardSparkAzureStorageCredentials", "true") .option("dbTable", "Stream_CI") .option("checkpointLocation", "/checkpoint") .trigger(processingTime="30 seconds") .start() ```
このステートメントでは、30 秒ごとに rateStream が処理されます。データはストリームから取得され、tempDir パラメーターで定義された一時的なストレージの場所に書き込まれます。ファイルがこの場所に配置されると、SQL DW によって、PolyBase を使用して指定されたテーブルにデータが読み込まれます。読み込みが完了した後、ストレージの場所にあるデータは削除され、データが確実に一度だけ読み取られるようになります。
これで、ストリームでデータが生成され、SQL DW に書き込まれるようになったため、SQL DW のデータに対してクエリを実行して確認できます。
```sql SELECT COUNT(Value), DATEPART(mi,[timestamp]) AS [event_minute] FROM Stream_ci GROUP BY DATEPART(mi,[timestamp]) ORDER BY 2 ```
このように SQL DW のストリーミング データに対するクエリの実行を開始できます。
まとめ
このブログでは、Azure Databricks の構造化ストリーミング機能と SQL Data Warehouse を使用して、ほぼリアルタイムに分析できるようにする方法のシンプルな例について説明しました。この例では、わかりやすくするためにシンプルな書き込みコマンドと rateStream を使用しましたが、データ ソースとして Kafka ストリームを簡単に使用し、ビジネス固有のタンブリング ウィンドウ集計を利用することもできました。
詳細については、以下を参照してください。