你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
如何在 HDInsight on AKS 上将 Hive 目录与 Apache Flink® 配合使用
重要
此功能目前以预览版提供。 Microsoft Azure 预览版的补充使用条款包含适用于 beta 版、预览版或其他尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight on AKS 预览版信息。 如有疑问或功能建议,请在 AskHDInsight 上提交请求并附上详细信息,并在 Azure HDInsight Community 上关注我们以了解更多更新。
此示例使用 Hive 的元存储作为包含 Apache Flink 的 HiveCatalog 的永久目录。 我们使用此功能在 Flink 上跨会话存储 Kafka 表和 MySQL 表元数据。 Flink 使用 Hive Catalog 中注册的 Kafka 表作为源,执行某种查找操作,并将结果接收到 MySQL 数据库
先决条件
- 具有 Hive 元存储 3.1.2 的 HDInsight on AKS 上的 Apache Flink 群集
- HDInsight 上的 Apache Kafka 群集
- 需要确保如使用 Kafka 中所述完成网络设置;这是为了确保 HDInsight on AKS 和 HDInsight 群集位于同一 VNet 中
- MySQL 8.0.33
Apache Flink 上的 Apache Hive
Flink 提供与 Hive 的双重集成。
- 第一步是使用 Hive 元存储 (HMS) 作为包含 Flink 的 HiveCatalog 的持久目录,用于跨会话存储 Flink 特定的元数据。
- 例如,用户可以使用 HiveCatalog 将 Kafka 或 ElasticSearch 表存储在 Hive 元存储中,并稍后在 SQL 查询中重用它们。
- 第二步是提供 Flink 作为读写 Hive 表的替代引擎。
- HiveCatalog 能够“现成地”与现有 Hive 安装兼容。 不需要修改现有的 Hive 元存储,也不需要更改表的数据位置或分区。
有关详细信息,请参阅 Apache Hive
环境准备
使用 HMS 创建 Apache Flink 群集
让我们在 Azure 门户上使用 HMS 创建 Apache Flink 群集,有关详细说明,请参阅 Flink 群集创建。
群集创建完成后,检查 HMS 是否在 AKS 端运行。
在 HDInsight 上准备用户订单交易数据 Kafka 主题
使用以下命令下载 kafka 客户端 jar:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
使用以下命令解压缩 tar 文件
tar -xvf kafka_2.12-3.2.0.tgz
生成发送到 Kafka 主题的消息。
其他命令:
注意
需要将 bootstrap-server 替换为你自己的 kafka 代理主机名或 IP
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
在 MySQL on Azure 上准备用户订单主数据
测试数据库:
准备订单表:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
使用 SSH 下载所需的 Kafka 连接器和 MySQL 数据库 jar
注意
根据我们的 HDInsight kafka 版本和 MySQL 版本下载正确的版本 jar。
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
移动规划器 jar
移动 webssh pod 的 /opt to /lib 中的 jar flink-table-planner_2.12-1.17.0-....jar,并移出 jar flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ from /lib。 请参阅问题,了解更多详细信息。 执行以下步骤来移动规划器 jar。
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
注意
仅当使用 Hive 方言或 HiveServer2 终结点时,才需要移动额外的规划器 jar。 但是,这是 Hive 集成的建议设置。
验证
使用 bin/sql-client.sh 连接到 Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
创建 Hive 目录并连接到 Flink SQL 上的 Hive 目录
注意
由于我们已将 Flink 群集与 Hive 元存储一起使用,因此无需执行任何其他配置。
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
在 Apache Flink SQL 上创建 Kafka 表
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
在 Apache Flink SQL 上创建 MySQL 表
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
在 Flink SQL 中检查 Hive 目录上注册的表
在 Flink SQL 上将用户交易订单信息接收到 MySQL 中的主订单表
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
在 Azure Cloud Shell 中检查 Kafka 上的用户交易订单数据是否已添加到 MySQL 中的主表订单
在 Kafka 上额外创建三个用户订单
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
在 Flink SQL 中检查 Kafka 表数据
Flink SQL> select * from kafka_user_orders;
在 Flink SQL 中将 product_id=104
插入 MySQL 上的订单表
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
在 Azure Cloud Shell 中检查 product_id = 104
记录是否已添加到 MySQL 上的订单表
参考
- Apache Hive
- Apache、Apache Hive、Hive、Apache Flink、Flink 和关联的开源项目名称是 Apache Software Foundation (ASF) 的商标。