Działanie ForEach w usługach Azure Data Factory i Azure Synapse Analytics

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

Działanie ForEach definiuje powtarzający się przepływ sterowania w potoku usługi Azure Data Factory lub Synapse. To działanie służy do wykonywania iteracji po kolekcji i wykonuje określone działania w pętli. Implementacja pętli tego działania przypomina strukturę pętli Foreach w językach programowania.

Tworzenie działania ForEach za pomocą interfejsu użytkownika

Aby użyć działania ForEach w potoku, wykonaj następujące kroki:

  1. Możesz użyć dowolnej zmiennej typu tablicy lub danych wyjściowych z innych działań jako danych wejściowych dla działania ForEach. Aby utworzyć zmienną tablicową, wybierz tło kanwy potoku, a następnie wybierz kartę Zmienne , aby dodać zmienną typu tablicy, jak pokazano poniżej.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Wyszukaj ciąg ForEach w okienku Działania potoku i przeciągnij działanie ForEach na kanwę potoku.

  3. Wybierz nowe działanie ForEach na kanwie, jeśli nie zostało jeszcze wybrane, a karta Ustawienia, aby edytować jego szczegóły.

    Shows the UI for a Filter activity.

  4. Wybierz pole Elementy, a następnie wybierz link Dodaj zawartość dynamiczną, aby otworzyć okienko edytora zawartości dynamicznej.

    Shows the  Add dynamic content  link for the Items property.

  5. Wybierz tablicę wejściową, która ma być filtrowana w edytorze zawartości dynamicznej. W tym przykładzie wybieramy zmienną utworzoną w pierwszym kroku.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Wybierz edytor działań w działaniu ForEach, aby dodać co najmniej jedno działanie do wykonania dla każdego elementu w tablicy Elementy wejściowe.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. We wszystkich działaniach tworzonych w ramach działania ForEach można odwoływać się do bieżącego elementu, za pomocą którego działanie ForEach jest iterujące z listy Elementy . Możesz odwoływać się do bieżącego elementu w dowolnym miejscu, w którym można użyć wyrażenia dynamicznego, aby określić wartość właściwości. W edytorze zawartości dynamicznej wybierz iterator ForEach, aby zwrócić bieżący element.

    Shows the dynamic content editor with the ForEach iterator selected.

Składnia

Właściwości zostały opisane w dalszej części tego artykułu. Właściwość items jest kolekcją, a każdy element w kolekcji jest określany przy użyciu @item() elementu , jak pokazano w następującej składni:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Właściwości typu

Właściwości opis Dozwolone wartości Wymagania
name Nazwa działania dla każdego. String Tak
type Musi być ustawiona na ForEach String Tak
isSequential Określa, czy pętla powinna być wykonywana sekwencyjnie, czy równolegle. Maksymalnie 50 iteracji pętli można wykonywać jednocześnie. Jeśli na przykład masz iterację działania ForEach dla działania kopiowania z 10 różnymi zestawami danych źródłowych i ujścia z wartością isSequential ustawioną na wartość False, wszystkie kopie są wykonywane jednocześnie. Wartość domyślna to False.

Jeśli parametr "isSequential" ma wartość False, upewnij się, że istnieje poprawna konfiguracja uruchamiania wielu plików wykonywalnych. W przeciwnym razie ta właściwość powinna być używana z ostrożnością, aby uniknąć konfliktów zapisu. Aby uzyskać więcej informacji, zobacz sekcję Równoległe wykonywanie .
Wartość logiczna L.p. Wartość domyślna to False.
batchCount Liczba partii, która ma być używana do kontrolowania liczby równoległych wykonań (gdy parametr isSequential ma wartość false). Jest to górny limit współbieżności, ale dla każdego działania nie zawsze będzie wykonywane na tej liczbie Liczba całkowita (maksymalnie 50) L.p. Wartość domyślna to 20.
Elementy Wyrażenie zwracające tablicę JSON do iteracji. Wyrażenie (które zwraca tablicę JSON) Tak
Działania Działania do wykonania. Lista działań Tak

Równoległego

Jeśli parametr isSequential ma wartość false, działanie iteruje równolegle z maksymalnie 50 współbieżnymi iteracjami. To ustawienie powinno być używane z ostrożnością. Jeśli iteracji współbieżne są zapisywane w tym samym folderze, ale w przypadku różnych plików, takie podejście jest w porządku. Jeśli iteracje współbieżne zapisują jednocześnie do dokładnie tego samego pliku, to podejście najprawdopodobniej powoduje błąd.

Język wyrażeń iteracji

W działaniu ForEach podaj tablicę do iterated dla elementów właściwości. Użyj @item() polecenia , aby iterować w ramach pojedynczego wyliczenia w działaniu ForEach. Jeśli na przykład elementy są tablicą: [1, 2, 3], @item() zwraca wartość 1 w pierwszej iteracji, 2 w drugiej iteracji i 3 w trzeciej iteracji. Możesz również użyć @range(0,10) wyrażenia like do iterowania dziesięć razy, zaczynając od 0 kończącego się na 9.

Iterowanie pojedynczego działania

Scenariusz: Skopiuj z tego samego pliku źródłowego w usłudze Azure Blob do wielu plików docelowych w usłudze Azure Blob.

Definicja potoku

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definicja zestawu danych obiektów blob

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Uruchamianie wartości parametrów

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iterowanie wielu działań

Istnieje możliwość iterowania wielu działań (na przykład kopiowania i działań internetowych) w działaniu ForEach. W tym scenariuszu zalecamy abstrakcję wielu działań w osobnym potoku. Następnie możesz użyć działania ExecutePipeline w potoku z działaniem ForEach, aby wywołać oddzielny potok z wieloma działaniami.

Składnia

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Przykład

Scenariusz: iteracja nad elementem InnerPipeline w działaniu ForEach za pomocą działania Execute Pipeline (Wykonywanie potoku). Wewnętrzny potok kopiuje z definicjami schematu sparametryzowanymi.

Definicja potoku głównego

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definicja potoku wewnętrznego

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definicja źródłowego zestawu danych

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definicja zestawu danych ujścia

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parametry potoku głównego

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Agregowanie danych wyjściowych

Aby zagregować dane wyjściowe działania foreach , skorzystaj ze zmiennych i działania Dołącz zmienną .

Najpierw zadeklaruj zmienną array w potoku. Następnie wywołaj działanie Dołącz zmienną wewnątrz każdej pętli foreach . Następnie można pobrać agregację z tablicy.

Ograniczenia i rozwiązania

Poniżej przedstawiono pewne ograniczenia działania ForEach i sugerowane obejścia.

Ograniczenie Rozwiązanie
Nie można zagnieżdżać pętli ForEach wewnątrz innej pętli ForEach (lub pętli Until). Zaprojektuj dwuwymiarowy potok, w którym zewnętrzny potok z zewnętrzną pętlą ForEach iteruje nad wewnętrznym potokiem zagnieżdżonym.
Działanie ForEach ma maksymalnie batchCount 50 dla przetwarzania równoległego i maksymalnie 100 000 elementów. Zaprojektuj dwu poziom potoku, w którym potok zewnętrzny z działaniem ForEach iteruje nad potokiem wewnętrznym.
Funkcji SetVariable nie można używać wewnątrz działania ForEach, które jest uruchamiane równolegle, ponieważ zmienne są globalne dla całego potoku, nie są one ograniczone do działania ForEach ani innych działań. Rozważ użycie sekwencyjnego elementu ForEach lub użyj potoku wykonywania wewnątrz programu ForEach (zmienna/parametr obsługiwany w podrzędnym potoku).

Zobacz inne obsługiwane działania przepływu sterowania: