有不少讀者反饋,參考上篇文章《Hive 終於等來了 Flink》部署 Flink 並集成 Hive 時,出現一些 bug 以及兼容性等問題。雖已等來,卻未可用。所以筆者增加了這一篇文章,作為姊妹篇。
回顧
在上篇文章中,筆者使用的 CDH 版本為 5.16.2,其中 Hive 版本為 1.1.0,Flink 源代碼本身對 Hive 1.1.0 版本兼容性不好,存在不少問題。為了兼容目前版本,筆者基於 CDH 5.16.2 環境,對 Flink 代碼進行了修改,重新打包並部署。
其實經過很多開源項目的實戰,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情況下,替換一些 Jar 包,是可以解決兼容性的問題。對於筆者的環境來説,可以使用 Hive 1.2.1 版本的一些 Jar 包來代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的開始部分,筆者會解決這個問題,然後再補充上篇文章缺少的實戰內容。
剪不斷理還亂的問題
根據讀者的反饋,筆者將所有的問題總結為三類:
Flink 如何連接 Hive 除了 API 外,有沒有類似 spark-sql 命令
識別不到 Hadoop 環境或配置文件找不到
依賴包、類或方法找不到
1. Flink 如何連接 Hive
有的讀者不太清楚,如何配置 Flink 連接 Hive 的 Catalog,這裏補充一個完整的 conf/sql-client-hive.yaml 示例:
catalogs:
- name: staginghive
type: hive
hive-conf-dir: /etc/hive/conf
hive-version: 1.2.1
execution:
planner: blink
type: batch
time-characteristic: event-time
periodic-watermarks-interval: 200
result-mode: table
max-table-result-rows: 1000000
parallelism: 1
max-parallelism: 128
min-idle-state-retention: 0
max-idle-state-retention: 0
current-catalog: staginghive
current-database: ssb
restart-strategy:
type: fallback
deployment:
response-timeout: 5000
gateway-address: ""
gateway-port: 0
m: yarn-cluster
yn: 2
ys: 5
yjm: 1024
ytm: 2048
sql-client-hive.yaml 配置文件裏面包含:
Hive 配置文件 catalogs 中配置了 Hive 的配置文件路徑。
Yarn 配置信息 deployment 中配置了 Yarn 的配置信息。
執行引擎信息 execution 配置了 blink planner,並且使用 batch 模式。batch 模式比較穩定,適合傳統的批處理作業,而且可以容錯,另外中間數據落盤,建議開啓壓縮功能。除了 batch,Flink 也支持 streaming 模式。
■ Flink SQL CLI 工具
類似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 腳本。在 Flink 1.10 版本中,Flink SQL CLI 改進了很多功能,筆者後面講解。
sql-client.sh 使用方式如下:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
2. 識別不到 Hadoop 環境或配置文件找不到
筆者在上篇文章中提到過,在部署 Flink 的環境上部署 CDH gateway,包括 Hadoop、Hive 客户端,另外還需要配置一些環境變量,如下:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HIVE_CONF_DIR=/etc/hive/conf
3. 依賴包、類或方法找不到
先查看一下 Flink 家目錄下的 lib 目錄:
$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.1.0-cdh5.16.2.jar
├── hive-metastore-1.1.0-cdh5.16.2.jar
├── libfb303-0.9.3.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
如果上面前兩個問題都解決後,執行如下命令:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
報錯,報錯,還是報錯:
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
其實在運行 sql-client.sh 腳本前,需要指定 Hadoop 環境的依賴包的路徑,建議不要報錯一個添加一個,除非有的讀者喜歡。這裏筆者提示一個方便的方式,即設置 HADOOPCLASSPATH環境變量:
export HADOOP_CLASSPATH=`hadoop classpath`
再次執行:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
很抱歉,繼續報錯:
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession at org.apache.flink.table.client.SqlClient.start at org.apache.flink.table.client.SqlClient.main Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
這裏就是 Hive 1.1.0 版本的 Jar 包與 Flink 出現版本不兼容性的問題了,解決方法是:
下載 apache-hive-1.2.1 版本
替換 Flink lib 目錄下的 Hive Jar 包 刪除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然後添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次查看 lib 目錄:
$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.2.1.jar
├── hive-metastore-1.2.1.jar
├── libfb303-0.9.2.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
最後再執行:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
這時,讀者就可以看到手握栗子的可愛小松鼠了。
Flink SQL CLI 實踐
在 Flink 1.10 版本 中,Flink 社區對 SQL CLI 做了大量的改動,比如支持 View、支持更多的數據類型和 DDL 語句、支持分區讀寫、支持 INSERT OVERWRITE 等,實現了更多的 TableEnvironment API 的功能,更加方便用户使用。
接下來,筆者詳細講解 Flink SQL CLI。
0. Help
執行下面命令,登錄 Flink SQL 客户端:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
執行 HELP,查看 Flink SQL 支持的命令,如下為大部分常用的:
CREATE TABLE
DROP TABLE
CREATE VIEW
DESCRIBE
DROP VIEW
EXPLAIN
INSERT INTO
INSERT OVERWRITE
SELECT
SHOW FUNCTIONS
USE CATALOG
SHOW TABLES
SHOW DATABASES
SOURCE
USE
SHOW CATALOGS
1. Hive 操作
■ 1.1 創建表和導入數據
為了方便讀者進行實驗,筆者使用 ssb-dbgen 生成測試數據,讀者也可以使用測試環境已有的數據來進行實驗。
具體如何在 Hive 中一鍵式創建表並插入數據,可以參考筆者早期的項目 https://github.com/MLikeWater/ssb-kylin。
■ 1.2 Hive 表
查看上個步驟中創建的 Hive 表:
0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables;
-------------- --
/ tab_name /
-------------- --
/ customer /
/ dates /
/ lineorder /
/ p_lineorder /
/ part /
/ supplier /
-------------- --
讀者可以對 Hive 進行各種查詢,對比後面 Flink SQL 查詢的結果。
2. Flink 操作
■ 2.1 通過 HiveCatalog 訪問 Hive 數據庫
登錄 Flink SQL CLI,並查詢 catalogs:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL> show catalogs;
default_catalog
staginghive
Flink SQL> use catalog staginghive;
通過 show catalogs 獲取配置的所有 catalog。由於筆者在 sql-client-hive.yaml 文件中設置了默認的 catalog,即為 staginghive。如果需要切換到其他 catalog,可以使用 usecatalog xxx。
■ 2.2 查詢 Hive 元數據
通過 Flink SQL 查詢 Hive 數據庫和表:
查詢表
Flink SQL> show tables;
customer
dates
lineorder
p_lineorder
part
supplier
12' and s_region = 'AMERICA'
> group by d_year, p_brand
> order by d_year, p_brand;
lo_revenue d_year p_brand
819634128 1998 MFGR1207
754489428 1998 MFGR1209
668482306 1998 MFGR1211
862902570 1998 MFGR14'
> group by d_year, s_city, p_brand
> order by d_year, s_city, p_brand;
d_year s_city p_brand profit
1998 UNITED ST9 MFGR 插入靜態分區的數據
Flink SQL> INSERT INTO flink_partition_test PARTITION SELECT 100001, 'Flink001';
插入動態分區
Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';
動態和靜態分區結合使用類似,不再演示
# 覆蓋插入數據
Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';
id name day type
100002 Spark 2020-02-02 SparkSQL
100001 FlinkSQL 2020-02-01 Flink
字段 day 在 Flink 屬於關鍵字,要特殊處理。
■ 2.6 其他功能
2.6.1 函數
Flink SQL 支持內置的函數和自定義函數。對於內置的函數,可以執行 show functions 進行查看,這一塊筆者以後會單獨介紹如何創建自定義函數。
2.6.2 設置參數
Flink SQL 支持設置環境參數,可以使用 set 命令查看和設置參數:
Flink SQL> set;
deployment.gateway-address=
deployment.gateway-port=0
deployment.m=yarn-cluster
deployment.response-timeout=5000
deployment.yjm=1024
deployment.yn=2
deployment.ys=5
deployment.ytm=2048
execution.current-catalog=staginghive
execution.current-database=ssb
execution.max-idle-state-retention=0
execution.max-parallelism=128
execution.max-table-result-rows=1000000
execution.min-idle-state-retention=0
execution.parallelism=1
execution.periodic-watermarks-interval=200
execution.planner=blink
execution.restart-strategy.type=fallback
execution.result-mode=table
execution.time-characteristic=event-time
execution.type=batch
Flink SQL> set deployment.yjm = 2048;
總結
在本文中,筆者通過 Flink SQL 比較詳細地去操作 Hive 數據庫,以及 Flink SQL 提供的一些功能。
當然,目前 Flink SQL 操作 Hive 數據庫還是存在一些問題:
目前只支持 TextFile 存儲格式,還無法指定其他存儲格式
只支持 Hive 數據庫中 TextFile 存儲格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。雖然實現了 RCFile、ORC、Parquet、Sequence 等存儲格式,但是無法自動識別 Hive 表的存儲格式。如果要使用其他存儲格式,需要修改源碼,重新編譯。不過社區已經對這些存儲格式進行了測試,相信不久以後就可以在 Flink SQL 中使用。
OpenCSVSerde 支持不完善
如果讀者使用 TextFile 的 row format serde 為 org.apache.hadoop.hive.serde2.OpenCSVSerde 時,無法正確識別字段類型,會把 Hive 表的字段全部映射為 String 類型。
暫時不支持 Bucket 表
Flink SQL 優化方面功能較少
權限控制方面
這方面和 Spark SQL 類似,目前基於 HDFS ACL 控制,暫時還沒有實現 Sentry 或 Ranger 控制權限,不過目前 Cloudera 正在開發基於 Ranger 設置 Spark SQL 和 Hive 共享訪問權限的策略,實現行/列級控制以及審計信息。
Flink 社區發展很快,所有這些問題只是暫時的,隨着新版本的發佈會被逐個解決。
如果 Flink SQL 目前不滿足的需求,建議使用 API 方式來解決問題。