• 4 min read

Análisis casi en tiempo real en Azure SQL Data Warehouse

A medida que aumenta el ritmo de creación de datos con los escenarios de IoT y los flujos de datos, aumenta también la demanda de un acceso más rápido a los datos para analizarlos.

A medida que aumenta el ritmo de creación de datos con los escenarios de IoT y los flujos de datos, aumenta también la demanda de un acceso más rápido a los datos para analizarlos. Esta demanda de análisis casi en tiempo real se ve en todos los sectores, desde los gigantes de la venta minorista que cambian los precios en tiempo real hasta las plantas manufactureras que utilizan detección de anomalías para determinar posibles problemas en la cadena de montaje antes de que ocurran, así como las compañías mineras y de gas que utilizan lecturas de sensores de perforación de alta tecnología para saber con exactitud lo que está ocurriendo a cientos de metros de profundidad en cada momento. En todos nuestros clientes hemos observado que las ventajas de un análisis en tiempo real pueden ser enormes.

Hoy nos complace anunciar la funcionalidad analítica casi en tiempo real de Azure SQL Data Warehouse. Esta arquitectura es posible con la versión preliminar pública de Streaming Ingestion en SQL DW a partir de marcos de datos de streaming de Azure Databricks.

Structured Streaming en Azure Databricks

Azure Databricks es un servicio de Microsoft en la nube totalmente administrado que ejecuta Databricks Runtime. El servicio proporciona una implementación de Apache Spark para empresas en Azure. Structured Streaming en Apache Spark permite a los usuarios definir una consulta para un flujo de datos de una forma escalable y con tolerancia a errores.

Structured Streaming es una forma escalable y tolerante a errores de ejecutar consultas en flujos de datos. El marco de datos de streaming es una tabla independiente y los nuevos datos del flujo se anexan a la tabla. Se pueden ejecutar consultas tanto en las secciones anexadas como en toda la tabla.

Flujo de datos

Azure SQL Data Warehouse como receptor de salida

Si bien las consultas de streaming son ideales para responder a preguntas obvias de un flujo de datos, como cuáles son los valores medio, mínimo y máximo con el tiempo, no permite que los analistas del canal de bajada tengan acceso a los datos casi en tiempo real. Para lograr esto, deberá obtener los datos en SQL Data Warehouse lo más rápido posible, de modo que los analistas puedan consultar, visualizar e interpretar los datos casi en tiempo real con herramientas como PowerBI.

Ejemplo de flujo de velocidad en SQL DW

Este es un sencillo ejemplo para explicar esta funcionalidad. Structured Stream tiene un generador de datos que produce una marca de tiempo y un valor a una velocidad determinada por segundo. Usaremos este mecanismo para crear un sencillo ejemplo de streaming.

Lo primero que hay que hacer es crear una tabla receptora en SQL Data Warehouse. Como hemos dicho, el flujo de velocidad generará una marca de tiempo y un valor, así que podremos usar esto como esquema para nuestra tabla. Para ello, abra la herramienta de administración de bases de datos que prefiera y cree la siguiente tabla:

```sql
CREATE TABLE [dbo].[Stream_CI]
(
   [timestamp] DATETIME NULL,
   [Value] BIGINT NULL
)
WITH
(
   DISTRIBUTION = ROUND_ROBIN,
   CLUSTERED INDEX ([timestamp])
)
```
Una tabla con una distribución ROUND_ROBIN y CLUSTERED INDEX en la marca de tiempo proporciona el mejor equilibrio entre velocidad de ingesta y rendimiento de las consultas para los datos de streaming en una instancia de SQL Data Warehouse. 

Ahora que ya tenemos nuestra tabla receptora en SQL DW, veamos la parte de Azure Databricks. Hemos escrito todo el código que sigue en Python, pero la misma funcionalidad está disponible en R, Python y Scala.

Lo primero que quiere hacer es configurar la conexión a la instancia de SQL DW donde acaba de crear la tabla y una cuenta de Azure Storage.

```python
# Configuración de SQL DW relacionada (se usa para configurar la conexión a la instancia de SQL DW)
dwDatabase = <nombreBaseDatos>
dwServer = <nombreServidor>
dwUser = <usuarioSQL>
dwPass = <contraseñaUsuarioSQL>
dwJdbcPort =  "1433"
dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://" + dwServer + ".database.windows.net:" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";"+dwJdbcExtraOptions
# Configuración de Blob Storage relacionada (se usa para almacenamiento temporal)
# El valor es la dirección URL de la cuenta de almacenamiento con el formato <nombreCuenta>.blob.core.windows.net
blobStorage = <cuentaBlobStorage>
blobContainer = <contenedorStorage>
blobAccessKey =  <claveAcceso>
# Configure la clave de acceso de la cuenta de Blob Storage en la configuración de la sesión del cuaderno
spark.conf.set(
   "fs.azure.account.key."+blobStorage ,
   blobAccessKey)
```

En este bloque de código, se utilizan nombres de usuario y contraseñas por simplicidad y velocidad de entrega, pero no es un procedimiento recomendado. Se recomienda usar ámbitos de secretos para almacenar los secretos de forma segura en Azure Key Vault.

Ahora que ya nos podemos conectar tanto a SQL DW como a la cuenta de Storage Account desde Azure Databricks, vamos a crear un flujo de lectura y escribir la salida en la instancia de SQL DW.

Los parámetros más importantes para el flujo de velocidad son rowsPerSecond y numPartitions. rowsPerSecond especifica cuántos eventos por segundo intentará crear el sistema. numPartitions especifica cuántas particiones se asignan para crear las filas. Si el valor de rowsPerSecond es alto y el sistema no genera suficientes datos, pruebe usando más particiones.

```python
# Prepare el origen de streaming
df = spark.readStream 
   .format("rate") 
   .option("rowsPerSecond", "10000") 
   .option("numPartitions", "5") 
   .load()
```

Para escribir el valor de readStream en SQL DW, debe usarse el formato "com.databricks.spark.sqldw". Esta opción de formato está integrada en DataBricks Runtime y está disponible en todos los clústeres que ejecutan Databricks 4.3 o posterior.

```python
# API Structured Streaming para escribir los datos de forma continua en una tabla en SQL DW.
df.writeStream 
   .format("com.databricks.spark.sqldw") 
   .option("url", sqlDwUrl) 
   .option("tempDir", "wasbs://"+blobContainer+ "@" + blobStorage + "/tmpdir/stream") 
   .option("forwardSparkAzureStorageCredentials", "true") 
   .option("dbTable", "Stream_CI") 
   .option("checkpointLocation", "/checkpoint") 
   .trigger(processingTime="30 seconds") 
   .start()
```

Esta instrucción procesa el parámetro rateStream cada 30 segundos. Los datos se toman del flujo y se escriben en la ubicación de almacenamiento temporal definida por el parámetro tempDir. Una vez que los archivos están en esta ubicación, SQL DW carga los datos en la tabla especificada usando PolyBase. Cuando finaliza la carga, los datos de la ubicación de almacenamiento se eliminan para asegurar que se lean solo una vez.
Ahora que el flujo ya está generando datos y escribiéndolos en SQL DW, podemos comprobarlo con una consulta de los datos en SQL DW.

```sql
SELECT COUNT(Value), DATEPART(mi,[timestamp]) AS [event_minute]
FROM Stream_ci
GROUP BY DATEPART(mi,[timestamp])
ORDER BY 2
```

De esta misma manera puede comenzar a consultar sus datos de streaming en SQL DW.

Conclusión

En este blog, hemos visto un ejemplo sencillo de cómo se puede utilizar la funcionalidad de Structured Streaming de Azure Databricks y SQL Data Warehouse para disponer de análisis casi en tiempo real. El ejemplo usa un parámetro rateStream y un sencillo comando de escritura por simplicidad, pero puede usar fácilmente flujos de Kafka como origen de datos y agregaciones de ventanas de saltos de tamaño constante específicas para su negocio.

Para obtener más información: