Aggiornare o unire record in database SQL di Azure con Funzioni di Azure
Attualmente Analisi di flusso di Azure supporta solo l'inserimento (accodamento) di righe agli output SQL (database SQL di Azure e Azure Synapse Analytics). Questo articolo illustra le soluzioni alternative per abilitare UPDATE, UP edizione Standard RT o MERGE nei database SQL, con Funzioni di Azure come livello intermedio.
Le opzioni alternative per Funzioni di Azure vengono presentate alla fine.
Requisito
La scrittura di dati in una tabella può essere in genere eseguita nel modo seguente:
Modalità | Istruzione T-SQL equivalente | Requisiti |
---|---|---|
Aggiunta | INSERT … | None |
Sostituzione | MERGE (UP edizione Standard RT) | Chiave univoca |
Accumulare | MERGE (UP edizione Standard RT) con operatore di assegnazione composta (+= , -= ...) |
Chiave univoca e identificatore |
Per illustrare le differenze, è possibile esaminare cosa accade quando si inseriscono i due record seguenti:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10.00 | A | 1 |
10:05 | A | 20 |
In modalità di accodamento vengono inseriti i due record. L'istruzione T-SQL equivalente è:
INSERT INTO [target] VALUES (...);
Risultato:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10.00 | A | 1 |
10:05 | A | 20 |
In modalità di sostituzione si ottiene solo l'ultimo valore per chiave. In questo caso si userà Device_Id come chiave. L'istruzione T-SQL equivalente è:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Risultato:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Infine, in modalità di accumulo si somma Value
con un operatore di assegnazione composta (+=
). Qui si userà anche Device_Id come chiave:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Risultato:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | Un | 21 |
Per considerazioni sulle prestazioni , gli adattatori di output del database SQL asa supportano attualmente solo la modalità di accodamento in modo nativo. Questi adattatori usano l'inserimento bulk per ottimizzare la velocità effettiva e limitare la pressione.
Questo articolo illustra come usare Funzioni di Azure per implementare le modalità Replace e Accumulate per ASA. Usando una funzione come livello intermedio, le potenziali prestazioni di scrittura non influiscono sul processo di streaming. A questo proposito, l'uso di Funzioni di Azure funzionerà meglio con Azure SQL. Con Synapse SQL, il passaggio da istruzioni bulk a righe per riga può creare problemi di prestazioni maggiori.
output Funzioni di Azure
Nel processo si sostituirà l'output di ASA SQL con l'output del Funzioni di Azure asa. Le funzionalità UPDATE, UP edizione Standard RT o MERGE verranno implementate nella funzione .
Attualmente sono disponibili due opzioni per accedere a un database SQL in una funzione. Il primo è l'associazione di output sql di Azure. Attualmente è limitato a C# e offre solo la modalità di sostituzione. In secondo luogo, creare una query SQL da inviare tramite il driver SQL appropriato (Microsoft.Data.SqlClient per .NET).
Per entrambi gli esempi seguenti, si presuppone lo schema della tabella seguente. L'opzione di associazione richiede l'impostazione di una chiave primaria nella tabella di destinazione. Non è necessario, ma consigliato, quando si usa un driver SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Una funzione deve soddisfare le aspettative seguenti da usare come output da ASA:
- Analisi di flusso di Azure prevede lo stato HTTP 200 dall'app Funzioni per batch elaborati correttamente
- Quando Analisi di flusso di Azure riceve un'eccezione 413 ("http Request Entity Too Large") da una funzione di Azure, riduce le dimensioni dei batch inviati alla funzione di Azure
- Durante la connessione di test, Analisi di flusso invia una richiesta POST con un batch vuoto a Funzioni di Azure e prevede lo stato HTTP 20x per convalidare il test
Opzione 1: Aggiornare per chiave con l'associazione SQL della funzione di Azure
Questa opzione usa l'associazione di output SQL della funzione di Azure. Questa estensione può sostituire un oggetto in una tabella, senza dover scrivere un'istruzione SQL. Al momento, non supporta operatori di assegnazione composti (accumuli).
Questo esempio è stato compilato in:
- Funzioni di Azure runtime versione 4
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
Per comprendere meglio l'approccio di associazione, è consigliabile seguire questa esercitazione.
Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Verranno usate le informazioni seguenti:
- Lingua:
C#
- Runtime:
.NET 6
(in funzione/runtime v4) - Modello:
HTTP trigger
Installare l'estensione di associazione eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Aggiungere l'elemento nella sezione dell'oggetto SqlConnectionString
Values
local.settings.json
, inserendo il stringa di connessione del server di destinazione:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Sostituire l'intera funzione (file con estensione cs nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Aggiornare il nome della tabella di destinazione nella sezione di associazione:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Aggiornare la sezione di Device
classe e mapping in modo che corrisponda al proprio schema:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in VS Code). Il database SQL deve essere raggiungibile dal computer. SSMS può essere usato per controllare la connettività. È quindi possibile usare uno strumento come Postman per inviare richieste POST all'endpoint locale. Una richiesta con un corpo vuoto deve restituire http 204. Una richiesta con un payload effettivo deve essere mantenuta nella tabella di destinazione (in modalità replace/update). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La funzione può ora essere pubblicata in Azure. È necessario impostare un'impostazione dell'applicazione per SqlConnectionString
. Il firewall di SQL Server di Azure deve consentire ai servizi di Azure in per la funzione live di raggiungerlo.
La funzione può quindi essere definita come output nel processo ASA e usata per sostituire i record anziché inserirli.
Opzione 2: eseguire il merge con l'assegnazione composta (accumulare) tramite una query SQL personalizzata
Nota
Al riavvio e al ripristino, AsA può inviare nuovamente gli eventi di output già generati. Si tratta di un comportamento previsto che può causare l'esito negativo della logica di accumulo (raddoppiando i singoli valori). Per evitare questo problema, è consigliabile restituire gli stessi dati in una tabella tramite l'output SQL asa nativo. Questa tabella di controllo può quindi essere usata per rilevare i problemi e sincronizzare nuovamente l'accumulo quando necessario.
Questa opzione usa Microsoft.Data.SqlClient. Questa libreria consente di inviare query SQL a un database SQL.
Questo esempio è stato compilato in:
- Funzioni di Azure runtime versione 4
- .NET 6.0
- Microsoft.Data.SqlClient 4.0.0
Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Verranno usate le informazioni seguenti:
- Lingua:
C#
- Runtime:
.NET 6
(in funzione/runtime v4) - Modello:
HTTP trigger
Installare la libreria SqlClient eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Aggiungere l'elemento nella sezione dell'oggetto SqlConnectionString
Values
local.settings.json
, inserendo il stringa di connessione del server di destinazione:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Sostituire l'intera funzione (file con estensione cs nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Aggiornare la sqltext
sezione di compilazione dei comandi in modo che corrisponda al proprio schema (si noti come l'accumulo viene ottenuto tramite l'operatore all'aggiornamento +=
):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in VS Code). Il database SQL deve essere raggiungibile dal computer. SSMS può essere usato per controllare la connettività. È quindi possibile usare uno strumento come Postman per inviare richieste POST all'endpoint locale. Una richiesta con un corpo vuoto deve restituire http 204. Una richiesta con un payload effettivo deve essere salvata in modo permanente nella tabella di destinazione (in modalità di accumulo/unione). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La funzione può ora essere pubblicata in Azure. È necessario impostare un'impostazione dell'applicazione per SqlConnectionString
. Il firewall di SQL Server di Azure deve consentire ai servizi di Azure in per la funzione live di raggiungerlo.
La funzione può quindi essere definita come output nel processo ASA e usata per sostituire i record anziché inserirli.
Alternative
Al di fuori di Funzioni di Azure, esistono diversi modi per ottenere il risultato previsto. Di seguito verranno menzionate le soluzioni più probabili.
Post-elaborazione nel database SQL di destinazione
Un'attività in background funzionerà dopo l'inserimento dei dati nel database tramite gli output STANDARD di ASA SQL.
Per Azure SQL, INSTEAD OF
i trigger DML possono essere usati per intercettare i comandi IN edizione Standard RT emessi da ASA:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Per Synapse SQL, ASA può inserire in una tabella di staging. Un'attività ricorrente può quindi trasformare i dati in base alle esigenze in una tabella intermedia. Infine, i dati vengono spostati nella tabella di produzione.
Pre-elaborazione in Azure Cosmos DB
Azure Cosmos DB supporta UP edizione Standard RT in modo nativo. Qui è possibile solo accodamento/sostituzione. Gli accumuli devono essere gestiti sul lato client in Azure Cosmos DB.
Se i requisiti corrispondono, un'opzione consiste nel sostituire il database SQL di destinazione da un'istanza di Azure Cosmos DB. In questo modo è necessaria una modifica importante nell'architettura complessiva della soluzione.
Per Synapse SQL, Azure Cosmos DB può essere usato come livello intermedio tramite Azure Collegamento a Synapse per Azure Cosmos DB. Collegamento a Synapse può essere usato per creare un archivio analitico. Questo archivio dati può quindi essere sottoposto a query direttamente in Synapse SQL.
Confronto delle alternative
Ogni approccio offre proposte e funzionalità di valore diverse:
Type | Opzione | Modalità | database SQL di Azure | Azure Synapse Analytics |
---|---|---|---|---|
Post-elaborazione | ||||
Trigger | Sostituisci, Accumula | + | N/D, i trigger non sono disponibili in Synapse SQL | |
Staging | Sostituisci, Accumula | + | + | |
Pre-elaborazione | ||||
Funzioni di Azure | Sostituisci, Accumula | + | - (prestazioni riga per riga) | |
Sostituzione di Azure Cosmos DB | Sostituzione | N/D | N/D | |
Collegamento a Synapse per Azure Cosmos DB | Sostituzione | N/D | + |
Ottenere supporto
Per maggiore supporto, provare la Pagina delle domande di Domande e risposte Microsoft per Analisi di flusso di Azure.
Passaggi successivi
- Informazioni sugli output di Analisi di flusso di Azure
- Output di Analisi di flusso di Azure nel database SQL di Azure
- Aumentare le prestazioni della velocità effettiva per il database SQL di Azure da Analisi di flusso di Azure
- Usare le identità gestite per accedere a database SQL di Azure o Azure Synapse Analytics da un processo di Analisi di flusso di Azure
- Usare i dati di riferimento di un database SQL per un processo di Analisi di flusso di Azure
- Eseguire Funzioni di Azure nei processi di Analisi di flusso di Azure - Esercitazione per l'output di Redis
- Avvio rapido: creare un processo di Analisi di flusso di Azure tramite il portale di Azure