基於Openresty+CEPH實現海量數據管理系統
1. 需求:
作為一家專注於三維高精度地圖服務的公司,內部有海量(PB級)的原始數據、中間數據、成功數據,需要存儲、管理、並定期歸檔。
按項目管理數據,數據分類航飛數據、控制點數據、中間數據、成果數據、其他數據。數據來源包括無人機數據、載荷數據、地面站數據、人工打點數據等。不同渠道彙集而來的數據。
採用類似百度網盤的形式,上傳、下載,支持斷點續傳、進度跟蹤。
支持細化到文件級別的權限控制,以及更多的文件(夾)屬性。
2. 分析:
系統重點在於數據存儲的選型,支持海量數據的存儲,能夠支持在複雜網絡下的數據上傳。選用CEPH作為數據存儲,RGW對象存儲,S3協議上傳下載,完美支持分片和斷點續傳。
系統難點在於文件級別的業務權限控制,以及文件(夾)更多的屬性支持。CEPH RGW本身支持權限控制,但是無法和業務權限做對接。對象存儲本身沒有文件夾的概念,無法對文件夾做分類、數量展示、大小展示。所以實現自定義索引服務,CEPH主要負責存儲,自定義索引服務實現展示與查詢。
由於上傳下載會經過Openresty,通過lua腳本,將流經的文件信息,經由kafka轉發到業務服務中進行業務處理應用。
上傳助手就是類百度網盤的桌面端軟件,採用
)實現。主要實現功能:項目展示、上傳、下載。
業務層包括網關服務、賬號服務、項目服務、文件索引服務等。採用Java Spring Boot Spring Cloud技術棧。其中重點服務是文件索引服務Index Server,負責海量文件的索引維護和查詢。
業務數據MySQL集羣 Redis集羣,海量文件存儲使用CEPH對象存儲,支持S3 API。
上傳助手使用普通的Put Object請求上傳文件,加上自定義的metadata字段(項目ID、用户ID等)即可完成數據的提交。
Openresty使用proxy模式將文件請求轉發到 CEPH RGW,由RGW完成後台數據存儲處理。
Openresty在RGW完成數據存儲以後,調用log_by_lua_file將對應請求的用户自定義metadata和文件屬性轉發到後台Kafka。
文件索引服務(Index Server)從Kafka中消費任務,拿到每個文件的信息。
文件索引服務(Index Server)對文件數據按業務要求進行處理後,存入MySQL數據庫。
3.4 示例代碼
log_by_lua_file.lua:從Openresty獲取文件信息,併發往Kafka
local cjson = require "cjson"local producer = require "resty.kafka.producer"local broker_list = { { host = "172.16.0.20", port = 9092 },}function send_job_to_kafka() local log_json = {} local req_headers_ = ngx.req.get_headers() for k, v in pairs(req_headers_) do if k == "content-length" then log_json["contentLength"] = tostring(v) end if k == "u-id" then log_json["uId"] = tostring(v) end if k == "p-id" then log_json["pId"] = tostring(v) end end local resp_headers_ = ngx.resp.get_headers() for k, v in pairs(resp_headers_) do if k == "etag" then log_json["etag"] = string.gsub(v, "\"", "") break end end log_json["uri"] = ngx.var.uri log_json["host"] = ngx.var.host log_json["remoteAddr"] = ngx.var.remote_addr log_json["status"] = ngx.var.status local message = cjson.encode(log_json); ngx.log(ngx.ERR, "message is[", message, "]") return messageend--local is_args = ngx.var.is_argslocal request_method = ngx.var.request_methodlocal status_code = ngx.var.status-- 過濾Put Object成功的請求,記錄相應的metadata及請求ID,並轉發到kafkaif request_method == "PUT" and status_code == "200" then local bp = producer:new(broker_list, { producer_type = "async" }) local ok, err = bp:send("ceph_lua_test", nil, send_job_to_kafka()) if not ok then ngx.log(ngx.ERR, "kafka send err:", err) return end ngx.log(ngx.ERR, "kafka send success:", ok)end
4. 總結
通過此架構方案,在海量文件歸檔過程中,將文件基本信息異步導入到業務數據庫中,便於業務應用開發。
此架構一般也應用對象存儲的多媒體文件處理,比如圖片處理、視頻處理、加水印、鑑黃、事件通知等。