Flink 與 Hive 的磨合期

有不少讀者反饋,參考上篇文章《Hive 終於等來了 Flink》部署 Flink 並集成 Hive 時,出現一些 bug 以及兼容性等問題。雖已等來,卻未可用。所以筆者增加了這一篇文章,作為姊妹篇。

回顧

在上篇文章中,筆者使用的 CDH 版本為 5.16.2,其中 Hive 版本為 1.1.0,Flink 源代碼本身對 Hive 1.1.0 版本兼容性不好,存在不少問題。為了兼容目前版本,筆者基於 CDH 5.16.2 環境,對 Flink 代碼進行了修改,重新打包並部署。

其實經過很多開源項目的實戰,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情況下,替換一些 Jar 包,是可以解決兼容性的問題。對於筆者的環境來説,可以使用 Hive 1.2.1 版本的一些 Jar 包來代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的開始部分,筆者會解決這個問題,然後再補充上篇文章缺少的實戰內容。

剪不斷理還亂的問題

根據讀者的反饋,筆者將所有的問題總結為三類:

Flink 如何連接 Hive 除了 API 外,有沒有類似 spark-sql 命令

識別不到 Hadoop 環境或配置文件找不到

依賴包、類或方法找不到

1. Flink 如何連接 Hive

有的讀者不太清楚,如何配置 Flink 連接 Hive 的 Catalog,這裏補充一個完整的 conf/sql-client-hive.yaml 示例:

catalogs:

- name: staginghive

type: hive

hive-conf-dir: /etc/hive/conf

hive-version: 1.2.1

execution:

planner: blink

type: batch

time-characteristic: event-time

periodic-watermarks-interval: 200

result-mode: table

max-table-result-rows: 1000000

parallelism: 1

max-parallelism: 128

min-idle-state-retention: 0

max-idle-state-retention: 0

current-catalog: staginghive

current-database: ssb

restart-strategy:

type: fallback

deployment:

response-timeout: 5000

gateway-address: ""

gateway-port: 0

m: yarn-cluster

yn: 2

ys: 5

yjm: 1024

ytm: 2048

sql-client-hive.yaml 配置文件裏面包含:

Hive 配置文件 catalogs 中配置了 Hive 的配置文件路徑。

Yarn 配置信息 deployment 中配置了 Yarn 的配置信息。

執行引擎信息 execution 配置了 blink planner,並且使用 batch 模式。batch 模式比較穩定,適合傳統的批處理作業,而且可以容錯,另外中間數據落盤,建議開啓壓縮功能。除了 batch,Flink 也支持 streaming 模式。

■ Flink SQL CLI 工具

類似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 腳本。在 Flink 1.10 版本中,Flink SQL CLI 改進了很多功能,筆者後面講解。

sql-client.sh 使用方式如下:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

2. 識別不到 Hadoop 環境或配置文件找不到

筆者在上篇文章中提到過,在部署 Flink 的環境上部署 CDH gateway,包括 Hadoop、Hive 客户端,另外還需要配置一些環境變量,如下:

export HADOOP_CONF_DIR=/etc/hadoop/conf

export YARN_CONF_DIR=/etc/hadoop/conf

export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive

export HIVE_CONF_DIR=/etc/hive/conf

3. 依賴包、類或方法找不到

先查看一下 Flink 家目錄下的 lib 目錄:

$ tree lib

lib

├── flink-connector-hive_2.11-1.10.0.jar

├── flink-dist_2.11-1.10.0.jar

├── flink-hadoop-compatibility_2.11-1.10.0.jar

├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar

├── flink-table_2.11-1.10.0.jar

├── flink-table-blink_2.11-1.10.0.jar

├── hive-exec-1.1.0-cdh5.16.2.jar

├── hive-metastore-1.1.0-cdh5.16.2.jar

├── libfb303-0.9.3.jar

├── log4j-1.2.17.jar

└── slf4j-log4j12-1.7.15.jar

如果上面前兩個問題都解決後,執行如下命令:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

報錯,報錯,還是報錯:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory

其實在運行 sql-client.sh 腳本前,需要指定 Hadoop 環境的依賴包的路徑,建議不要報錯一個添加一個,除非有的讀者喜歡。這裏筆者提示一個方便的方式,即設置 HADOOPCLASSPATH環境變量:

export HADOOP_CLASSPATH=`hadoop classpath`

再次執行:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

很抱歉,繼續報錯:

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession at org.apache.flink.table.client.SqlClient.start at org.apache.flink.table.client.SqlClient.main Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client

這裏就是 Hive 1.1.0 版本的 Jar 包與 Flink 出現版本不兼容性的問題了,解決方法是:

下載 apache-hive-1.2.1 版本

替換 Flink lib 目錄下的 Hive Jar 包 刪除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然後添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次查看 lib 目錄:

$ tree lib

lib

├── flink-connector-hive_2.11-1.10.0.jar

├── flink-dist_2.11-1.10.0.jar

├── flink-hadoop-compatibility_2.11-1.10.0.jar

├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar

├── flink-table_2.11-1.10.0.jar

├── flink-table-blink_2.11-1.10.0.jar

├── hive-exec-1.2.1.jar

├── hive-metastore-1.2.1.jar

├── libfb303-0.9.2.jar

├── log4j-1.2.17.jar

└── slf4j-log4j12-1.7.15.jar

最後再執行:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

這時,讀者就可以看到手握栗子的可愛小松鼠了。

Flink SQL CLI 實踐

在 Flink 1.10 版本 中,Flink 社區對 SQL CLI 做了大量的改動,比如支持 View、支持更多的數據類型和 DDL 語句、支持分區讀寫、支持 INSERT OVERWRITE 等,實現了更多的 TableEnvironment API 的功能,更加方便用户使用。

接下來,筆者詳細講解 Flink SQL CLI。

0. Help

執行下面命令,登錄 Flink SQL 客户端:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

執行 HELP,查看 Flink SQL 支持的命令,如下為大部分常用的:

CREATE TABLE

DROP TABLE

CREATE VIEW

DESCRIBE

DROP VIEW

EXPLAIN

INSERT INTO

INSERT OVERWRITE

SELECT

SHOW FUNCTIONS

USE CATALOG

SHOW TABLES

SHOW DATABASES

SOURCE

USE

SHOW CATALOGS

1. Hive 操作

■ 1.1 創建表和導入數據

為了方便讀者進行實驗,筆者使用 ssb-dbgen 生成測試數據,讀者也可以使用測試環境已有的數據來進行實驗。

具體如何在 Hive 中一鍵式創建表並插入數據,可以參考筆者早期的項目 https://github.com/MLikeWater/ssb-kylin。

■ 1.2 Hive 表

查看上個步驟中創建的 Hive 表:

0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables;

-------------- --

/ tab_name /

-------------- --

/ customer /

/ dates /

/ lineorder /

/ p_lineorder /

/ part /

/ supplier /

-------------- --

讀者可以對 Hive 進行各種查詢,對比後面 Flink SQL 查詢的結果。

2. Flink 操作

■ 2.1 通過 HiveCatalog 訪問 Hive 數據庫

登錄 Flink SQL CLI,並查詢 catalogs:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

Flink SQL> show catalogs;

default_catalog

staginghive

Flink SQL> use catalog staginghive;

通過 show catalogs 獲取配置的所有 catalog。由於筆者在 sql-client-hive.yaml 文件中設置了默認的 catalog,即為 staginghive。如果需要切換到其他 catalog,可以使用 usecatalog xxx。

■ 2.2 查詢 Hive 元數據

通過 Flink SQL 查詢 Hive 數據庫和表:

查詢表

Flink SQL> show tables;

customer

dates

lineorder

p_lineorder

part

supplier

12' and s_region = 'AMERICA'

> group by d_year, p_brand

> order by d_year, p_brand;

lo_revenue d_year p_brand

819634128 1998 MFGR1207

754489428 1998 MFGR1209

668482306 1998 MFGR1211

862902570 1998 MFGR14'

> group by d_year, s_city, p_brand

> order by d_year, s_city, p_brand;

d_year s_city p_brand profit

1998 UNITED ST9 MFGR 插入靜態分區的數據

Flink SQL> INSERT INTO flink_partition_test PARTITION SELECT 100001, 'Flink001';

插入動態分區

Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';

動態和靜態分區結合使用類似,不再演示

# 覆蓋插入數據

Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';

id name day type

100002 Spark 2020-02-02 SparkSQL

100001 FlinkSQL 2020-02-01 Flink

字段 day 在 Flink 屬於關鍵字,要特殊處理。

■ 2.6 其他功能

2.6.1 函數

Flink SQL 支持內置的函數和自定義函數。對於內置的函數,可以執行 show functions 進行查看,這一塊筆者以後會單獨介紹如何創建自定義函數。

2.6.2 設置參數

Flink SQL 支持設置環境參數,可以使用 set 命令查看和設置參數:

Flink SQL> set;

deployment.gateway-address=

deployment.gateway-port=0

deployment.m=yarn-cluster

deployment.response-timeout=5000

deployment.yjm=1024

deployment.yn=2

deployment.ys=5

deployment.ytm=2048

execution.current-catalog=staginghive

execution.current-database=ssb

execution.max-idle-state-retention=0

execution.max-parallelism=128

execution.max-table-result-rows=1000000

execution.min-idle-state-retention=0

execution.parallelism=1

execution.periodic-watermarks-interval=200

execution.planner=blink

execution.restart-strategy.type=fallback

execution.result-mode=table

execution.time-characteristic=event-time

execution.type=batch

Flink SQL> set deployment.yjm = 2048;

總結

在本文中,筆者通過 Flink SQL 比較詳細地去操作 Hive 數據庫,以及 Flink SQL 提供的一些功能。

當然,目前 Flink SQL 操作 Hive 數據庫還是存在一些問題:

目前只支持 TextFile 存儲格式,還無法指定其他存儲格式

只支持 Hive 數據庫中 TextFile 存儲格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。雖然實現了 RCFile、ORC、Parquet、Sequence 等存儲格式,但是無法自動識別 Hive 表的存儲格式。如果要使用其他存儲格式,需要修改源碼,重新編譯。不過社區已經對這些存儲格式進行了測試,相信不久以後就可以在 Flink SQL 中使用。

OpenCSVSerde 支持不完善

如果讀者使用 TextFile 的 row format serde 為 org.apache.hadoop.hive.serde2.OpenCSVSerde 時,無法正確識別字段類型,會把 Hive 表的字段全部映射為 String 類型。

暫時不支持 Bucket 表

Flink SQL 優化方面功能較少

權限控制方面

這方面和 Spark SQL 類似,目前基於 HDFS ACL 控制,暫時還沒有實現 Sentry 或 Ranger 控制權限,不過目前 Cloudera 正在開發基於 Ranger 設置 Spark SQL 和 Hive 共享訪問權限的策略,實現行/列級控制以及審計信息。

Flink 社區發展很快,所有這些問題只是暫時的,隨着新版本的發佈會被逐個解決。

如果 Flink SQL 目前不滿足的需求,建議使用 API 方式來解決問題。

版權聲明:本文源自 網絡, 於,由 楠木軒 整理發佈,共 7502 字。

轉載請註明: Flink 與 Hive 的磨合期 - 楠木軒