Analytique en temps quasi réel dans Azure SQL Data Warehouse

Publié le 25 septembre, 2018

Program Manager II, Azure SQL Data Warehouse

Étant donné que le rythme de création de données augmente avec les scénarios et les flux de données IoT, la demande pour un accès plus rapide aux données pour analyse ne cesse de grandir. Cette demande pour une analytique en temps quasi réel est observée dans tous les segments de l’industrie. Par exemple, les géants de la distribution changent les prix en temps réel, les usines de fabrication utilisent la détection des anomalies pour prévoir les problèmes éventuels sur la ligne d’assemblage avant qu’ils n’arrivent, les compagnies pétrolières et minières se servent de relevés de capteur pour foreuse dernier cri afin de savoir précisément ce qui se passe à plusieurs centaines de mètres de profondeur, sous la surface de la terre, pendant que cela arrive. Grâce à tous nos clients, nous avons pu observer les incroyables avantages de l’analytique en temps quasi réel.

Aujourd’hui, nous sommes ravis d’annoncer l’ajout de fonctionnalités d’analytique en temps quasi réel dans Azure SQL Data Warehouse. Cette architecture est rendue possible grâce à la préversion publique de l’ingestion de flux dans SQL DW à partir des trames de données de diffusion en continu Azure Databricks.

Flux structuré dans Azure Databricks

Azure Databricks est un service cloud entièrement managé de Microsoft qui exécute Databricks Runtime. Le service fournit une implémentation d’Azure Spark de classe Entreprise sur Azure. Le flux structuré dans Apache Spark permet aux utilisateurs de définir une requête sur un flux de données de manière évolutive et tolérante aux pannes.

Le flux structuré est une manière évolutive et tolérante aux pannes d’exécuter les requêtes sur des flux de données. La trame de données de diffusion en continu est une table indépendante et les nouvelles données du flux sont ajoutées à la table. Les requêtes peuvent être exécutées dans les sections ajoutées et dans toute la table.

Flux de données

Azure SQL Data Warehouse en tant que récepteur de sortie

Bien que les requêtes de diffusion en continu soient une excellente façon de répondre aux questions évidentes sur un flux de données, notamment concernant les valeurs moyennes, minimales et maximales au fil du temps, elles ne permettent pas aux analystes en aval d’accéder aux données en temps quasi réel. Pour cela, vous aurez besoin de placer les données dans un entrepôt SQL Data Warehouse aussi vite que possible, de sorte que les analystes puissent interroger, visualiser et interpréter les données en temps quasi réel grâce à des outils comme PowerBI.

Exemple : exemple de flux de fréquence dans SQL DW

Ceci est un exemple simple qui explique comment marche cette fonction. Le flux structuré dispose d’un générateur de données qui produit un timestamp et une valeur à une fréquence par seconde donnée. Nous utiliserons ce mécanisme pour créer un exemple de diffusion simple.

Premièrement, vous devez créer une table de récepteurs dans votre entrepôt SQL Data Warehouse. Comme mentionné précédemment, le flux de fréquence générera un timestamp et une valeur que nous pourrons utiliser comme schéma de notre table. Pour cela, ouvrez l’outil de gestion de base de données de votre choix et créez la table suivante :

```sql
CREATE TABLE [dbo].[Stream_CI]
(
   [timestamp] DATETIME NULL,
   [Value] BIGINT NULL
)
WITH
(
   DISTRIBUTION = ROUND_ROBIN,
   CLUSTERED INDEX ([timestamp])
)
```
Une table avec une distribution ROUND_ROBIN (tourniquet) et un CLUSTERED INDEX (index de cluster) sur Timestamp offre le meilleur compromis entre la vitesse d’ingestion et les performances de requête pour la diffusion en continu de données dans un entrepôt SQL Data Warehouse. 

Maintenant que nous avons notre table de récepteurs dans SQL DW, jetons un coup d’œil à la partie d’Azure Databricks. Nous avons écrit la totalité du code ci-dessous en Python, mais la même fonction de langage est disponible en R, Python et Scala.

La première chose que vous aurez besoin de faire est de configurer la connexion à votre entrepôt SQL DW là où vous venez de créer la table, ainsi qu’un compte de stockage Azure.

```python
# Paramètres relatifs à SQL DW (utilisés pour configurer la connexion à l’instance SQL DW)
dwDatabase = <databaseName>
dwServer = <servername>
dwUser = <sqlUser>
dwPass = <sqlUserPassword>
dwJdbcPort =  "1433"
dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://" + dwServer + ".database.windows.net:" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";"+dwJdbcExtraOptions
# Paramètres relatifs au Stockage Blob (utilisés pour le stockage temporaire)
# La valeur est l’URL du compte de stockage au format <accountName>.blob.core.windows.net
blobStorage = <blobStorageAccount>
blobContainer = <storageContainer>
blobAccessKey =  <accessKey>
# Configurez la clé d’accès au compte de Stockage Blob dans le fichier conf de la session notebook.
spark.conf.set(
   "fs.azure.account.key."+blobStorage ,
   blobAccessKey)
```

Dans ce bloc de code, nous utilisons des noms d’utilisateur et des mots de passe pour une livraison plus simple et plus rapide, bien que ce ne soit pas une meilleure pratique. Nous recommandons d’utiliser des étendues de secrets pour stocker les secrets de manière sécurisée dans Azure Key Vault.

Maintenant que nous pouvons nous connecter à notre entrepôt SQL DW et à notre compte de stockage à partir d’Azure Databricks, nous allons créer un flux de lecture et écrire la sortie dans notre entrepôt SQL DW.

Les paramètres les plus importants pour le flux de fréquence sont les paramètres rowsPerSecond et numPartitions. Le paramètre rowsPerSecond spécifie le nombre d’événements par seconde que le système essaiera de créer. Le paramètre numPartitions spécifie le nombre de partitions allouées à la création de lignes. Si le paramètre rowsPerSecond est élevé alors que le système ne génère pas assez de données, essayez d’utiliser plus de partitions.

```python
# Préparez la source de diffusion en continu
df = spark.readStream \
   .format("rate") \
   .option("rowsPerSecond", "10000") \
   .option("numPartitions", "5") \
   .load()
```

Pour écrire le readStream dans l’entrepôt SQL DW, nous devons utiliser le format « com.databricks.spark.sqldw ». Cette option de format est intégrée au runtime Databricks et elle est disponible dans tous les clusters qui exécutent Databricks 4.3 ou version ultérieure.

```python
# API de flux structuré pour écrire les données de manière continue dans une table dans l’entrepôt SQL DW.
df.writeStream \
   .format("com.databricks.spark.sqldw") \
   .option("url", sqlDwUrl) \
   .option("tempDir", "wasbs://"+blobContainer+ "@" + blobStorage + "/tmpdir/stream") \
   .option("forwardSparkAzureStorageCredentials", "true") \
   .option("dbTable", "Stream_CI") \
   .option("checkpointLocation", "/checkpoint") \
   .trigger(processingTime="30 seconds") \
   .start()
```

Cette instruction traite le rateStream toutes les 30 secondes. Les données sont collectées à partir du flux et écrites dans l’emplacement de stockage temporaire défini par le paramètre tempDir. Une fois que les fichiers sont arrivés à l’emplacement, SQL DW charge les données dans la table spécifiée à l’aide de PolyBase. À la fin du chargement, les données sur votre emplacement de stockage sont supprimées afin de s’assurer que les données ne seront lues qu’une fois.
Maintenant que le flux génère des données et les écrit dans l’entrepôt SQL DW, nous pouvons vérifier cela en interrogeant les données dans l’entrepôt SQL DW.

```sql
SELECT COUNT(Value), DATEPART(mi,[timestamp]) AS [event_minute]
FROM Stream_ci
GROUP BY DATEPART(mi,[timestamp])
ORDER BY 2
```

Ainsi, vous pouvez commencer à interroger vos données de diffusion en continu dans SQL DW.

Conclusion

Dans ce blog, nous avons abordé un exemple simple montrant comment vous pouvez utiliser les capacités du flux structuré d’Azure Databricks et SQL Data Warehouse pour permettre une analytique en temps quasi réel. L’exemple utilisait un rateStream et une commande d’écriture simple à des fins de simplicité, mais vous pourriez aisément utiliser les flux Kafka comme source de données ainsi que des agrégations de fenêtres bascule spécifiques à l’entreprise.

Pour en savoir plus :