Kafka 0.9版本開(kāi)始推出了Java版本的consumer,優(yōu)化了coordinator的設(shè)計(jì)以及擺脫了對(duì)zookeeper的依賴。社區(qū)最近也在探討正式用這套consumer API替換Scala版本的consumer的計(jì)劃。鑒于目前這方面的資料并不是很多,本文將嘗試給出一個(gè)利用KafkaConsumer編寫(xiě)的多線程消費(fèi)者實(shí)例,希望對(duì)大家有所幫助。

    這套API最重要的入口就是KafkaConsumer(o.a.k.clients.consumer.KafkaConsumer),普通的單線程使用方法官網(wǎng)API已有介紹,這里不再贅述了。因此,我們直奔主題——討論一下如何創(chuàng)建多線程的方式來(lái)使用KafkaConsumer。KafkaConsumer和KafkaProducer不同,后者是線程安全的,因此我們鼓勵(lì)用戶在多個(gè)線程中共享一個(gè)KafkaProducer實(shí)例,這樣通常都要比每個(gè)線程維護(hù)一個(gè)KafkaProducer實(shí)例效率要高。但對(duì)于KafkaConsumer而言,它不是線程安全的,所以實(shí)現(xiàn)多線程時(shí)通常由兩種實(shí)現(xiàn)方法:

1 每個(gè)線程維護(hù)一個(gè)KafkaConsumer

移動(dòng)開(kāi)發(fā)培訓(xùn),Android培訓(xùn),安卓培訓(xùn),手機(jī)開(kāi)發(fā)培訓(xùn),手機(jī)維修培訓(xùn),手機(jī)軟件培訓(xùn)

2  維護(hù)一個(gè)或多個(gè)KafkaConsumer,同時(shí)維護(hù)多個(gè)事件處理線程(worker thread)

移動(dòng)開(kāi)發(fā)培訓(xùn),Android培訓(xùn),安卓培訓(xùn),手機(jī)開(kāi)發(fā)培訓(xùn),手機(jī)維修培訓(xùn),手機(jī)軟件培訓(xùn)

當(dāng)然,這種方法還可以有多個(gè)變種:比如每個(gè)worker線程有自己的處理隊(duì)列。consumer根據(jù)某種規(guī)則或邏輯將消息放入不同的隊(duì)列。不過(guò)總體思想還是相同的,故這里不做過(guò)多展開(kāi)討論了。

  下表總結(jié)了兩種方法的優(yōu)缺點(diǎn): 


優(yōu)點(diǎn)缺點(diǎn)
方法1(每個(gè)線程維護(hù)一個(gè)KafkaConsumer)方便實(shí)現(xiàn)
速度較快,因?yàn)椴恍枰魏尉€程間交互
易于維護(hù)分區(qū)內(nèi)的消息順序
更多的TCP連接開(kāi)銷(每個(gè)線程都要維護(hù)若干個(gè)TCP連接)
consumer數(shù)受限于topic分區(qū)數(shù),擴(kuò)展性差
頻繁請(qǐng)求導(dǎo)致吞吐量下降
線程自己處理消費(fèi)到的消息可能會(huì)導(dǎo)致超時(shí),從而造成rebalance
方法2 (單個(gè)(或多個(gè))consumer,多個(gè)worker線程)可獨(dú)立擴(kuò)展consumer數(shù)和worker數(shù),伸縮性好

實(shí)現(xiàn)麻煩

通常難于維護(hù)分區(qū)內(nèi)的消息順序

處理鏈路變長(zhǎng),導(dǎo)致難以保證提交位移的語(yǔ)義正確性 

 

下面我們分別實(shí)現(xiàn)這兩種方法。需要指出的是,下面的代碼都是最基本的實(shí)現(xiàn),并沒(méi)有考慮很多編程細(xì)節(jié),比如如何處理錯(cuò)誤等。

方法1

網(wǎng)友評(píng)論