Share via


Aggiornare o unire record in database SQL di Azure con Funzioni di Azure

Attualmente Analisi di flusso di Azure supporta solo l'inserimento (accodamento) di righe agli output SQL (database SQL di Azure e Azure Synapse Analytics). Questo articolo illustra le soluzioni alternative per abilitare UPDATE, UP edizione Standard RT o MERGE nei database SQL, con Funzioni di Azure come livello intermedio.

Le opzioni alternative per Funzioni di Azure vengono presentate alla fine.

Requisito

La scrittura di dati in una tabella può essere in genere eseguita nel modo seguente:

Modalità Istruzione T-SQL equivalente Requisiti
Aggiunta INSERT None
Sostituzione MERGE (UP edizione Standard RT) Chiave univoca
Accumulare MERGE (UP edizione Standard RT) con operatore di assegnazione composta (+=, -=...) Chiave univoca e identificatore

Per illustrare le differenze, è possibile esaminare cosa accade quando si inseriscono i due record seguenti:

Arrival_Time Device_Id Measure_Value
10.00 A 1
10:05 A 20

In modalità di accodamento vengono inseriti i due record. L'istruzione T-SQL equivalente è:

INSERT INTO [target] VALUES (...);

Risultato:

Modified_Time Device_Id Measure_Value
10.00 A 1
10:05 A 20

In modalità di sostituzione si ottiene solo l'ultimo valore per chiave. In questo caso si userà Device_Id come chiave. L'istruzione T-SQL equivalente è:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Risultato:

Modified_Time Device_Key Measure_Value
10:05 A 20

Infine, in modalità di accumulo si somma Value con un operatore di assegnazione composta (+=). Qui si userà anche Device_Id come chiave:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Risultato:

Modified_Time Device_Key Measure_Value
10:05 Un 21

Per considerazioni sulle prestazioni , gli adattatori di output del database SQL asa supportano attualmente solo la modalità di accodamento in modo nativo. Questi adattatori usano l'inserimento bulk per ottimizzare la velocità effettiva e limitare la pressione.

Questo articolo illustra come usare Funzioni di Azure per implementare le modalità Replace e Accumulate per ASA. Usando una funzione come livello intermedio, le potenziali prestazioni di scrittura non influiscono sul processo di streaming. A questo proposito, l'uso di Funzioni di Azure funzionerà meglio con Azure SQL. Con Synapse SQL, il passaggio da istruzioni bulk a righe per riga può creare problemi di prestazioni maggiori.

output Funzioni di Azure

Nel processo si sostituirà l'output di ASA SQL con l'output del Funzioni di Azure asa. Le funzionalità UPDATE, UP edizione Standard RT o MERGE verranno implementate nella funzione .

Attualmente sono disponibili due opzioni per accedere a un database SQL in una funzione. Il primo è l'associazione di output sql di Azure. Attualmente è limitato a C# e offre solo la modalità di sostituzione. In secondo luogo, creare una query SQL da inviare tramite il driver SQL appropriato (Microsoft.Data.SqlClient per .NET).

Per entrambi gli esempi seguenti, si presuppone lo schema della tabella seguente. L'opzione di associazione richiede l'impostazione di una chiave primaria nella tabella di destinazione. Non è necessario, ma consigliato, quando si usa un driver SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Una funzione deve soddisfare le aspettative seguenti da usare come output da ASA:

  • Analisi di flusso di Azure prevede lo stato HTTP 200 dall'app Funzioni per batch elaborati correttamente
  • Quando Analisi di flusso di Azure riceve un'eccezione 413 ("http Request Entity Too Large") da una funzione di Azure, riduce le dimensioni dei batch inviati alla funzione di Azure
  • Durante la connessione di test, Analisi di flusso invia una richiesta POST con un batch vuoto a Funzioni di Azure e prevede lo stato HTTP 20x per convalidare il test

Opzione 1: Aggiornare per chiave con l'associazione SQL della funzione di Azure

Questa opzione usa l'associazione di output SQL della funzione di Azure. Questa estensione può sostituire un oggetto in una tabella, senza dover scrivere un'istruzione SQL. Al momento, non supporta operatori di assegnazione composti (accumuli).

Questo esempio è stato compilato in:

Per comprendere meglio l'approccio di associazione, è consigliabile seguire questa esercitazione.

Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Verranno usate le informazioni seguenti:

  • Lingua: C#
  • Runtime: .NET 6 (in funzione/runtime v4)
  • Modello: HTTP trigger

Installare l'estensione di associazione eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Aggiungere l'elemento nella sezione dell'oggetto SqlConnectionStringValueslocal.settings.json, inserendo il stringa di connessione del server di destinazione:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Sostituire l'intera funzione (file con estensione cs nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Aggiornare il nome della tabella di destinazione nella sezione di associazione:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Aggiornare la sezione di Device classe e mapping in modo che corrisponda al proprio schema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in VS Code). Il database SQL deve essere raggiungibile dal computer. SSMS può essere usato per controllare la connettività. È quindi possibile usare uno strumento come Postman per inviare richieste POST all'endpoint locale. Una richiesta con un corpo vuoto deve restituire http 204. Una richiesta con un payload effettivo deve essere mantenuta nella tabella di destinazione (in modalità replace/update). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

La funzione può ora essere pubblicata in Azure. È necessario impostare un'impostazione dell'applicazione per SqlConnectionString. Il firewall di SQL Server di Azure deve consentire ai servizi di Azure in per la funzione live di raggiungerlo.

La funzione può quindi essere definita come output nel processo ASA e usata per sostituire i record anziché inserirli.

Opzione 2: eseguire il merge con l'assegnazione composta (accumulare) tramite una query SQL personalizzata

Nota

Al riavvio e al ripristino, AsA può inviare nuovamente gli eventi di output già generati. Si tratta di un comportamento previsto che può causare l'esito negativo della logica di accumulo (raddoppiando i singoli valori). Per evitare questo problema, è consigliabile restituire gli stessi dati in una tabella tramite l'output SQL asa nativo. Questa tabella di controllo può quindi essere usata per rilevare i problemi e sincronizzare nuovamente l'accumulo quando necessario.

Questa opzione usa Microsoft.Data.SqlClient. Questa libreria consente di inviare query SQL a un database SQL.

Questo esempio è stato compilato in:

Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Verranno usate le informazioni seguenti:

  • Lingua: C#
  • Runtime: .NET 6 (in funzione/runtime v4)
  • Modello: HTTP trigger

Installare la libreria SqlClient eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Aggiungere l'elemento nella sezione dell'oggetto SqlConnectionStringValueslocal.settings.json, inserendo il stringa di connessione del server di destinazione:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Sostituire l'intera funzione (file con estensione cs nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Aggiornare la sqltext sezione di compilazione dei comandi in modo che corrisponda al proprio schema (si noti come l'accumulo viene ottenuto tramite l'operatore all'aggiornamento += ):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in VS Code). Il database SQL deve essere raggiungibile dal computer. SSMS può essere usato per controllare la connettività. È quindi possibile usare uno strumento come Postman per inviare richieste POST all'endpoint locale. Una richiesta con un corpo vuoto deve restituire http 204. Una richiesta con un payload effettivo deve essere salvata in modo permanente nella tabella di destinazione (in modalità di accumulo/unione). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

La funzione può ora essere pubblicata in Azure. È necessario impostare un'impostazione dell'applicazione per SqlConnectionString. Il firewall di SQL Server di Azure deve consentire ai servizi di Azure in per la funzione live di raggiungerlo.

La funzione può quindi essere definita come output nel processo ASA e usata per sostituire i record anziché inserirli.

Alternative

Al di fuori di Funzioni di Azure, esistono diversi modi per ottenere il risultato previsto. Di seguito verranno menzionate le soluzioni più probabili.

Post-elaborazione nel database SQL di destinazione

Un'attività in background funzionerà dopo l'inserimento dei dati nel database tramite gli output STANDARD di ASA SQL.

Per Azure SQL, INSTEAD OFi trigger DML possono essere usati per intercettare i comandi IN edizione Standard RT emessi da ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Per Synapse SQL, ASA può inserire in una tabella di staging. Un'attività ricorrente può quindi trasformare i dati in base alle esigenze in una tabella intermedia. Infine, i dati vengono spostati nella tabella di produzione.

Pre-elaborazione in Azure Cosmos DB

Azure Cosmos DB supporta UP edizione Standard RT in modo nativo. Qui è possibile solo accodamento/sostituzione. Gli accumuli devono essere gestiti sul lato client in Azure Cosmos DB.

Se i requisiti corrispondono, un'opzione consiste nel sostituire il database SQL di destinazione da un'istanza di Azure Cosmos DB. In questo modo è necessaria una modifica importante nell'architettura complessiva della soluzione.

Per Synapse SQL, Azure Cosmos DB può essere usato come livello intermedio tramite Azure Collegamento a Synapse per Azure Cosmos DB. Collegamento a Synapse può essere usato per creare un archivio analitico. Questo archivio dati può quindi essere sottoposto a query direttamente in Synapse SQL.

Confronto delle alternative

Ogni approccio offre proposte e funzionalità di valore diverse:

Type Opzione Modalità database SQL di Azure Azure Synapse Analytics
Post-elaborazione
Trigger Sostituisci, Accumula + N/D, i trigger non sono disponibili in Synapse SQL
Staging Sostituisci, Accumula + +
Pre-elaborazione
Funzioni di Azure Sostituisci, Accumula + - (prestazioni riga per riga)
Sostituzione di Azure Cosmos DB Sostituzione N/D N/D
Collegamento a Synapse per Azure Cosmos DB Sostituzione N/D +

Ottenere supporto

Per maggiore supporto, provare la Pagina delle domande di Domande e risposte Microsoft per Analisi di flusso di Azure.

Passaggi successivi