你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

如何在 HDInsight on AKS 上将 Hive 目录与 Apache Flink® 配合使用

重要

此功能目前以预览版提供。 Microsoft Azure 预览版的补充使用条款包含适用于 beta 版、预览版或其他尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight on AKS 预览版信息。 如有疑问或功能建议,请在 AskHDInsight 上提交请求并附上详细信息,并在 Azure HDInsight Community 上关注我们以了解更多更新。

此示例使用 Hive 的元存储作为包含 Apache Flink 的 HiveCatalog 的永久目录。 我们使用此功能在 Flink 上跨会话存储 Kafka 表和 MySQL 表元数据。 Flink 使用 Hive Catalog 中注册的 Kafka 表作为源,执行某种查找操作,并将结果接收到 MySQL 数据库

先决条件

Flink 提供与 Hive 的双重集成。

  • 第一步是使用 Hive 元存储 (HMS) 作为包含 Flink 的 HiveCatalog 的持久目录,用于跨会话存储 Flink 特定的元数据。
    • 例如,用户可以使用 HiveCatalog 将 Kafka 或 ElasticSearch 表存储在 Hive 元存储中,并稍后在 SQL 查询中重用它们。
  • 第二步是提供 Flink 作为读写 Hive 表的替代引擎。
  • HiveCatalog 能够“现成地”与现有 Hive 安装兼容。 不需要修改现有的 Hive 元存储,也不需要更改表的数据位置或分区。

有关详细信息,请参阅 Apache Hive

环境准备

让我们在 Azure 门户上使用 HMS 创建 Apache Flink 群集,有关详细说明,请参阅 Flink 群集创建

显示如何创建 Flink 群集的屏幕截图。

群集创建完成后,检查 HMS 是否在 AKS 端运行。

显示如何在 Flink 群集中检查 HMS 状态的屏幕截图。

在 HDInsight 上准备用户订单交易数据 Kafka 主题

使用以下命令下载 kafka 客户端 jar:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

使用以下命令解压缩 tar 文件

tar -xvf kafka_2.12-3.2.0.tgz

生成发送到 Kafka 主题的消息。

显示如何向 Kafka 主题生成消息的屏幕截图。

其他命令:

注意

需要将 bootstrap-server 替换为你自己的 kafka 代理主机名或 IP

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

在 MySQL on Azure 上准备用户订单主数据

测试数据库:

显示如何在 Kafka 中测试数据库的屏幕截图。

显示如何在门户中运行 Cloud Shell 的屏幕截图。

准备订单表:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

使用 SSH 下载所需的 Kafka 连接器和 MySQL 数据库 jar

注意

根据我们的 HDInsight kafka 版本和 MySQL 版本下载正确的版本 jar。

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

移动规划器 jar

移动 webssh pod 的 /opt to /lib 中的 jar flink-table-planner_2.12-1.17.0-....jar,并移出 jar flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ from /lib。 请参阅问题,了解更多详细信息。 执行以下步骤来移动规划器 jar。

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

注意

仅当使用 Hive 方言或 HiveServer2 终结点时,才需要移动额外的规划器 jar。 但是,这是 Hive 集成的建议设置。

验证

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

注意

由于我们已将 Flink 群集与 Hive 元存储一起使用,因此无需执行任何其他配置。

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

显示如何创建 Kafka 表的屏幕截图。

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

显示如何创建 mysql 表的屏幕截图。

显示表输出的屏幕截图。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

显示如何接收用户事务的屏幕截图。

显示 Flink UI 的屏幕截图。

在 Azure Cloud Shell 中检查 Kafka 上的用户交易订单数据是否已添加到 MySQL 中的主表订单

显示如何检查用户事务的屏幕截图。

在 Kafka 上额外创建三个用户订单

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

显示如何检查 Kafka 表数据的屏幕截图。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

显示如何检查订单表的屏幕截图。

在 Azure Cloud Shell 中检查 product_id = 104 记录是否已添加到 MySQL 上的订单表

显示添加到订单表的记录的屏幕截图。

参考