你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将数据从 Azure API for FHIR 复制到 Azure Synapse Analytics

本文介绍将数据从 Azure API for FHIR 复制到 Azure Synapse Analytics 的三种方法,这是一种无限的分析服务,将数据集成、企业数据仓库和大数据分析结合在一起。

使用 FHIR to Synapse 同步代理 OSS 工具

注意

FHIR 到 Synapse 同步代理是在 MIT 许可下发布的开放源代码工具,不受 Azure 服务的 Microsoft SLA 所涵盖。

FHIR to Synapse 同步代理是根据 MIT 许可证发布的 Microsoft OSS 项目。 它是一个 Azure 函数,它使用 FHIR 资源 API 从 FHIR 服务器中提取数据,将其转换为分层 Parquet 文件,并将其近乎实时地写入 Azure Data Lake。 它还包含一个脚本,用于在指向 Parquet 文件的 Synapse 无服务器 SQL 池 中创建外部表和视图。

借助此解决方案,可以使用 Synapse Studio、SSMS 和 Power BI 等工具查询整个 FHIR 数据。 还可以直接从 Synapse Spark 池访问 Parquet 文件。 如果要近乎实时地访问所有 FHIR 数据,并且想要延迟对下游系统的自定义转换,应考虑此解决方案。

请按照 OSS 文档 获取安装和使用说明。

使用 FHIR 到 CDM 管道生成器 OSS 工具

注意

FHIR 到 CDM 管道生成器是在 MIT 许可下发布的开放源代码工具,不受 Azure 服务的 Microsoft SLA 所涵盖。

FHIR 到 CDM 管道生成器是在 MIT 许可下发布的 Microsoft OSS 项目。 它是一种生成 ADF 管道的工具,用于使用 $export API 从 FHIR 服务器复制数据的快照,将其转换为 csv 格式,以及写入 Azure Data Lake Storage Gen 2 中的 CDM 文件夹。 该工具需要用户创建的配置文件,其中包含将 FHIR 资源和字段投影和平展到表中的说明。 还可以按照在 Synapse 工作区中创建下游管道的说明,将数据从 CDM 文件夹移动到 Synapse 专用 SQL 池。

通过此解决方案,可以在数据写入 CDM 文件夹时将数据转换为表格格式。 如果要在从 FHIR 服务器中提取 FHIR 数据后将数据转换为自定义架构,应考虑此解决方案。

请按照 OSS 文档 获取安装和使用说明。

使用 T-SQL 将导出的数据加载到 Synapse

在此方法中,使用 FHIR $export 操作以格式将 FHIR 资源复制到 Azure Data Lake Gen 2 (ADL Gen 2) Blob 存储NDJSON中。 随后,使用 T-SQL 将数据从存储加载到 Synapse 中的 无服务器或专用 SQL 池 中。 可以使用 Synapse 管道将这些步骤转换为可靠的数据移动管道。

使用 $export 将 Azure 存储存储到 Synapse。

使用 $export 复制数据

在 FHIR 服务器中配置$export

Azure API for FHIR 采用 FHIR 规范定义的 $export 操作,以 NDJSON 格式导出所有或筛选的 FHIR 数据子集。 此外,它还支持取消标识导出,以在导出过程中匿名处理 FHIR 数据。

若要将 FHIR 数据导出到 Azure Blob 存储,首先需要将 FHIR 服务器配置为将数据导出到存储帐户。 需要 (1) 启用托管标识, (2) 转到存储帐户中的访问控制并添加角色分配, (3) 为 $export选择存储帐户。 可 在此处找到更多分步操作。

可以将服务器配置为将数据导出到任何类型的 Azure 存储帐户,但我们建议导出到 ADL Gen 2,以便与 Synapse 保持高度一致。

使用 $export 命令

配置 FHIR 服务器后,可以按照文档中的内容在系统、患者或组级别导出 FHIR 资源。 例如,可以使用以下 $export 命令导出与 Group 中患者相关的所有 FHIR 数据,其中可以在字段 {{BlobContainer}} 中指定 ADL Gen 2 Blob 存储名称:

https://{{FHIR service base URL}}/Group/{{GroupId}}/$export?_container={{BlobContainer}}  

还可以在 _type 上述调用中使用 $export 参数来限制要导出的资源。 例如,以下调用将仅导出 PatientMedicationRequestObservation 资源:

https://{{FHIR service base URL}}/Group/{{GroupId}}/$export?_container={{BlobContainer}}&
_type=Patient,MedicationRequest,Condition

有关支持的不同参数的详细信息,请查看有关查询参数$export 页面部分。

使用 Synapse 进行分析

创建 Synapse 工作区

在使用 Synapse 之前,需要一个 Synapse 工作区。 你将在 Azure 门户 上创建Azure Synapse Analytics 服务。 可以在此处更多的分步指南。 需要 ADLSGEN2 帐户才能创建工作区。 Azure Synapse 工作区将使用此存储帐户来存储 Synapse 工作区数据。

创建工作区后,可以在 Synapse Studio 中查看工作区,方法是在 上https://web.azuresynapse.net登录工作区,或在 Azure 门户中启动Synapse Studio。

在 Azure 存储和 Synapse 之间创建链接服务

若要将数据复制到 Synapse,需要创建一个链接服务,用于连接已导出数据的 Azure 存储帐户与 Synapse。 可 在此处找到更多分步说明。

  1. 在“Synapse Studio”中,浏览到“管理”选项卡,然后在“外部连接”下,选择“链接服务”。
  2. 选择“新建”以添加新的链接服务。
  3. 从列表中选择“Azure Data Lake Storage Gen2”磁贴,然后选择“继续” 。
  4. 输入你的身份验证凭据。 完成后,选择“创建”

在 ADL Gen 2 存储和 Synapse 之间拥有链接服务后,可以使用 Synapse SQL 池来加载和分析 FHIR 数据。

在无服务器池与专用 SQL 池之间做出决定

Azure Synapse Analytics 提供两个不同的 SQL 池:无服务器 SQL 池和专用 SQL 池。 无服务器 SQL 池提供了使用无服务器 SQL 终结点直接在 Blob 存储中查询数据的灵活性,而无需进行任何资源预配。 专用 SQL 池具有高性能和并发处理能力,建议用于企业规模数据仓储功能。 有关这两种 SQL 池的详细信息,请查看有关 SQL 体系结构的 Synapse 文档页

使用无服务器 SQL 池

由于它是无服务器的,因此无需设置基础结构或要维护的群集。 创建工作区后,即可从 Synapse Studio 查询数据。

例如,以下查询可用于将所选字段从 Patient.ndjson 转换为表格结构:

SELECT * FROM  
OPENROWSET(bulk 'https://{{youraccount}}.blob.core.windows.net/{{yourcontainer}}/Patient.ndjson', 
FORMAT = 'csv', 
FIELDTERMINATOR ='0x0b', 
FIELDQUOTE = '0x0b')  
WITH (doc NVARCHAR(MAX)) AS rows     
CROSS APPLY OPENJSON(doc)     
WITH ( 
    ResourceId VARCHAR(64) '$.id', 
    Active VARCHAR(10) '$.active', 
    FullName VARCHAR(100) '$.name[0].text', 
    Gender VARCHAR(20) '$.gender', 
       ...
) 

在上面的查询中,OPENROWSET 函数访问 Azure 存储中的文件,并且 OPENJSON 分析 JSON 文本,并返回 JSON 输入属性作为行和列。 每次执行此查询时,无服务器 SQL 池都会从 Blob 存储中读取文件、分析 JSON 并提取字段。

还可以将结果以 Parquet 格式在外部表中具体化以获得更好的查询性能,如下所示:

-- Create External data source where the parquet file will be written 
CREATE EXTERNAL DATA SOURCE [MyDataSource] WITH ( 
    LOCATION = 'https://{{youraccount}}.blob.core.windows.net/{{exttblcontainer}}' 
); 
GO 

-- Create External File Format 
CREATE EXTERNAL FILE FORMAT [ParquetFF] WITH ( 
    FORMAT_TYPE = PARQUET, 
    DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec' 
); 
GO 

CREATE EXTERNAL TABLE [dbo].[Patient] WITH ( 
        LOCATION = 'PatientParquet/', 
        DATA_SOURCE = [MyDataSource], 
        FILE_FORMAT = [ParquetFF] 
) AS 
SELECT * FROM  
OPENROWSET(bulk 'https://{{youraccount}}.blob.core.windows.net/{{yourcontainer}}/Patient.ndjson' 
-- Use rest of the SQL statement from the previous example --

使用专用 SQL 池

专用 SQL 池支持托管表和分层缓存,能够提高内存中性能。 可以使用简单的 T-SQL 查询导入大数据,然后利用分布式查询引擎的功能运行高性能分析。

将数据从存储加载到专用 SQL 池的最简单快捷的方法是使用 T-SQL 中的 COPY 命令,该命令可以读取 CSV、Parquet 和 ORC 文件。 如下面的示例查询所示,使用 COPY 命令将 NDJSON 行加载到表格结构中。

-- Create table with HEAP, which is not indexed and does not have a column width limitation of NVARCHAR(4000) 
CREATE TABLE StagingPatient ( 
Resource NVARCHAR(MAX) 
) WITH (HEAP) 
COPY INTO StagingPatient 
FROM 'https://{{yourblobaccount}}.blob.core.windows.net/{{yourcontainer}}/Patient.ndjson' 
WITH ( 
FILE_TYPE = 'CSV', 
ROWTERMINATOR='0x0a', 
FIELDQUOTE = '', 
FIELDTERMINATOR = '0x00' 
) 
GO

在上面的 StagingPatient 表中具有 JSON 行后,可以使用 OPENJSON 函数创建数据的不同表格式并将结果存储到表中。 下面是一个示例 SQL 查询,用于通过从Patient资源中提取几个字段来创建Patient表:

SELECT RES.* 
INTO Patient 
FROM StagingPatient
CROSS APPLY OPENJSON(Resource)   
WITH (
  ResourceId VARCHAR(64) '$.id',
  FullName VARCHAR(100) '$.name[0].text',
  FamilyName VARCHAR(50) '$.name[0].family',
  GivenName VARCHAR(50) '$.name[0].given[0]',
  Gender VARCHAR(20) '$.gender',
  DOB DATETIME2 '$.birthDate',
  MaritalStatus VARCHAR(20) '$.maritalStatus.coding[0].display',
  LanguageOfCommunication VARCHAR(20) '$.communication[0].language.text'
) AS RES 
GO

后续步骤

本文介绍了将 FHIR 数据复制到 Synapse 的三种不同方法。

接下来,可以了解如何在将 FHIR 数据导出到 Synapse 时对数据进行去标识,以保护 PHI。

FHIR® 是 HL7 的注册商标,经 HL7 许可使用。