你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

从数据处理器管道将数据发送到 Microsoft Fabric

重要

Azure IoT 操作预览版(由 Azure Arc 启用)当前处于预览状态。 不应在生产环境中使用此预览版软件。

有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅 Microsoft Azure 预览版的补充使用条款

使用Fabric Lakehouse目标从Azure IoT 数据处理器预览版管道将数据写入 Microsoft Fabric 中的湖屋。 目标阶段将 parquet 文件写入湖屋,以便查看增量表中的数据。 目标阶段先批处理消息,然后再将其发送到 Microsoft Fabric。

先决条件

要配置和使用 Microsoft Fabric 目标管道阶段,需要:

设置 Microsoft Fabric

在从数据管道写入 Microsoft Fabric 之前,需要授予从管道访问 Lakehouse 的权限。 可使用服务主体或托管标识对管道进行身份验证。 使用托管标识的优点是无需管理服务主体的生命周期。 托管标识由 Azure 自动管理,并与其分配到的资源的生命周期相关联。

在配置服务主体或托管标识对 Lakehouse 的访问权限之前,请启用服务主体身份验证

要使用客户端密码创建服务主体:

  1. 使用以下 Azure CLI 命令创建服务主体。

    az ad sp create-for-rbac --name <YOUR_SP_NAME> 
    
  2. 此命令的输出包括 appIddisplayNamepasswordtenant。 记下以下值,用于配置对云资源(例如 Microsoft Fabric)的访问、创建机密和配置管道目标:

    {
        "appId": "<app-id>",
        "displayName": "<name>",
        "password": "<client-secret>",
        "tenant": "<tenant-id>"
    }
    

要将服务主体添加到 Microsoft Fabric 工作区:

  1. 记下工作区 ID 和湖屋 ID。 可以在用于访问湖屋的 URL 中找到这些值:

    https://msit.powerbi.com/groups/<your workspace ID>/lakehouses/<your lakehouse ID>?experience=data-engineering

  2. 在工作区中,选择“管理访问权限”:

    显示如何查找“管理访问”链接的屏幕截图。

  3. 选择“添加人员或组”:

    显示如何添加用户的屏幕截图。

  4. 按名称搜索服务主体。 开始键入以查看匹配的服务主体列表。 选择之前创建的服务主体:

    显示如何添加服务主体的屏幕截图。

  5. 向服务主体管理员授予对工作区的访问权限。

配置机密

要使目标阶段连接到 Microsoft Fabric,需要访问包含身份验证详细信息的机密。 若要创建机密,请执行以下操作:

  1. 使用以下命令将机密添加到 Azure Key Vault,其中包含在创建服务主体时记下的客户端机密:

    az keyvault secret set --vault-name <your-key-vault-name> --name AccessFabricSecret --value <client-secret>
    
  2. 按照管理 Azure IoT 操作预览版部署的机密中的步骤,将机密引用添加到 Kubernetes 群集。

配置目标阶段

Fabric Lakehouse目标阶段 JSON 配置定义了阶段的详细信息。 要创建该阶段,可以与基于窗体的 UI 交互,或在“高级”选项卡上提供 JSON 配置:

字段 类型​​ 描述 需要 默认 示例
Display name 字符串 要显示在数据处理器 UI 中的名称。 - Azure IoT MQ output
说明 字符串 关于阶段所执行的操作的用户友好型说明。 Write to topic default/topic1
WorkspaceId 字符串 湖屋工作区 ID。 -
LakehouseId 字符串 湖屋 Lakehouse ID。 -
字符串 要写入的表的名称。 -
文件路径1 模板 将 parquet 文件写入的文件路径。 {{{instanceId}}}/{{{pipelineId}}}/{{{partitionId}}}/{{{YYYY}}}/{{{MM}}}/{{{DD}}}/{{{HH}}}/{{{mm}}}/{{{fileNumber}}}
批处理2 批处理 如何批处理数据。 60s 10s
身份验证4 字符串 连接到 Azure 数据资源管理器的身份验证详细信息。 Service principalManaged identity 服务主体 -
重试 重试 要使用的重试策略。 default fixed
列 > 名称 string 列的名称。 temperature
列 > 类型3 字符串枚举 列中保存的数据类型,使用Delta 基元类型之一。 integer
列 > 路径 Path 从中读取列值的数据的每个记录中的位置。 .{{name}} .temperature

1文件路径:要将文件写入 Microsoft Fabric,需要文件路径。 可以使用模板配置文件路径。 文件路径必须按任意顺序包含以下组件:

  • instanceId
  • pipelineId
  • partitionId
  • YYYY
  • MM
  • DD
  • HH
  • mm
  • fileNumber

文件名是fileNumber指示的增量整数值。 如果希望系统识别文件类型,请务必包含文件扩展名。

2批处理:将数据写入 Microsoft Fabric 时,必须使用批处理。 目标阶段在可配置的时间间隔内批处理消息。

如果未配置批处理间隔,阶段会使用 60 秒作为默认值。

3类型:数据处理器使用增量格式写入 Microsoft Fabric。 数据处理器支持除decimaltimestamp without time zone之外的所有增量基元数据类型

要确保在 Microsoft Fabric 中正确表示所有日期和时间,请确保属性的值是有效的 RFC 3339 字符串,并且数据类型为datetimestamp

1身份验证:目前,连接到 Microsoft Fabric 时,目标阶段支持基于服务主体的身份验证或托管标识。

基于服务主体的身份验证

若要配置基于服务主体的身份验证,请提供以下值。 创建服务主体并将机密引用添加到群集时,记下了这些值。

字段 说明 必须
TenantId 租户 ID。
ClientId 创建有权访问数据库的服务主体时记下的应用 ID。
机密 在群集中创建的机密引用。

示例配置

以下 JSON 示例演示了完整的 Microsoft Fabric 湖屋目标阶段配置,该配置将整个消息写入数据库中的quickstart表`:

{
    "displayName": "Fabric Lakehouse - 520f54",
    "type": "output/fabric@v1",
    "viewOptions": {
        "position": {
            "x": 0,
            "y": 784
        }
    },
    "workspace": "workspaceId",
    "lakehouse": "lakehouseId",
    "table": "quickstart",
    "columns": [
        {
            "name": "Timestamp",
            "type": "timestamp",
            "path": ".Timestamp"
        },
        {
            "name": "AssetName",
            "type": "string",
            "path": ".assetname"
        },
        {
            "name": "Customer",
            "type": "string",
            "path": ".Customer"
        },
        {
            "name": "Batch",
            "type": "integer",
            "path": ".Batch"
        },
        {
            "name": "CurrentTemperature",
            "type": "float",
            "path": ".CurrentTemperature"
        },
        {
            "name": "LastKnownTemperature",
            "type": "float",
            "path": ".LastKnownTemperature"
        },
        {
            "name": "Pressure",
            "type": "float",
            "path": ".Pressure"
        },
        {
            "name": "IsSpare",
            "type": "boolean",
            "path": ".IsSpare"
        }
    ],
    "authentication": {
        "type": "servicePrincipal",
        "tenantId": "tenantId",
        "clientId": "clientId",
        "clientSecret": "secretReference"
    },
    "batch": {
        "time": "5s",
        "path": ".payload"
    },
    "retry": {
        "type": "fixed",
        "interval": "20s",
        "maxRetries": 4
    }
}

配置定义:

  • 批处理消息 5 秒。
  • 使用批处理路径.payload查找列的数据。

示例

以下示例演示了 Microsoft Fabric 湖屋目标阶段的示例输入消息:

{
  "payload": {
    "Batch": 102,
    "CurrentTemperature": 7109,
    "Customer": "Contoso",
    "Equipment": "Boiler",
    "IsSpare": true,
    "LastKnownTemperature": 7109,
    "Location": "Seattle",
    "Pressure": 7109,
    "Timestamp": "2023-08-10T00:54:58.6572007Z",
    "assetName": "oven"
  },
  "qos": 0,
  "systemProperties": {
    "partitionId": 0,
    "partitionKey": "quickstart",
    "timestamp": "2023-11-06T23:42:51.004Z"
  },
  "topic": "quickstart"
}