워크플로 오케스트레이션 관리자를 사용하여 기존 파이프라인 실행

적용 대상: Azure Data Factory Azure Synapse Analytics

기업용 올인원 분석 솔루션인 Microsoft Fabric의 Data Factory를 사용해 보세요. Microsoft Fabric은 데이터 이동부터 데이터 과학, 실시간 분석, 비즈니스 인텔리전스 및 보고에 이르기까지 모든 것을 다룹니다. 무료로 새 평가판을 시작하는 방법을 알아봅니다!

참고 항목

워크플로 오케스트레이션 관리자는 Apache Airflow를 통해 구동됩니다.

참고 항목

Azure Data Factory용 워크플로 오케스트레이션 관리자는 오픈 소스 Apache Airflow 애플리케이션을 사용합니다. Airflow에 대한 설명서 및 추가 자습서는 Apache Airflow 설명서 또는 커뮤니티 페이지에서 찾을 수 있습니다.

Data Factory 파이프라인은 확장 가능하고 안정적인 데이터 통합/데이터 흐름을 제공하는 100개 이상의 데이터 원본 커넥터를 제공합니다. Apache Airflow DAG에서 기존 데이터 팩터리 파이프라인을 실행하려는 시나리오가 있습니다. 이 자습서에서는 이 작업을 수행하는 방법을 보여줍니다.

필수 조건

단계

  1. 아래 내용을 사용하여 새 Python 파일 adf.py를 만듭니다.

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    워크플로 오케스트레이션 관리자 UI 관리 (커넥트ions -> '+' ->> '커넥트ion 형식'을 'Azure Data Factory'로 선택한 다음, client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name 및 pipeline_name 입력하여 연결을 만들어야 합니다.

  2. DAGS라는 폴더 내에서 Blob Storage에 adf.py 파일을 업로드합니다.

  3. DAGS 폴더를 워크플로 오케스트레이션 관리자 환경으로 가져옵니다. 계정이 없는 경우 새 계정을 만듭니다.

    Airflow 섹션이 선택된 데이터 팩터리 관리 탭을 보여 주는 스크린샷