摘要:本文由墨芷技術團隊唐鐸老師分享,主要講述其技術團隊內部引入流計算的整個過程,包括最初的決策、期間的取捨以及最終落地,一路走來他們的思考、感悟以及經驗分享。
初識 Flink
為什麼一定要上 Flink
一個小例子
總結
Tips:“實時即未來”在很多人的眼中可能只是一句口號,但在墨芷,這是他們親手創造的故事。
大家好,我們是浙江墨芷信息科技有限公司,一個剛剛滿3年的創業團隊,主營業務是電商代運營,目前是淘寶四星級服務商。
我們的核心團隊先後服務於國內知名女裝、家電、母嬰、男裝、童裝、珠寶飾品、化妝品等多個品類知名品牌商,具有豐富的品牌運營管理經驗,服務過的品牌均在行業前列。
主營業務圍繞泛時尚領域互聯網平台品牌運營及全網品牌推廣,涉及品牌定位與推廣、電商運營、商品企劃與經營、視覺設計、營銷推廣、顧客服務、倉儲物流等綜合端到端服務。
本文將分享墨芷與流計算結緣的故事。
01 初識Flink
第一次接觸 Flink 流計算是在18年9月的雲棲大會上,大沙老師與在場以及線上的開發者們分享 Flink,會場座無虛席,會場門外還圍着三五層的聽眾。雖然老師的講解時間不長,聽的也是一知半解,卻有種很強烈感覺,“實時,即是未來”。
從雲棲小鎮回來後,跟自己的團隊討論了一下,大家決定向 Flink 開進,但前進的難度是我們沒有預料到的。那個時候學習資料很少,一本《Flink 基礎教程》被我們翻來覆去的看,動手實操門檻較高,進度非常不理想。
圖1 雲棲大會流計算分會場
19年3月,有幸參加了在杭州舉行的 Flink 用户交流會,報名時只是抱着學習的心態去旁聽,但到現場後震驚了,參會的不僅是 Flink 的深度用户,更甚的是每位都來自估值百億以上的大廠。無論是討論的內容還是出身都讓我們感到自卑。
回來之後的第二天,一起去的五個人不約而同的都到公司加班,即便不説透,這次會議給大家帶來的心麗衝擊是巨大的,也促使了我們下定決心,即便難度再大也要把 Flink 應用起來。
在此一個月之後,我們用 Java 編寫的 Flink Job 上線了,即便實現的功能很簡單,但這是我們堅實的一小步。
圖2 社區裏廣為流傳的一張照片
2020年年初,疫情肆虐,團隊人員變動,客觀條件使我們不得不放棄之前用 Java 編寫的一切,轉投 Python。這個決定極其艱難,我們很清楚,一切將回到原點。
但我們與 Flink 的緣分還沒結束。剛好,我們看到社區發起了 PyFlink 扶持計劃,於是郵件諮詢,也有幸被眷顧。接下來的一個月時間,我們在金竹、付典、斷塵幾位老師的幫助下,將原有的 Flink Job 遷移到了 PyFlink 上,同時也帶着需求去學習 PyFlink 的特性。這才有了與大家分享學習成果的機會。
02 為什麼一定要上Flink
説到這,一定有同行問,為啥一個小微企業還要上流計算,用得上嗎?
我們面臨的是若干個嚴峻的事實:
人員數量的膨脹帶來了成倍的開銷。公司用了3年時間,將團隊規模擴張到的150人,在嘉興這個小城市裏這是很不容易的一件事,而且主業是電商代運營,這種工作更像我們軟件行業的項目外包。一提到外包,同行們肯定會聯想到人力配備,簡單講,有項目做才能養活人,沒項目的話,閒置的人力成本就是虧本買賣。
人效提升困難,規定再嚴格的 KPI 也會有瓶頸。同事們每天上班第一件事就是發前一天的銷售業績,只是這個小小的日報,就要耗費半個小時的時間,數據的時效又是“T 1”,略顯滯後。
在做直通車推廣時,由於同事的疏忽,一些已經不再需要付費推廣或可以降低競價的商品還在按照原計劃持續燒錢,人工監控很難及時地發現這些問題。
作為 IT 規劃的主導者,一直以來我都希望可以依託團隊在電商經營上豐厚的經驗及操盤能力,這樣目標很明瞭,就是搭建我們自己的數據實時決策平台。
決策,我們暫且拆開來看,決斷與策略。團隊自有經驗及做事的判斷邏輯,我們把它劃到策略一側,現在我們缺少的是“決斷的能力”,決斷既要考慮準確性,又要顧及時效性,當然,如果決斷時能漸進地優化策略也是極好的。所以我們大致規劃了圖3中的架構。從下至上依次為我們的 DataSource,Swarm,DW,NB,Radical。數據逐層向上被收集,保存,計算,展現,應用,而Flink在數據的生命週期內擔當實時計算的重要任務。
還記得電商場景下商家被薅羊毛的新聞嗎?
目前沒有任何一款電商 ERP 有針對這方面的功能設計。如果可以編寫一個基於 Flink 流計算的實時監控異常銷售情況的小插件,在獲取到訂單中的實付金額去比對之前的商品價格,再結合最新的庫存計算後判斷得出結果,適時彈出告警,那樣的悲劇是否可以避免?
當然,對於電商場景下實時計算的應用點可以開的腦洞是沒有邊界的,況且,如果以上的系統通過不斷地迭代和優化,是否會代替人工成本呢?如果做到了,那一定是一個新的開端。
項目來項目走,短短三年,我們這個小微企業沒有記錄多大的數據量,無非就是店鋪的運營和訂單數據,數據採集平台幫助我們秒級地監控在運營的15家店鋪,每個店鋪有60多個數據監控點。但只有依託 Flink 的流計算,我們才能儘早地從查看到我們想要的數據結果並且做出正確的決策,今天給大家分享的例子也是在這個背景上的。
圖3 架構圖與技術棧
03 一個小例子
根據我們自身的需求以及 Flink 的特性,我們搭建了一個基於 Flink 流計算的實時監控系統,以監測異常情況。以下是一個線上商品價格實時監控的小例子,這是我們在參加 PyFlink 扶持計劃這段時間裏完成的,希望可以讓大家感受到 PyFlink 開發的便捷。
項目背景
公司裏有一個美妝經銷項目,即存在旗艦店的同時有數以千計的經銷商店鋪,業務同事希望可以通過技術手段監控經銷商店鋪的商品價格不低於旗艦店,避免影響旗艦店銷售,於是我們想到如下思路:
圖4 問題解決思路
實踐過程
根據以上思路,我們先採集到了如下數據樣例:
{"shop_name": "經銷商1",
"item_name": "凝時精華",
"item_url": "https://*****",
"item_img": "https://*****",
"item_price": 200.00,
"discount_info": "",
"item_size": ""},
{"shop_name": "經銷商2",
"item_name": "精華油1",
"item_url": "https://*****",
"item_img": "https://",
"item_price": 200.00,
"discount_info": "",
"item_size": "125ml"}
然後,根據數據樣例可以編寫註冊 Kafka source 的方法。
.topic
.start_from_earliest()
.property
.property
) \
.with_format
.fail_on_missing_field
.schema)) \
.with_schema
.field)
.field)
.field)
.field)
.field)
.field)
.field)
) \
.in_append_mode() \
.register_table_source
商品價格參考的 CSV 文件的數據樣例為:
1,精華油1,125ml,**,************,敏感肌可用的全身精華油,****,200,180
2,精華油1,200ml,**,************,提亮美白、淡化疤痕,****,300,280
3,按摩油1,125ml,**,************,有效增加肌膚彈性,****,200,180
4,按摩油1,200ml,**,************,持續柔潤滋養肌膚,****,300,280
5,按摩油2,125ml,**,************,潤彈緊緻,深層滋潤肌膚,****,300,280
6,沐浴露,500ml,**,************,舒緩鎮靜,滋潤乾燥的皮膚,防止肌膚乾燥,****,100,80
7,凝時精華,4x6ml,**,************,密集淡紋 多效抗氧 緊緻彈潤,****,200,180
8,精華油2,30ml,**,************,改善脆弱敏感幹皮。小分子精華滲透肌底,密集補水,****,200,180
9,潔面凝膠,200ml,**,************,痘肌潔面優選,****,100,80
於是我們可以編寫註冊 CSV source 的方法。
數據源文件
source_file = '/demo_job1/控價表.csv'
序號
.field) 容量
.field) 條形碼
.field) 貿易方式
.field) 單瓶最低到手價
) \
.with_schema
.field) 商品名稱
.field) 箱規
.field) 功效
.field) 主圖顯示價格
.field) register csv sink
def register_sink:
result_file = "./result.csv"
sink_field = {
"shop_name": DataTypes.STRING(),
"item_name": DataTypes.STRING(),
"item_url": DataTypes.STRING(),
"item_img": DataTypes.STRING(),
"item_price": DataTypes.STRING(),
"discount_info": DataTypes.STRING(),
-*- coding: utf-8 -*-
import re
import logging
from pyflink.table import DataTypes
from pyflink.table.udf import udf, ScalarFunction
標準商品名稱
regexes = re.compile
items =
items_set = []
for index, value in enumerate:
items_set.append)) 先匹配商品名稱之外的字符,再去掉
sub_str = re.sub
spbt = re.sub, '', item_name) 找到最為匹配的商品標題,否則認作未知商品
intersection_len = 0
items_index = None
for index, value in enumerate:
j = value & set) 初始化 商品價格與商品規格
price = 0 if len == 0 else float
item_size = ''
Named Function
@udf)
def get_min_price:
""" 按需計算優惠價格 """
price = 0 if len == 0 else float
如果有優惠券信息,則計算最低價
min_price = price
mayby_price = []
if len >= 0:
regexes_v2 = re.compile
for i in coupons:
a = re.findall
cut_price = min, float)
flag_price = max, float)
if flag_price <= price:
mayby_price.append
if len > 0:
min_price = min
return min_price
query
def calculate_func:
識別商品名稱與基礎信息表對應
"item_url, "
"item_img, "
"item_price, "
"discount_info, "
"item_size"
) \
.select as item_size, " 根據頁面價與優惠信息計算最低價
) \
.select as discount_info, " 右表為基礎信息表
right = st_env.from_path \
.select
result = left.join.where 輸出 join 結果至 csv sink
init env
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism register source
register_rides_source_from_csv
register_rides_source_from_kafka
register function
st_env.register_function
st_env.register_function
st_env.register_function
st_env.register_function
execute
st_env.execute
04 總結
工程上線後,由於數據收集端在不停的提供數據,藉助流計算,我們已經鑑別出 200 個涉嫌違規定價的商品鏈接,在維護品牌力和價格力的同時,避免旗艦店銷售損失 40 餘萬,而這只是我們眾多監控用 Flink Job 中的一個。
在本着“將企業越做越小,市場越做越大”思路的今天,使用 IT 技術來代替人工作業已經是最快捷的一條路,即便像我們這種小微企業並不像大廠那樣是技術主導,只要產出能被業務同事喜歡使用且提高工作效率的功能,同時被公司高層所重視,那我們的工作就是有意義的且可持續的。
如果你也像我們一樣,以 Python 語言為主,開發工作多為數據分析和實時決策,又渴望享受流計算帶來的準確、高效和便捷,那麼,歡迎加入 PyFlink 生態,讓我們一起為她的明天添磚加瓦。同時 Flink 1.11 版本也預計將在 6 月中下旬發佈,屆時 PyFlink 將攜 Pandas 強勢來襲。
最後,再次感謝在扶持計劃中所有幫助過我們的人!總之, PyFlink,你值得擁有。
如果您也對 PyFlink 社區扶持計劃感興趣,可以填寫下方問卷,與我們一起共建 PyFlink 生態。
PyFlink 社區扶持計劃: https://survey.aliyun.com/apps/zhiliao/B5JOoruzY