作者 | 李勁松(之信)
策劃 | 蔡芳芳
基於 Hive 的離線數倉往往是企業大資料生產系統中不可缺少的一環。Hive 數倉有很高的成熟度和穩定性,但由於它是離線的,延時很大。在一些對延時要求比較高的場景,需要另外搭建基於 Flink 的實時數倉,將鏈路延時降低到秒級。但是一套離線數倉加一套實時數倉的架構會帶來超過兩倍的資源消耗,甚至導致重複開發。
想要搭建流式鏈路就必須得拋棄現有的 Hive 數倉嗎?並不是,藉助 Flink 可以實現已有的 Hive 離線數倉準實時化。本文整理自 Apache Flink Committer、阿里巴巴技術專家李勁松 在 InfoQ 技術公開課的分享,文章將分析當前離線數倉實時化的難點,詳解 Flink 如何解決 Hive 流批一體準實時數倉的難題,實現更高效、合理的資源配置。文章大綱如下:
離線數倉實時化的難點
Flink 在流批一體的探索
構建流批一體準實時數倉應用實踐
1
離線數倉實時化的難點
離線數倉
上圖是一個典型的離線數倉,假設現在公司有一個需求,目前公司的資料量很大,需要每天出一個報表且輸出到業務資料庫中。首先是剛入庫的業務資料,大致分為兩種,一種是 MySQL 的 binlog,另外一種是業務系統中的業務打點,這個日誌打點資訊可以透過 Flume 等工具去採集,再離線入庫到數倉中。然後隨著業務越來越多,業務中的各個表可以做一些抽象,抽象的好處是更好的管理和更高效的資料複用和計算複用。所以數倉就分成了多層 (明細層、中間層、服務層等等),每一層存的是資料表,資料表之間透過 HiveSQL 的計算來實現 ETL 轉換。
不止是 HiveSQL ,Hive 只是靜態的批計算,而業務每天都要出報表,這意味著每天都要進行計算,這種情況下會依賴於排程工具和血緣管理:
排程工具:按照某個策略把批計算排程起來。
血緣管理:一個任務是由許多個作業組合而成,可能有非常複雜的表結構層次,整個計算是一個非常複雜的拓撲,作業間的依賴關係非常複雜 (減少冗餘儲存和計算,也可以有較好的容錯),只有當一級結束後才能進行下一級的計算。
當任務十分龐大的時候,我們得出結果往往需要很長的一段時間,也就是我們常說的 T+1,H+1 ,這就是離線數倉的問題。
第三方工具
上面說過,離線數倉不僅僅是簡單的 Hive 計算,它還依賴了其它的第三方工具,比如:
使用 Flume 來入庫,但存在一定的問題,首先,它的容錯可能無法保證 Exactly-Once 效果,需要下游再次進行去重操作。其次,自定義邏輯需要透過一些手段,比如指令碼來控制。第三,離線數倉並不具備良好的擴充套件能力,當資料劇增時,增加原本的併發數就比較困難了。
基於排程工具的作業排程會帶來級聯的計算延遲,比如凌晨 1 點開始計算昨天的資料,可能需要到早上 6、7 點才能做完,並且無法保證在設定的排程時間內資料可以完全 ready 。此外,級聯的計算還會帶來複雜的血緣管理問題,大任務的 Batch 計算可能會突然打滿叢集的資源,所以也要求我們對於負載管理進行考量,這些都會給業務增加負擔。
無論是離線數倉還是第三方工具,其實主要的問題還是“慢”,如何解決慢的問題,此時就該實時數倉出場了。
實時數倉
實時數倉其實是從 Hive+HDFS 的組合換成了 Kafka,ETL 的功能透過 Flink 的流式處理解決。此時就不存在排程和血緣管理的問題了,透過實時不斷的增量更新,最終輸出到業務的 DB 中。
雖然延時降低了,但此時我們會面臨另外一些問題:
歷史資料丟失,因為 Kafka 只是臨時的儲存介質,資料會有一個超時的時間 (比如只儲存 7 天的資料),這會導致我們的歷史資料丟失。
成本相對較高,實時計算的成本要大於離線計算。
Lambda 架構
所以此時很多人就會選擇一套實時一套離線的做法,互不干擾,根據任務是否需要走實時的需求來對需求進行分離。
這套架構看似解決了所有問題,但實際帶來的問題也是非常多。首先,Lambda 架構造成了離線和實時的割裂問題,它們解決的業務問題都是一樣的,但是兩套方案讓同樣的資料來源產生了不同的計算結果。不同層級的表結構可能不一致,並且當資料產生不一致的問題時,還需要去進行比對排查。
隨著這套 Lambda 架構越走越遠,開發團隊、表結構表依賴、計算模型等都可能會被割裂開,越到後面越會發現,成本越來越高,而統一的代價越來越大。
那麼問題來了,實時數倉會耗費如此大的資源,且還不能保留歷史資料,Lambda 架構存在如此多的問題,有什麼方案可以解決呢?
資料湖
資料湖擁有不少的優點,原子性可以讓我們做到準實時的批流一體,並且支援已有資料的修改操作。但是畢竟資料湖是新一代數倉儲存架構,各方面都還不是很完美,目前已有的資料湖都強依賴於 Spark(當然 Flink 也正在擁抱資料湖),將資料遷移到資料湖需要團隊對遷移成本和人員學習成本進行考量。
如果沒有這麼大的決心遷移資料湖,那有沒有一個稍微緩和一些的方案加速已有的離線數倉呢?
2
Flink 在批流一體上的探索
統一元資料
Flink 一直持續致力於離線和實時的統一,首先是統一元資料。簡單來說就是把 Kafka 表的元資料資訊儲存到 HiveMetaStore 中,做到離線和實時的表 Meta 的統一。
(目前開源的實時計算並沒有一個較為完善的持久化 MetaStore,Hive MetaStore 不僅能儲存離線表,也可以承擔實時計算的 MetaStore 能力)。
統一計算引擎
同樣的元資料之後,實時和離線的表結構和層次可以設計成一樣,接下來就是可以共用:
同一套 SQL,Flink 自身提供批流一體的 ANSI-SQL 語法,可以大大減小使用者 SQL 開發者和運維者的負擔,讓使用者專注於業務邏輯。
同一個引擎,Flink 的流和批覆用一套最佳化和 Runtime 框架,現階段的大資料引擎還遠遠達不到完全穩定的情況,所以仍然有很多時候需要我們去深入的分析和最佳化,一套引擎可以讓開發者專注單個技術棧,避免需要接觸多個技術棧,而只有技術廣度,沒有技術深度。
統一資料
分析了元資料和計算引擎的統一,更進一步,是否能統一實時和離線的資料,避免資料的不一致,避免資料的重複儲存和重複計算。ETL 計算是否能統一呢?既然實時表設計上可以和離線表一模一樣,是否可以乾脆只有實時表的 ETL 計算,離線表從實時表裡獲取資料?
並且,透過實時鏈路可以加速離線鏈路的資料準備,批計算可以把排程換成流輸入。
Flink Hive/File Streaming Sink 即為解決這個問題,實時 Kafka 表可以實時的同步到對於的離線表中:
離線表作為實時的歷史資料,填補了實時數倉不存在歷史資料的空缺。
資料批次準實時攝入為 Ad-hoc 查詢離線表提供了準實時輸入。
此時離線的批計算也可以交由實時排程,在實時任務處理中某個契機 (Partition Commit 見後續) 自行排程離線那塊的任務進行資料同步操作。
此時實時和離線的表已經基本統一,那麼問題來了,Kafka 中的表和 Hive 中的表能否就共用一張表呢?我的想法是之後可能會出現以下情況,在數倉中定義一張表,分別對應著 Kafka 和 Hive+HDFS 兩種物理儲存:
使用者在進行 insert 操作時,就自然插入到了 Kafka 的實時 table 當中,同時生成另外一條鏈路,自動同步到 Hive Table 當中。這樣這一張表就非常的完整,不僅滿足實時的需求,而且擁有歷史的資料。
一個 SQL 讀取這樣的一個 Hybrid Source ,根據你的查詢語句後面的 where 條件,自動路由到 Hive 的歷史資料,或者是 Kafka 的實時資料。根據一定的規則先讀 Hive 歷史資料,再讀 Kafka 實時資料,當然這裡有一個問題,它們之間透過什麼標識來切換呢?一個想法是資料中或者 Kafka 的 Timestamp。
Hive Streaming Sink 的實現
Flink 1.11 前已經有了 StreamingFileSink,在 1.11 中不但把它整合到 SQL 中,讓這個 Hive Streaming Sink 可以像離線的 Hive SQL 那樣,所有的業務邏輯都由 SQL 去處理,而且帶來了進一步的增量。
接下來介紹下 Hive/File Streaming Sink,分為兩個元件,FileWriter 和 PartitionCommitter:
FileWriter 元件可以做到分割槽感知,透過 checkpoint 機制可以保證 Exactly-Once(分散式場景是不可靠的,需要透過兩階段提交 + 檔案 Rename 的冪等性),FileWriter 也提供了 Rolling 相關的引數,這個 Rolling 指的是我們的流式處理過程,它可以透過兩個引數來控制執行頻率,file-size 就是每個資料流的大小,rollover-interval 就是時長間隔。但是需要注意,checkpoint 不宜設定太頻繁,以免產生過多的小檔案。
Partition Committer,透過一系列的業務邏輯處理後得到的 Finished Flies 就直接可用了嗎?因為我們典型的 Hive 表都是分割槽表,當一個分割槽就緒後,還需要通知下游,Partition 已經處理完成,可以同步到 Hive metastore 中了。我們需要在合適的時機來有效的 trigger 特定的 Partition commit。Partition committer 總的來說,就是完成了 Hive 分割槽表的資料及元資料的寫入,甚至可以完成通知排程系統開始執行之後的 Batch 作業。
因為流式作業是不間斷的在執行的,如何設定分割槽提交的時間,某個分割槽什麼時候提交它呢?
第一種是預設策略 Process time ,也就是我們所說的事件被處理時的當前系統時間,但是缺點也比較明顯,可能出現各種各樣的資料不完整。
推薦策略就是 partition-time,這種策略可以做到提交時的語義明確且資料完整,partition 欄位就是由 event time ,也就是事件產生的時間所得到的。
如果當前時間 Current time > 分割槽產生的時間 + commitDelay 延時,即是可以開始進行分割槽提交的時間。一個簡單的例子是小時分割槽,比如當前已經 12 點過 1 分了,已經過了 11 點的分割槽 + 一個小時,所以我們可以說不會再有 11 點分割槽的資料過來了,就可以提交 11 點的分割槽。(要是有 LateEvent 怎麼辦?所以也要求分割槽的提交是冪等的。)
接下來介紹分割槽的提交具體作用,最直接的就是寫 SuccessFile 和 Add partition 到 Hive metastore。
Flink 內建支援了 Hive-MetaStore 和 SuccessFile,只要配置"sink.partition-commit.policy.kind" 為 "metastore,success-file",即可做到在 commit 分割槽的時候自動 add 分割槽到 Hive 中,而且寫 SuccessFile,當 add 操作完成的時候,這個 partition 才真正的對 Hive 可見。
Custom 機制允許自定義一個 Partition Commit Policy 的類,實現這個類可以做到在這個分割槽的任務處理完成後:比如觸發下游的排程、Statistic Analysis、又或者觸發 Hive 的小檔案合併。(當然觸發 Hive 的小檔案合併不但需要啟動另一個作業,而且做不到一致性保證,後續 Flink 也會有進一步的探索,在 Flink 作業中,主動完成小檔案的合併)。
實時消費
不止是準實時的資料攝入,Flink 也帶來了維表關聯 Hive 表和流實時消費 Hive 表。
我們知道 Flink 是支援維表關聯查詢 MySQL 和 HBase 的,在計算中維護一個 LRU 的快取,未命中查詢 MySQL 或 HBase。但是沒有 Lookup 的能力怎麼辦呢?資料一般是放在離線數倉中的,所以業務上我們一般採用 Hive Table 定期同步到 HBase 或者 MySQL。Flink 也可以允許直接維表關聯 Hive 表,目前的實現很簡單,需要在每個併發中全量 Load Hive 表的所有資料,只能針對小表的關聯。
傳統的 Hive Table 只支援按照批的方式進行讀取計算,但是我們現在可以使用流的方式來監控 Hive 裡面的分割槽 / 檔案生成,也就是每一條資料過來,都可以實時的進行消費計算,它也是完全複用 Flink Streaming SQL 的方式,可以和 HBase、MySQL、Hive Table 進行 Join 操作,最後再透過 FileWriter 實時寫入到 Hive Table 中。
3
構建流批一體準實時數倉應用實踐
案例如下:透過 Flume 採集日誌打點 Logs,計算各年齡層的 PV,此時我們存在兩條鏈路:
一條是實時鏈路,透過輸入訪問日誌,關聯 Hive 的 User 表來計算出所需要的結果到業務 DB 中。
而另一條則是離線鏈路,我們需要 Hive 提供小時分割槽表,來實現對歷史資料的 Ad-hoc 查詢。
這裡就是我們剛剛提到的,雖然是對應兩個 database:realtime_db 和 offline_db,但是它們共用一份元資料。
對於 Hive 表我們可以透過 Flink SQL 提供的 Hive dialect 語法,然後透過 Hive 的 DDL 語法來在 Flink 中建立 Hive 表,這裡設定 PARTITION BY 天和小時,是與實時鏈路的不同之處,因為實時鏈路是沒有分割槽概念的。
如何在表結構裡避免分割槽引起的 Schema 差異?一個可以解決的方案是考慮引入 Hidden Partition 的定義,Partition 的欄位可以是某個欄位的 Computed Column,這也可以與實際常見的情況做對比,如天或小時是由時間欄位計算出的,之後是下面的三個引數:
sink.partition-commit.trigger,指定什麼時候進行 partition 的 commit,這裡設定了 partition-time,用於保證 exactly-once;
partition.time-extractor.timestamp-pattern,怎樣從 partition 中提取時間,相當於設定了一個提取格式;
sink.partition-commit.policy.kind,既 partition commit 所要進行的操作,也就是剛剛提到的 metastore,success-file。
之後設定回預設的 Flink dialect,建立 Kafka 的實時表,透過 insert into 將 Kafka 中的資料同步到 Hive 之中。
透過實時 Pipeline 的手段消費 Hive Table,而不是透過排程或者以往手動觸發的 batch 作業,第一個引數 streaming-source.enable,開啟流處理機制,然後使用 start-offset 引數指定從哪個分割槽 / 檔案開始消費。此時,整個流批一體準實時數倉應用基本算是完成啦。
未來規劃
Hive 作為分割槽級別管理的 Table Format 在一些方便有比較大的限制,如果是新型的 Table Format 比如 Iceberg 會有更好的支援,未來 Flink 會在下面幾個方面加強:
Flink Hive/File Streaming Sink 的 Auto Compaction(Merging) 能力,小檔案是實時的最大阻礙之一。
Flink 擁抱 Iceberg,目前在社群中已經開發完畢 Iceberg Sink,Iceberg Source 正在推進中,可以看見在不遠的將來,可以直接將 Iceberg 當做一個訊息佇列,且,它儲存了所有的歷史資料,達到真正的流批統一。
增強 Flink Batch 的 Shuffle,目前完全的 Hash Shuffle 帶來了很多問題,比如小檔案、隨機 IO、Buffer 管理帶來的 OOM,後續開源 Flink (1.12) 會加強力量引入 SortedShuffle 以及 ShuffleService。
Flink Batch BoundedStream 支援,舊的 Dataset API 已經不能滿足流批統一的架構,社群 (1.12) 會在 DataStream 上提供 Batch 計算的能力。
更多細節,可以檢視 InfoQ 公開課的完整影片回放:
https://live.infoq.cn/room/390
講師介紹:
李勁松,花名之信,阿里巴巴技術專家,Apache Flink Committer。2014 年起專注於阿里內部 Galaxy 流計算框架;2017 年起開始 Flink 研發,主要專注於 Batch 計算、資料結構與型別。
今日薦文