Przyrostowe ładowanie danych z wielu tabel w programie SQL Server do usługi Azure SQL Database przy użyciu programu PowerShell

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

W tym samouczku utworzysz usługę Azure Data Factory z potokiem, który ładuje dane różnicowe z wielu tabel w bazie danych programu SQL Server do usługi Azure SQL Database.

Ten samouczek obejmuje następujące procedury:

  • Przygotowanie źródłowych i docelowych magazynów danych
  • Tworzenie fabryki danych.
  • Utwórz własne środowisko Integration Runtime.
  • Instalowanie środowiska Integration Runtime.
  • Tworzenie połączonych usług.
  • Tworzenie zestawów danych źródła, ujścia i limitu.
  • Tworzenie, uruchamianie i monitorowanie potoku
  • Przejrzyj wyniki.
  • Dodawanie lub aktualizowanie danych w tabelach źródłowych.
  • Ponowne uruchamianie i monitorowanie potoku.
  • Przegląd wyników końcowych.

Omówienie

Poniżej przedstawiono ważne czynności związane z tworzeniem tego rozwiązania:

  1. Wybierz kolumnę limitu.

    Wybierz jedną kolumnę dla każdej tabeli w magazynie danych źródłowych, które można zidentyfikować nowe lub zaktualizowane rekordy dla każdego przebiegu. Zazwyczaj dane w tej wybranej kolumnie (na przykład last_modify_time lub ID) rosną wraz z tworzeniem i aktualizacją wierszy. Maksymalna wartość w tej kolumnie jest używana jako limit.

  2. Przygotuj magazyn danych do przechowywania wartości limitu.

    W tym samouczku wartość limitu jest przechowywana w bazie danych SQL.

  3. Utwórz potok z następującymi działaniami:

    1. Utwórz działanie ForEach służące do przeprowadzania iteracji po liście nazw tabel źródłowych przekazywanych jako parametr do potoku. Dla każdej tabeli źródłowej wywołuje ono następujące działania służące do wykonywania ładowania przyrostowego dla tej tabeli.

    2. Utwórz dwa działania lookup. Użyj pierwszego działania Lookup do pobrania ostatniej wartości limitu. Użyj drugiego działania Lookup do pobrania nowej wartości limitu. Te wartości limitu są przekazywane do działania Copy.

    3. Utwórz działanie Kopiuj, który kopiuje wiersze ze źródłowego magazynu danych z wartością kolumny limitu większej niż stara wartość limitu i mniejsza lub równa nowej wartości limitu. Następnie kopiuje dane różnicowe ze źródłowego magazynu danych do usługi Azure Blob Storage jako nowy plik.

    4. Utwórz działanie StoredProcedure, które aktualizuje wartość limitu dla potoku przy następnym uruchomieniu.

    Diagram ogólny rozwiązania wygląda następująco:

    Incrementally load data

Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.

Wymagania wstępne

  • SQL Server. Baza danych programu SQL Server jest używana jako źródłowy magazyn danych w tym samouczku.
  • Usługa Azure SQL Database. Baza danych w usłudze Azure SQL Database jest używana jako magazyn danych ujścia. Jeśli nie masz bazy danych SQL, zobacz Tworzenie bazy danych w usłudze Azure SQL Database , aby uzyskać instrukcje tworzenia bazy danych.

Tworzenie tabel źródłowych w bazie danych SQL Server

  1. Otwórz program SQL Server Management Studio (SSMS) lub azure Data Studio i połącz się z bazą danych programu SQL Server.

  2. W Eksploratorze serwera (SSMS) lub w okienku Połączenie ions (Azure Data Studio) kliknij prawym przyciskiem myszy bazę danych i wybierz pozycję Nowe zapytanie.

  3. Uruchom następujące polecenie SQL względem bazy danych w celu utworzenia tabel o nazwach customer_table i project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Tworzenie tabel docelowych w usłudze Azure SQL Database

  1. Otwórz program SQL Server Management Studio (SSMS) lub azure Data Studio i połącz się z bazą danych programu SQL Server.

  2. W Eksploratorze serwera (SSMS) lub w okienku Połączenie ions (Azure Data Studio) kliknij prawym przyciskiem myszy bazę danych i wybierz pozycję Nowe zapytanie.

  3. Uruchom następujące polecenie SQL względem bazy danych w celu utworzenia tabel o nazwach customer_table i project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Tworzenie innej tabeli w usłudze Azure SQL Database w celu przechowywania wysokiej wartości limitu

  1. Uruchom następujące polecenie SQL względem bazy danych, aby utworzyć tabelę o nazwie watermarktable do przechowywania wartości limitu:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Wstaw początkowe wartości limitu dla obu tabel źródłowych w tabeli wartości limitu.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Tworzenie procedury składowanej w usłudze Azure SQL Database

Uruchom następujące polecenie, aby utworzyć procedurę składowaną w bazie danych. Ta procedura składowana służy do aktualizowania wartość limitu po każdym uruchomieniu potoku.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Tworzenie typów danych i dodatkowych procedur składowanych w usłudze Azure SQL Database

Uruchom następujące zapytanie, aby utworzyć dwie procedury składowane i dwa typy danych w bazie danych. Służą one do scalania danych z tabel źródłowych w tabelach docelowych.

Aby ułatwić rozpoczęcie podróży, użyjemy bezpośrednio tych procedur składowanych przekazujących dane różnicowe za pośrednictwem zmiennej tabeli, a następnie scalimy je z magazynem docelowym. Należy zachować ostrożność, że w zmiennej tabeli nie będzie przechowywana "duża" liczba wierszy różnicowych (ponad 100).

Jeśli musisz scalić dużą liczbę wierszy różnicowych z magazynem docelowym, zalecamy użycie działania kopiowania w celu skopiowania wszystkich danych różnicowych do tymczasowej tabeli przejściowej w magazynie docelowym, a następnie skompilowanie własnej procedury składowanej bez użycia zmiennej tabeli tabeli w celu scalenia ich z tabeli "przejściowej" do tabeli "końcowej".

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Zainstaluj najnowsze moduły programu Azure PowerShell, wykonując instrukcje podane w temacie Instalowanie i konfigurowanie programu Azure PowerShell.

Tworzenie fabryki danych

  1. Zdefiniuj zmienną nazwy grupy zasobów, której użyjesz później w poleceniach programu PowerShell. Skopiuj poniższy tekst polecenia do programu PowerShell, podaj nazwę grupy zasobów platformy Azure w podwójnych cudzysłowach, a następnie uruchom polecenie. Może to być na przykład "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Jeśli grupa zasobów już istnieje, jej zastąpienie może być niewskazane. Przypisz inną wartość do zmiennej $resourceGroupName i ponownie uruchom polecenie.

  2. Zdefiniuj zmienną lokalizacji fabryki danych.

    $location = "East US"
    
  3. Aby utworzyć grupę zasobów platformy Azure, uruchom następujące polecenie:

    New-AzResourceGroup $resourceGroupName $location
    

    Jeśli grupa zasobów już istnieje, jej zastąpienie może być niewskazane. Przypisz inną wartość do zmiennej $resourceGroupName i ponownie uruchom polecenie.

  4. Zdefiniuj zmienną nazwy fabryki danych.

    Ważne

    Zaktualizuj nazwę fabryki danych, aby była unikatowa w skali globalnej. Może to być na przykład ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Aby utworzyć fabrykę danych, uruchom następujące polecenie cmdlet Set-AzDataFactoryV2 :

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Należy uwzględnić następujące informacje:

  • Nazwa fabryki danych musi być globalnie unikatowa. Jeśli zostanie wyświetlony następujący błąd, zmień nazwę i spróbuj ponownie:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Aby utworzyć wystąpienia usługi Data Factory, konto użytkownika używane do logowania się na platformie Azure musi być członkiem roli współautora lub właściciela albo administratorem subskrypcji platformy Azure.

  • Aby uzyskać listę regionów platformy Azure, w których obecnie jest dostępna usługa Data Factory, wybierz dane regiony na poniższej stronie, a następnie rozwiń węzeł Analiza, aby zlokalizować pozycję Data Factory: Produkty dostępne według regionu. Magazyny danych (Azure Storage, SQL Database, SQL Managed Instance itd.) i obliczenia (Azure HDInsight itp.) używane przez fabrykę danych mogą znajdować się w innych regionach.

Tworzenie własnego środowiska Integration Runtime

W tej sekcji utworzysz własne środowisko Integration Runtime i skojarzysz je z maszyną lokalną za pomocą bazy danych programu SQL Server. Własne środowisko Integration Runtime to składnik, który kopiuje dane z programu SQL Server na maszynie do usługi Azure SQL Database.

  1. Utwórz zmienną dla nazwy środowiska Integration Runtime. Użyj unikatowej nazwy i zanotuj ją. Będziesz jej używać w dalszej części tego samouczka.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Utwórz własne środowisko Integration Runtime.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Oto przykładowe dane wyjściowe:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Uruchom następujące polecenie, aby pobrać stan utworzonego środowiska Integration Runtime. Upewnij się, że właściwość State ma ustawioną wartość NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Oto przykładowe dane wyjściowe:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Uruchom następujące polecenie, aby pobrać klucze uwierzytelniania używane do zarejestrowania własnego środowiska Integration Runtime za pomocą usługi Azure Data Factory w chmurze:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Oto przykładowe dane wyjściowe:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Skopiuj jeden z kluczy (pomijając cudzysłowy) używanych do rejestracji własnego środowiska Integration Runtime, które zainstalujesz na swojej maszynie w kolejnych krokach.

Instalowanie narzędzia Integration Runtime

  1. Jeśli środowisko Integration Runtime jest już zainstalowane na maszynie, odinstaluj je za pomocą apletu Dodaj lub usuń programy.

  2. Pobierz środowisko Integration Runtime (Self-hosted) na lokalną maszynę z systemem Windows. Uruchom instalację.

  3. Na stronie powitalnej instalatora środowiska Microsoft Integration Runtime wybierz przycisk Dalej.

  4. Na stronie Umowa Licencyjna Użytkownika Oprogramowania zaakceptuj warunki i umowę licencyjną, a następnie wybierz przycisk Dalej.

  5. Na stronie Folder docelowy wybierz pozycję Dalej.

  6. Na stronie Gotowe do zainstalowania środowiska Microsoft Integration Runtime wybierz pozycję Zainstaluj.

  7. N astronie zakończenia pracy z Kreatorem instalacji środowiska Microsoft Integration Runtime wybierz pozycję Zakończ.

  8. Na stronie Rejestrowanie środowiska Integration Runtime (Self-hosted) wklej klucz zapisany w poprzedniej sekcji i wybierz pozycję Zarejestruj.

    Register the integration runtime

  9. Na stronie Nowy węzeł Integration Runtime (self-hosted) wybierz pozycję Zakończ.

  10. Gdy własne środowisko Integration Runtime zostanie pomyślnie zarejestrowane, zostanie wyświetlony następujący komunikat:

    Registered successfully

  11. Na stronie Rejestrowanie produktu Integration Runtime (Self-hosted) kliknij pozycję Uruchom program Configuration Manager.

  12. Gdy węzeł zostanie połączony z usługą w chmurze, zostanie wyświetlona następująca strona:

    Node is connected page

  13. Teraz przetestuj łączność z bazą danych programu SQL Server.

    Diagnostics tab

    a. Na stronie programu Configuration Manager przejdź na kartę Diagnostyka.

    b. Wybierz typ źródła danych SqlServer.

    c. Wprowadź nazwę serwera.

    d. Wprowadź nazwę bazy danych.

    e. Wybierz tryb uwierzytelniania.

    f. Wprowadź nazwę użytkownika.

    g. Wprowadź hasło skojarzone z nazwą użytkownika.

    h. Wybierz przycisk Testuj, aby upewnić się, że środowisko Integration Runtime może połączyć się z wystąpieniem programu SQL Server. Jeśli połączenie zostanie pomyślnie nawiązane, zostanie wyświetlony zielony znacznik wyboru. Jeśli nawiązywanie połączenia nie powiedzie się, zostanie wyświetlony komunikat o błędzie. Rozwiąż wszelkie problemy i upewnij się, że środowisko Integration Runtime może połączyć się z programem SQL Server.

    Uwaga

    Zapisz wartości: typ uwierzytelniania, serwer, baza danych, użytkownik i hasło. Będziesz ich używać w dalszej części tego samouczka.

Tworzenie połączonych usług

Połączone usługi tworzy się w fabryce danych w celu połączenia magazynów danych i usług obliczeniowych z fabryką danych. W tej sekcji utworzysz połączone usługi z bazą danych programu SQL Server i bazą danych w usłudze Azure SQL Database.

Tworzenie usługi połączonej z serwerem SQL Server

W tym kroku połączysz bazę danych programu SQL Server z fabryką danych.

  1. Utwórz plik JSON o nazwie SqlServerLinkedService.json w folderze C:\ADFTutorials\IncCopyMultiTableTutorial (utwórz foldery lokalne, jeśli jeszcze nie istnieją) z następującą zawartością. Wybierz właściwą sekcję na podstawie uwierzytelniania używanego do nawiązywania połączenia z programem SQL Server.

    Ważne

    Wybierz właściwą sekcję na podstawie uwierzytelniania używanego do nawiązywania połączenia z programem SQL Server.

    Jeśli używasz uwierzytelniania SQL, skopiuj następującą definicję JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Jeśli używasz uwierzytelniania systemu Windows, skopiuj następującą definicję JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Ważne

    • Wybierz właściwą sekcję na podstawie uwierzytelniania używanego do nawiązywania połączenia z programem SQL Server.
    • Zastąp <nazwę środowiska Integration Runtime nazwą> środowiska Integration Runtime.
    • Przed zapisaniem pliku zastąp wartość <servername, <databasename>>, <username> i <password> wartościami bazy danych programu SQL Server.
    • Jeśli musisz użyć znaku ukośnika (\) w nazwie konta użytkownika lub nazwie serwera, użyj znaku ucieczki (\). Może to być na przykład mydomain\\myuser.
  2. W programie PowerShell uruchom następujące polecenie cmdlet, aby przełączyć się do folderu C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Uruchom polecenie cmdlet Set-AzDataFactoryV2LinkedService, aby utworzyć połączoną usługę AzureStorageLinkedService. W poniższym przykładzie przekazujesz wartości dla parametrów ResourceGroupName i DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Oto przykładowe dane wyjściowe:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Tworzenie połączonej usługi SQL Database

  1. Utwórz plik JSON o nazwie AzureSQLDatabaseLinkedService.json w folderze C:\ADFTutorials\IncCopyMultiTableTutorial o następującej zawartości. (Utwórz folder ADF, jeśli jeszcze nie istnieje). Przed zapisaniem pliku zastąp <wartości servername>, <<database name>, user name> i <password> nazwą bazy danych programu SQL Server, nazwą bazy danych, nazwą użytkownika i hasłem.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. W programie PowerShell uruchom polecenie cmdlet Set-AzDataFactoryV2LinkedService , aby utworzyć połączoną usługę AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Oto przykładowe dane wyjściowe:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Tworzenie zestawów danych

W tym kroku utworzysz zestawy danych reprezentujące źródło danych, docelową lokalizację danych i lokalizację, w której będzie przechowywana wartość limitu.

Tworzenie zestawu danych źródłowych

  1. Utwórz plik JSON o nazwie SourceDataset.json w tym samym folderze o następującej zawartości:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    Działanie Copy w potoku korzysta z zapytania SQL do załadowania danych, a nie całej tabeli.

  2. Uruchom polecenie cmdlet Set-AzDataFactoryV2Dataset, aby utworzyć zestaw danych SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Oto przykładowe dane wyjściowe polecenia cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Tworzenie ujścia zestawu danych

  1. Utwórz plik JSON o nazwie SinkDataset.json w tym samym folderze o następującej zawartości. Element tableName jest dynamicznie ustawiany przez potok w czasie wykonywania. Działanie ForEach w potoku przeprowadza iterację po liście nazw i przekazuje nazwę tabeli do tego zestawu danych w każdej iteracji.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Uruchom polecenie cmdlet Set-AzDataFactoryV2Dataset, aby utworzyć zestaw danych SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Oto przykładowe dane wyjściowe polecenia cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Tworzenie zestawu danych dla limitu

W tym kroku utworzysz zestaw danych do przechowywania wartości górnego limitu.

  1. Utwórz plik JSON o nazwie WatermarkDataset.json w tym samym folderze o następującej zawartości:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Uruchom polecenie cmdlet Set-AzDataFactoryV2Dataset, aby utworzyć zestaw danych WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Oto przykładowe dane wyjściowe polecenia cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Tworzenie potoku

Potok przyjmuje listę nazw tabel jako parametr. Działanie ForEach iteruje za pośrednictwem listy nazw tabel i wykonuje następujące operacje:

  1. Użyj działania Lookup, aby pobrać starą wartość limitu (początkową wartość lub wartość użytą w ostatniej iteracji).

  2. Użyj działania Lookup, aby pobrać nową wartość limitu (maksymalną wartość kolumny limitu w tabeli źródłowej).

  3. Użyj działanie Kopiuj, aby skopiować dane między tymi dwiema wartościami limitu z źródłowej bazy danych do docelowej bazy danych.

  4. Użyj działania StoredProcedure, aby zaktualizować starą wartość limitu do użycia w pierwszym kroku następnej iteracji.

Tworzenie potoku

  1. Utwórz plik JSON o nazwie IncrementalCopyPipeline.json w tym samym folderze o następującej zawartości:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Uruchom polecenie cmdlet Set-AzDataFactoryV2Pipeline, aby utworzyć potok IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Oto przykładowe dane wyjściowe:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Uruchamianie potoku

  1. Utwórz plik parametrów o nazwie Parameters.json w tym samym folderze o następującej zawartości:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Uruchom potok IncrementalCopyPipeline przy użyciu polecenia cmdlet Invoke-AzDataFactoryV2Pipeline . Zastąp symbole zastępcze własną nazwą grupy zasobów i fabryki danych.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

Monitor the pipeline (Monitorowanie potoku)

  1. Zaloguj się w witrynie Azure Portal.

  2. Wybierz pozycję Wszystkie usługi, przeprowadź wyszukiwanie za pomocą słowa kluczowego Fabryki danych, a następnie wybierz pozycję Fabryki danych.

  3. Wyszukaj używaną fabrykę danych na liście fabryk danych, a następnie wybierz ją, aby otworzyć stronę Fabryka danych.

  4. Na stronie Fabryka danych wybierz pozycję Otwórz na kafelku Otwórz usługę Azure Data Factory Studio, aby uruchomić usługę Azure Data Factory na osobnej karcie.

  5. Na stronie głównej usługi Azure Data Factory wybierz pozycję Monitoruj po lewej stronie.

    Screenshot shows the home page for Azure Data Factory.

  6. Wyświetlone zostaną wszystkie uruchomienia potoków wraz z ich stanami. Zwróć uwagę, że w poniższym przykładzie stan uruchomienia potoku to Powodzenie. Wybierz link w kolumnie Parametry, aby sprawdzić parametry przekazywane do potoku. Jeśli wystąpił błąd, w kolumnie Błąd zostanie wyświetlony link.

    Screenshot shows pipeline runs for a data factory including your pipeline.

  7. Po wybraniu linku w kolumnie Akcje zobaczysz wszystkie uruchomienia działań dla potoku.

  8. Aby wrócić do widoku Uruchomienia potoku, wybierz pozycję Wszystkie uruchomienia potoku.

Sprawdzanie wyników

W programu SQL Server Management Studio uruchom następujące zapytania względem docelowej bazy danych Azure SQL Database, aby sprawdzić, czy dane zostały skopiowane z tabel źródłowych do tabel docelowych:

Zapytanie

select * from customer_table

Wyjście

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Zapytanie

select * from project_table

Wyjście

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Zapytanie

select * from watermarktable

Wyjście

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Należy zauważyć, że wartości limitu dla obu tabel zostały zaktualizowane.

Dodawanie większej ilości danych do tabel źródłowych

Uruchom następujące zapytanie względem źródłowej bazy danych programu SQL Server, aby zaktualizować istniejący wiersz w tabeli customer_table. Wstaw nowy wiersz do tabeli project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Ponowne uruchamianie potoku

  1. Teraz ponownie uruchom proces, wykonując następujące polecenie programu PowerShell:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Monitoruj uruchomienia potoków, postępując zgodnie z instrukcjami w sekcji Monitorowanie potoku. Gdy stan potoku to W toku, w obszarze Akcje zostanie wyświetlony kolejny link akcji, aby anulować uruchomienie potoku.

  3. Wybierz pozycję Odśwież, aby odświeżać listę do momentu, aż uruchomienie potoku zakończy się pomyślnie.

  4. Opcjonalnie wybierz link Wyświetl uruchomienia działań w kolumnie Akcje, aby wyświetlić wszystkie uruchomienia działań skojarzone z tym uruchomieniem potoku.

Przegląd wyników końcowych

W programie SQL Server Management Studio uruchom następujące zapytania względem docelowej bazy danych, aby sprawdzić, czy zaktualizowane lub nowe dane zostały skopiowane z tabel źródłowych do tabel docelowych.

Zapytanie

select * from customer_table

Wyjście

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Zwróć uwagę na nowe wartości właściwości Name i LastModifytime dla identyfikatora PersonID numeru 3.

Zapytanie

select * from project_table

Wyjście

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Zwróć uwagę, że do tabeli project_table dodano pozycję NewProject.

Zapytanie

select * from watermarktable

Wyjście

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Należy zauważyć, że wartości limitu dla obu tabel zostały zaktualizowane.

W ramach tego samouczka wykonano następujące procedury:

  • Przygotowanie źródłowych i docelowych magazynów danych
  • Tworzenie fabryki danych.
  • Tworzenie własnego środowiska Integration Runtime.
  • Instalowanie środowiska Integration Runtime.
  • Tworzenie połączonych usług.
  • Tworzenie zestawów danych źródła, ujścia i limitu.
  • Tworzenie, uruchamianie i monitorowanie potoku
  • Przejrzyj wyniki.
  • Dodawanie lub aktualizowanie danych w tabelach źródłowych.
  • Ponowne uruchamianie i monitorowanie potoku.
  • Przegląd wyników końcowych.

Przejdź do następującego samouczka, aby dowiedzieć się więcej o przekształcaniu danych za pomocą klastra Spark na platformie Azure: