Kafka 0.9版本開始推出了Java版本的consumer,優(yōu)化了coordinator的設(shè)計(jì)以及擺脫了對(duì)zookeeper的依賴。社區(qū)最近也在探討正式用這套consumer API替換Scala版本的consumer的計(jì)劃。鑒于目前這方面的資料并不是很多,本文將嘗試給出一個(gè)利用KafkaConsumer編寫的多線程消費(fèi)者實(shí)例,希望對(duì)大家有所幫助。
這套API最重要的入口就是KafkaConsumer(o.a.k.clients.consumer.KafkaConsumer),普通的單線程使用方法官網(wǎng)API已有介紹,這里不再贅述了。因此,我們直奔主題——討論一下如何創(chuàng)建多線程的方式來使用KafkaConsumer。KafkaConsumer和KafkaProducer不同,后者是線程安全的,因此我們鼓勵(lì)用戶在多個(gè)線程中共享一個(gè)KafkaProducer實(shí)例,這樣通常都要比每個(gè)線程維護(hù)一個(gè)KafkaProducer實(shí)例效率要高。但對(duì)于KafkaConsumer而言,它不是線程安全的,所以實(shí)現(xiàn)多線程時(shí)通常由兩種實(shí)現(xiàn)方法:
1 每個(gè)線程維護(hù)一個(gè)KafkaConsumer
2 維護(hù)一個(gè)或多個(gè)KafkaConsumer,同時(shí)維護(hù)多個(gè)事件處理線程(worker thread)
當(dāng)然,這種方法還可以有多個(gè)變種:比如每個(gè)worker線程有自己的處理隊(duì)列。consumer根據(jù)某種規(guī)則或邏輯將消息放入不同的隊(duì)列。不過總體思想還是相同的,故這里不做過多展開討論了。
下表總結(jié)了兩種方法的優(yōu)缺點(diǎn):
優(yōu)點(diǎn) | 缺點(diǎn) | |
方法1(每個(gè)線程維護(hù)一個(gè)KafkaConsumer) | 方便實(shí)現(xiàn) 速度較快,因?yàn)椴恍枰魏尉€程間交互 易于維護(hù)分區(qū)內(nèi)的消息順序 | 更多的TCP連接開銷(每個(gè)線程都要維護(hù)若干個(gè)TCP連接) consumer數(shù)受限于topic分區(qū)數(shù),擴(kuò)展性差 頻繁請(qǐng)求導(dǎo)致吞吐量下降 線程自己處理消費(fèi)到的消息可能會(huì)導(dǎo)致超時(shí),從而造成rebalance |
方法2 (單個(gè)(或多個(gè))consumer,多個(gè)worker線程) | 可獨(dú)立擴(kuò)展consumer數(shù)和worker數(shù),伸縮性好 | 實(shí)現(xiàn)麻煩 通常難于維護(hù)分區(qū)內(nèi)的消息順序 處理鏈路變長,導(dǎo)致難以保證提交位移的語義正確性 |
下面我們分別實(shí)現(xiàn)這兩種方法。需要指出的是,下面的代碼都是最基本的實(shí)現(xiàn),并沒有考慮很多編程細(xì)節(jié),比如如何處理錯(cuò)誤等。
方法1
延伸閱讀
學(xué)習(xí)是年輕人改變自己的最好方式