




Spark Extension作為Spark Catalyst擴展點在SPARK-18127中被引入,Spark用戶可以在SQL處理的各個階段擴展自定義實現(xiàn),非常強大高效
在極光我們有自建的元數(shù)據(jù)管理平臺,相關(guān)元數(shù)據(jù)由各數(shù)據(jù)組件進行信息收集,其中對Spark SQL的血緣關(guān)系解析和收集就是通過自定義的Spark Extension實現(xiàn)的。
Spark Catalyst的SQL處理分成parser,analyzer,optimizer以及planner等多個步驟,其中analyzer,optimizer等步驟內(nèi)部也分為多個階段,為了獲取最有效的血緣關(guān)系信息,我們選擇最終的planner階段作為切入點,為此我們專門實現(xiàn)了一個planner strategy進行Spark SQL物理執(zhí)行計劃的解析,并提取出讀寫表等元數(shù)據(jù)信息并存儲到元數(shù)據(jù)管理平臺
在數(shù)據(jù)安全方面,極光選擇用Ranger作為權(quán)限管理等組件,但在實際使用的過程中我們發(fā)現(xiàn)目前社區(qū)版本的Ranger主要提供的還是HDFS、HBase、Hive、Yarn的相關(guān)接入插件,在Spark方面需要自己去實現(xiàn)相關(guān)功能,對于以上問題我們同樣選擇用Spark Extension去幫助我們進行權(quán)限方面的二次開發(fā),在實現(xiàn)的過程中我們借助了Ranger Hive-Plugin的實現(xiàn)原理,對Spark SQL訪問Hive進行了權(quán)限校驗功能的實現(xiàn)。
隨著數(shù)據(jù)平臺使用Spark SQL的業(yè)務同學越來越多,我們發(fā)現(xiàn)每個業(yè)務同學對于Spark的熟悉程度都有所不同,對Spark配置參數(shù)的理解也有好有壞,為了保障集群整體運行的穩(wěn)定性,我們對業(yè)務同學提交的Spark任務的進行了攔截處理,提取任務設(shè)置的配置參數(shù),對其中配置不合理的參數(shù)進行屏蔽,并給出風險提示,有效的引導業(yè)務同學進行合理的線上操作。
在Spark的實踐過程中,我們也積極關(guān)注業(yè)內(nèi)其它公司優(yōu)秀方案,在2020年我們參考字節(jié)跳動對于Spark Bucket Table的優(yōu)化思路,在此基礎(chǔ)上我們對極光使用的Spark進行了二次改造,完成如下優(yōu)化項:
上述三點的優(yōu)化,豐富了Bucket Join的使用場景,可以讓更多Join、Aggregate操作避免產(chǎn)生Shuffle,有效的提高了Spark SQL的運行效率.在完成相關(guān)優(yōu)化以后,如何更好的進行業(yè)務改造推廣,成為了我們關(guān)心的問題。
通過對數(shù)據(jù)平臺過往SQL執(zhí)行記錄的分析,我們發(fā)現(xiàn)用戶ID和設(shè)備ID的關(guān)聯(lián)查詢是十分高頻的一項操作,在此基礎(chǔ)上,我們通過之前SQL血緣關(guān)系解析收集到的元數(shù)據(jù)信息,對每張表進行Join、Aggregate操作的高頻字段進行了分析整理,統(tǒng)計出最為合適的Bucket Cloumn,并在這些元數(shù)據(jù)的支撐下輔助我們進行Bucket Table的推廣改造。
隨著公司業(yè)務的高速發(fā)展,在數(shù)據(jù)平臺上提交的SQL任務持續(xù)不斷增長,對任務的執(zhí)行時間和計算資源的消耗都提出了新的挑戰(zhàn),出于上述原因,我們提出了Hive任務遷移到Spark SQL的工作目標,由此我們總結(jié)出了如下問題需求:
在遷移業(yè)務job時,我們需要知道這個部門有哪些人,由于Azkaban在執(zhí)行具體job時會有執(zhí)行人信息,所以我們可以根據(jù)執(zhí)行人來推測有哪些job。分析程序使用了元數(shù)據(jù)系統(tǒng)的某些表數(shù)據(jù)和azkaban相關(guān)的一些庫表信息,用來幫助我們收集遷移的部門下有多少hive job,以及該hive job有多少sql,sql語法通過率是多少,當然在遷移時還需要查看Azkaban的具體執(zhí)行耗時等信息,用于幫助我們在精細化調(diào)參的時候大致判斷消耗的資源是多少。
由于線上直接檢測某條sql是否合乎spark語義需要具有相關(guān)的讀寫權(quán)限,直接開放權(quán)限給分析程序不安全。所以實現(xiàn)的思路是通過使用元數(shù)據(jù)系統(tǒng)存儲的庫表結(jié)構(gòu)信息,以及azkaban上有采集業(yè)務job執(zhí)行的sql信息。只要擁有某條sql所需要的全部庫表信息,我們就能在本地通過重建庫表結(jié)構(gòu)分析該條sql是否合乎spark語義(當然線上環(huán)境和本地是有不同的,比如函數(shù)問題,但大多情況下是沒有問題的)。
圖3-1-1
以下為某數(shù)據(jù)部通過分析程序得到的SQL通過率
目前業(yè)務方使用Hive的主要方式是通過beeline去連接hiveserver2,由于livy也提供了thriftserver模塊,所以beeline也可以直接連接livy。遷移的策略就是先把合乎Spark語法的SQL發(fā)往livy執(zhí)行,如果執(zhí)行失敗再切換到Hive進行兜底執(zhí)行。
beeline可獲取用戶SQL,啟動beeline時通過thrift接口創(chuàng)建livy session,獲取用戶sql發(fā)送給livy 執(zhí)行,期間執(zhí)行進度等信息可以查詢livy獲得,同時一個job對應一個session,以及每啟動一次beeline對應一個session,當job執(zhí)行完畢或者beeline被關(guān)閉時,關(guān)閉livy session。(如果spark不能成功執(zhí)行則走之前hive的邏輯)
圖3-2-1
有了以上切換思路以后,我們開始著手beeline程序的修改設(shè)計。
beeline重要類圖如圖3-2-2所示, Beeline類是啟動類,獲取用戶命令行輸入并調(diào)用Commands類去 執(zhí)行,Commands負責調(diào)用JDBC接口去執(zhí)行和獲取結(jié)果, 單向調(diào)用流程如圖3-2-3所示。
圖3-2-2
圖3-2-3
由圖3-2-2和圖3-2-3可知,所有的操作都是通過DatabaseConnection這個對象去完成的,持有這個 對象的是DatabaseConnections這個對象,所以多計算引擎切換,通過策略適DatabaseConnections對象,這樣就能在不修改其他代碼的情況下切換執(zhí)行引擎(即獲取不同的connection)
圖3-2-4
前文有說到,當一個Hive任務用SQL分析程序走通,并且在遷移程序用livy進行Spark任務提交以后,還是會有可能執(zhí)行失敗,這個時候我們會用Hive進行兜底執(zhí)行保障任務穩(wěn)定性。但是失敗的SQL會有多種原因,有的SQL確實用Hive執(zhí)行穩(wěn)定性更好,如果每次都先用Spark SQL執(zhí)行失敗以后再用Hive執(zhí)行會影響任務效率,基于以上目的,我們對遷移程序開發(fā)了黑名單功能,用來保障每個SQL可以找到它真正適合的執(zhí)行引擎,考慮到beeline是輕量級客戶端,識別的功能應該放在livy-server側(cè)來做,開發(fā)一個類似HBO的功能來將這樣的異常SQL加入黑名單,節(jié)省遷移任務執(zhí)行時間。
目標: 基于HBE(History-Based Executing)的異常SQL識別
有了上述目標以后我們主要通過如下方式進行了SQL黑名單的識別切換
今年經(jīng)過遷移程序的遷移改造,HSQL最大降幅為50%+(后隨今年業(yè)務增長有所回升)
當前極光使用的Spark默認版本已經(jīng)從2.X版本升級到了3.X版本,Spark3.X的AQE特性也輔助我們更好的使用Spark
實踐配置優(yōu)化:
#spark3.0.0參數(shù)
#動態(tài)合并shuffle partitions
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.minPartitionNum 1
spark.sql.adaptive.coalescePartitions.initialPartitionNum 500
spark.sql.adaptive.advisoryPartitionSizeInBytes 128MB
#動態(tài)優(yōu)化數(shù)據(jù)傾斜,通過實際的數(shù)據(jù)特性考慮,skewedPartitionFactor我們設(shè)置成了1
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.skewJoin.skewedPartitionFactor 1
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 512MB
目前針對線上運行的Spark任務,我們正在開發(fā)一套Spark全鏈路監(jiān)控平臺,作為我們大數(shù)據(jù)運維平臺的一部分,該平臺會承擔對線上Spark任務運行狀態(tài)的采集監(jiān)控工作,我們希望可以通過該平臺及時定位發(fā)現(xiàn)資源使用浪費、寫入大量小文件、存在slow task等問題的Spark任務,并以此進行有針對性的優(yōu)化,讓數(shù)據(jù)平臺可以更高效的運行。
最后打個小廣告,極光數(shù)據(jù)平臺團隊,主要負責極光數(shù)據(jù)平臺(DP)、離線計算(Spark、Hive、Yarn)、海量存儲(HDFS、HBase、Kafka)、實時計算(Flink)、數(shù)據(jù)倉庫(DW)的開發(fā)建設(shè)工作,歡迎感興趣的小伙伴聯(lián)系caizg@jiguang.cn
熱門文章
相關(guān)文章
極光官方微信公眾號
關(guān)注我們,即時獲取最新極光資訊
現(xiàn)在注冊,領(lǐng)取新人大禮包