楠木軒

基於Flink+Hive構建流批一體準實時數倉

由 睢風娥 發佈於 科技

作者 | 李勁松(之信)

策劃 | 蔡芳芳

基於 Hive 的離線數倉往往是企業大數據生產系統中不可缺少的一環。Hive 數倉有很高的成熟度和穩定性,但由於它是離線的,延時很大。在一些對延時要求比較高的場景,需要另外搭建基於 Flink 的實時數倉,將鏈路延時降低到秒級。但是一套離線數倉加一套實時數倉的架構會帶來超過兩倍的資源消耗,甚至導致重複開發。

想要搭建流式鏈路就必須得拋棄現有的 Hive 數倉嗎?並不是,藉助 Flink 可以實現已有的 Hive 離線數倉準實時化。本文整理自 Apache Flink Committer、阿里巴巴技術專家李勁松 在 InfoQ 技術公開課的分享,文章將分析當前離線數倉實時化的難點,詳解 Flink 如何解決 Hive 流批一體準實時數倉的難題,實現更高效、合理的資源配置。文章大綱如下:

離線數倉實時化的難點

Flink 在流批一體的探索

構建流批一體準實時數倉應用實踐

1

離線數倉實時化的難點

離線數倉

上圖是一個典型的離線數倉,假設現在公司有一個需求,目前公司的數據量很大,需要每天出一個報表且輸出到業務數據庫中。首先是剛入庫的業務數據,大致分為兩種,一種是 MySQL 的 binlog,另外一種是業務系統中的業務打點,這個日誌打點信息可以通過 Flume 等工具去採集,再離線入庫到數倉中。然後隨着業務越來越多,業務中的各個表可以做一些抽象,抽象的好處是更好的管理和更高效的數據複用和計算複用。所以數倉就分成了多層 (明細層、中間層、服務層等等),每一層存的是數據表,數據表之間通過 HiveSQL 的計算來實現 ETL 轉換。

不止是 HiveSQL ,Hive 只是靜態的批計算,而業務每天都要出報表,這意味着每天都要進行計算,這種情況下會依賴於調度工具和血緣管理:

調度工具:按照某個策略把批計算調度起來。

血緣管理:一個任務是由許多個作業組合而成,可能有非常複雜的表結構層次,整個計算是一個非常複雜的拓撲,作業間的依賴關係非常複雜 (減少冗餘存儲和計算,也可以有較好的容錯),只有當一級結束後才能進行下一級的計算。

當任務十分龐大的時候,我們得出結果往往需要很長的一段時間,也就是我們常説的 T+1,H+1 ,這就是離線數倉的問題。

第三方工具

上面説過,離線數倉不僅僅是簡單的 Hive 計算,它還依賴了其它的第三方工具,比如:

使用 Flume 來入庫,但存在一定的問題,首先,它的容錯可能無法保證 Exactly-Once 效果,需要下游再次進行去重操作。其次,自定義邏輯需要通過一些手段,比如腳本來控制。第三,離線數倉並不具備良好的擴展能力,當數據劇增時,增加原本的併發數就比較困難了。

基於調度工具的作業調度會帶來級聯的計算延遲,比如凌晨 1 點開始計算昨天的數據,可能需要到早上 6、7 點才能做完,並且無法保證在設置的調度時間內數據可以完全 ready 。此外,級聯的計算還會帶來複雜的血緣管理問題,大任務的 Batch 計算可能會突然打滿集羣的資源,所以也要求我們對於負載管理進行考量,這些都會給業務增加負擔。

無論是離線數倉還是第三方工具,其實主要的問題還是“慢”,如何解決慢的問題,此時就該實時數倉出場了。

實時數倉

實時數倉其實是從 Hive+HDFS 的組合換成了 Kafka,ETL 的功能通過 Flink 的流式處理解決。此時就不存在調度和血緣管理的問題了,通過實時不斷的增量更新,最終輸出到業務的 DB 中。

雖然延時降低了,但此時我們會面臨另外一些問題:

歷史數據丟失,因為 Kafka 只是臨時的存儲介質,數據會有一個超時的時間 (比如只保存 7 天的數據),這會導致我們的歷史數據丟失。

成本相對較高,實時計算的成本要大於離線計算。

Lambda 架構

所以此時很多人就會選擇一套實時一套離線的做法,互不干擾,根據任務是否需要走實時的需求來對需求進行分離。

這套架構看似解決了所有問題,但實際帶來的問題也是非常多。首先,Lambda 架構造成了離線和實時的割裂問題,它們解決的業務問題都是一樣的,但是兩套方案讓同樣的數據源產生了不同的計算結果。不同層級的表結構可能不一致,並且當數據產生不一致的問題時,還需要去進行比對排查。

隨着這套 Lambda 架構越走越遠,開發團隊、表結構表依賴、計算模型等都可能會被割裂開,越到後面越會發現,成本越來越高,而統一的代價越來越大。

那麼問題來了,實時數倉會耗費如此大的資源,且還不能保留歷史數據,Lambda 架構存在如此多的問題,有什麼方案可以解決呢?

數據湖

數據湖擁有不少的優點,原子性可以讓我們做到準實時的批流一體,並且支持已有數據的修改操作。但是畢竟數據湖是新一代數倉存儲架構,各方面都還不是很完美,目前已有的數據湖都強依賴於 Spark(當然 Flink 也正在擁抱數據湖),將數據遷移到數據湖需要團隊對遷移成本和人員學習成本進行考量。

如果沒有這麼大的決心遷移數據湖,那有沒有一個稍微緩和一些的方案加速已有的離線數倉呢?

2

Flink 在批流一體上的探索

統一元數據

Flink 一直持續致力於離線和實時的統一,首先是統一元數據。簡單來説就是把 Kafka 表的元數據信息存儲到 HiveMetaStore 中,做到離線和實時的表 Meta 的統一。

(目前開源的實時計算並沒有一個較為完善的持久化 MetaStore,Hive MetaStore 不僅能保存離線表,也可以承擔實時計算的 MetaStore 能力)。

統一計算引擎

同樣的元數據之後,實時和離線的表結構和層次可以設計成一樣,接下來就是可以共用:

同一套 SQL,Flink 自身提供批流一體的 ANSI-SQL 語法,可以大大減小用户 SQL 開發者和運維者的負擔,讓用户專注於業務邏輯。

同一個引擎,Flink 的流和批覆用一套優化和 Runtime 框架,現階段的大數據引擎還遠遠達不到完全穩定的情況,所以仍然有很多時候需要我們去深入的分析和優化,一套引擎可以讓開發者專注單個技術棧,避免需要接觸多個技術棧,而只有技術廣度,沒有技術深度。

統一數據

分析了元數據和計算引擎的統一,更進一步,是否能統一實時和離線的數據,避免數據的不一致,避免數據的重複存儲和重複計算。ETL 計算是否能統一呢?既然實時表設計上可以和離線表一模一樣,是否可以乾脆只有實時表的 ETL 計算,離線表從實時表裏獲取數據?

並且,通過實時鏈路可以加速離線鏈路的數據準備,批計算可以把調度換成流輸入。

Flink Hive/File Streaming Sink 即為解決這個問題,實時 Kafka 表可以實時的同步到對於的離線表中:

離線表作為實時的歷史數據,填補了實時數倉不存在歷史數據的空缺。

數據批量準實時攝入為 Ad-hoc 查詢離線表提供了準實時輸入。

此時離線的批計算也可以交由實時調度,在實時任務處理中某個契機 (Partition Commit 見後續) 自行調度離線那塊的任務進行數據同步操作。

此時實時和離線的表已經基本統一,那麼問題來了,Kafka 中的表和 Hive 中的表能否就共用一張表呢?我的想法是之後可能會出現以下情況,在數倉中定義一張表,分別對應着 Kafka 和 Hive+HDFS 兩種物理存儲:

用户在進行 insert 操作時,就自然插入到了 Kafka 的實時 table 當中,同時生成另外一條鏈路,自動同步到 Hive Table 當中。這樣這一張表就非常的完整,不僅滿足實時的需求,而且擁有歷史的數據。

一個 SQL 讀取這樣的一個 Hybrid Source ,根據你的查詢語句後面的 where 條件,自動路由到 Hive 的歷史數據,或者是 Kafka 的實時數據。根據一定的規則先讀 Hive 歷史數據,再讀 Kafka 實時數據,當然這裏有一個問題,它們之間通過什麼標識來切換呢?一個想法是數據中或者 Kafka 的 Timestamp。

Hive Streaming Sink 的實現

Flink 1.11 前已經有了 StreamingFileSink,在 1.11 中不但把它集成到 SQL 中,讓這個 Hive Streaming Sink 可以像離線的 Hive SQL 那樣,所有的業務邏輯都由 SQL 去處理,而且帶來了進一步的增量。

接下來介紹下 Hive/File Streaming Sink,分為兩個組件,FileWriter 和 PartitionCommitter:

FileWriter 組件可以做到分區感知,通過 checkpoint 機制可以保證 Exactly-Once(分佈式場景是不可靠的,需要通過兩階段提交 + 文件 Rename 的冪等性),FileWriter 也提供了 Rolling 相關的參數,這個 Rolling 指的是我們的流式處理過程,它可以通過兩個參數來控制執行頻率,file-size 就是每個數據流的大小,rollover-interval 就是時長間隔。但是需要注意,checkpoint 不宜設置太頻繁,以免產生過多的小文件。

Partition Committer,通過一系列的業務邏輯處理後得到的 Finished Flies 就直接可用了嗎?因為我們典型的 Hive 表都是分區表,當一個分區就緒後,還需要通知下游,Partition 已經處理完成,可以同步到 Hive metastore 中了。我們需要在合適的時機來有效的 trigger 特定的 Partition commit。Partition committer 總的來説,就是完成了 Hive 分區表的數據及元數據的寫入,甚至可以完成通知調度系統開始執行之後的 Batch 作業。

因為流式作業是不間斷的在運行的,如何設置分區提交的時間,某個分區什麼時候提交它呢?

第一種是默認策略 Process time ,也就是我們所説的事件被處理時的當前系統時間,但是缺點也比較明顯,可能出現各種各樣的數據不完整。

推薦策略就是 partition-time,這種策略可以做到提交時的語義明確且數據完整,partition 字段就是由 event time ,也就是事件產生的時間所得到的。

如果當前時間 Current time > 分區產生的時間 + commitDelay 延時,即是可以開始進行分區提交的時間。一個簡單的例子是小時分區,比如當前已經 12 點過 1 分了,已經過了 11 點的分區 + 一個小時,所以我們可以説不會再有 11 點分區的數據過來了,就可以提交 11 點的分區。(要是有 LateEvent 怎麼辦?所以也要求分區的提交是冪等的。)

接下來介紹分區的提交具體作用,最直接的就是寫 SuccessFile 和 Add partition 到 Hive metastore。

Flink 內置支持了 Hive-MetaStore 和 SuccessFile,只要配置"sink.partition-commit.policy.kind" 為 "metastore,success-file",即可做到在 commit 分區的時候自動 add 分區到 Hive 中,而且寫 SuccessFile,當 add 操作完成的時候,這個 partition 才真正的對 Hive 可見。

Custom 機制允許自定義一個 Partition Commit Policy 的類,實現這個類可以做到在這個分區的任務處理完成後:比如觸發下游的調度、Statistic Analysis、又或者觸發 Hive 的小文件合併。(當然觸發 Hive 的小文件合併不但需要啓動另一個作業,而且做不到一致性保證,後續 Flink 也會有進一步的探索,在 Flink 作業中,主動完成小文件的合併)。

實時消費

不止是準實時的數據攝入,Flink 也帶來了維表關聯 Hive 表和流實時消費 Hive 表。

我們知道 Flink 是支持維表關聯查詢 MySQL 和 HBase 的,在計算中維護一個 LRU 的緩存,未命中查詢 MySQL 或 HBase。但是沒有 Lookup 的能力怎麼辦呢?數據一般是放在離線數倉中的,所以業務上我們一般採用 Hive Table 定期同步到 HBase 或者 MySQL。Flink 也可以允許直接維表關聯 Hive 表,目前的實現很簡單,需要在每個併發中全量 Load Hive 表的所有數據,只能針對小表的關聯。

傳統的 Hive Table 只支持按照批的方式進行讀取計算,但是我們現在可以使用流的方式來監控 Hive 裏面的分區 / 文件生成,也就是每一條數據過來,都可以實時的進行消費計算,它也是完全複用 Flink Streaming SQL 的方式,可以和 HBase、MySQL、Hive Table 進行 Join 操作,最後再通過 FileWriter 實時寫入到 Hive Table 中。

3

構建流批一體準實時數倉應用實踐

案例如下:通過 Flume 採集日誌打點 Logs,計算各年齡層的 PV,此時我們存在兩條鏈路:

一條是實時鏈路,通過輸入訪問日誌,關聯 Hive 的 User 表來計算出所需要的結果到業務 DB 中。

而另一條則是離線鏈路,我們需要 Hive 提供小時分區表,來實現對歷史數據的 Ad-hoc 查詢。

這裏就是我們剛剛提到的,雖然是對應兩個 database:realtime_db 和 offline_db,但是它們共用一份元數據。

對於 Hive 表我們可以通過 Flink SQL 提供的 Hive dialect 語法,然後通過 Hive 的 DDL 語法來在 Flink 中創建 Hive 表,這裏設置 PARTITION BY 天和小時,是與實時鏈路的不同之處,因為實時鏈路是沒有分區概念的。

如何在表結構裏避免分區引起的 Schema 差異?一個可以解決的方案是考慮引入 Hidden Partition 的定義,Partition 的字段可以是某個字段的 Computed Column,這也可以與實際常見的情況做對比,如天或小時是由時間字段計算出的,之後是下面的三個參數:

sink.partition-commit.trigger,指定什麼時候進行 partition 的 commit,這裏設置了 partition-time,用於保證 exactly-once;

partition.time-extractor.timestamp-pattern,怎樣從 partition 中提取時間,相當於設置了一個提取格式;

sink.partition-commit.policy.kind,既 partition commit 所要進行的操作,也就是剛剛提到的 metastore,success-file。

之後設置回默認的 Flink dialect,創建 Kafka 的實時表,通過 insert into 將 Kafka 中的數據同步到 Hive 之中。

通過實時 Pipeline 的手段消費 Hive Table,而不是通過調度或者以往手動觸發的 batch 作業,第一個參數 streaming-source.enable,打開流處理機制,然後使用 start-offset 參數指定從哪個分區 / 文件開始消費。此時,整個流批一體準實時數倉應用基本算是完成啦。

未來規劃

Hive 作為分區級別管理的 Table Format 在一些方便有比較大的限制,如果是新型的 Table Format 比如 Iceberg 會有更好的支持,未來 Flink 會在下面幾個方面加強:

Flink Hive/File Streaming Sink 的 Auto Compaction(Merging) 能力,小文件是實時的最大阻礙之一。

Flink 擁抱 Iceberg,目前在社區中已經開發完畢 Iceberg Sink,Iceberg Source 正在推進中,可以看見在不遠的將來,可以直接將 Iceberg 當做一個消息隊列,且,它保存了所有的歷史數據,達到真正的流批統一。

增強 Flink Batch 的 Shuffle,目前完全的 Hash Shuffle 帶來了很多問題,比如小文件、隨機 IO、Buffer 管理帶來的 OOM,後續開源 Flink (1.12) 會加強力量引入 SortedShuffle 以及 ShuffleService。

Flink Batch BoundedStream 支持,舊的 Dataset API 已經不能滿足流批統一的架構,社區 (1.12) 會在 DataStream 上提供 Batch 計算的能力。

更多細節,可以查看 InfoQ 公開課的完整視頻回放:

https://live.infoq.cn/room/390

講師介紹:

李勁松,花名之信,阿里巴巴技術專家,Apache Flink Committer。2014 年起專注於阿里內部 Galaxy 流計算框架;2017 年起開始 Flink 研發,主要專注於 Batch 計算、數據結構與類型。

今日薦文