Apache Kafka Notlarım
Kuyruk sistemlerinin geçmişten günümüze tarihçesi
Apache Kafka’nın özelliklerini yazarak başlayayım sonra irdelerim. Önce Kafka nedir diye başlayalım.
Linkedin’de doğdu açık kaynak oldu, helal olsun sana Apache Kafka, Apache Kafka :)
Apache Kafka, büyük ölçekte veri toplamak, işlemek, depolamak ve entegre etmek için kullanılan bir veri akışı platformudur. Dağıtılmış akış, akış işleme, veri entegrasyonu ve pub/sub mesajlaşması dahil olmak üzere çok sayıda kullanım durumu vardır.
Örneğin 4 farklı kaynaktan (Elasticsearc, MongoDB, WikiMedia …) veriyi alıp 4 farklı hedefe veriyi göndermek isteseydiniz 16 entegrasyon yazmanız gerekirdi. Belki her bir akışın farklı protokol (TCP, HTTP, FTP, gRPC vb.), veri biçimi (binary, json, avro, thrift, protobuf, xml vb.) ve bunların şemalı olma durumlara göre veri şeması işlerini de yüklenecektiniz. Bir de veri yükünün yüksek olmasına bağlı akışı ölçekleme, hata olduğunda yedeklilik (failover) durumlarını da ele almanız gerekecekti. Gök tanrıya şükür ki Kafka var.
Mesajlaşma, etkinlik takibi, farklı yerlerden metrik ve günlük toplama, akışları işleme, mikro hizmetlerin birbirleriyle iletişimi gibi işleri Spark, Flink, Storm, Hadoop ve diğer büyük veri teknolojilerine entegre bir şekilde yapıyor.
- Distributed (dağıtık): Tek bir sunucu üzerinden merkezi çalışabilse dahi, birden fazla kafka sunucusuyla da hizmet verebilir.
- Streaming Platform: Öyle bir platform ki
veri
akışı sağlayabiliyor :) - Gerçek Zamanlı Veri İşleme: Verilerin sunucuya ulaşmasıyla, işleme alınmasını sağlar (10ms’den az gecikme)
- Dayanıklı (fault tolerant): Sunucu arızalarına karşın veriler diske yazılır ve farklı sunucularda (çoğaltılarak) saklanır yani hata toleranslı. Bir sunucu bir hata yapıp hakka yürümüş olsa dahi diğeri anlayışlı davranarak devam eder. Ayrıca diğer kuyruk sistemlerinden farklı olarak mesajları iletse dahi bir veritabanı gibi saklar.
- Bölümleme (partitioning): Tek bir veritabanında veriyi organize etmek için kullanılır, dikey ve yatay olmak üzere iki türü vardır.
- Parçalama (Sharding): Büyük bir tablonun verilerini dağıtmak için, küçük tablolar halinde farklı veritabanı sunucularına bölmektir. Genellikle yatay bölümleme olarak kabul edilir. Sadece sunucular değişir ancak tabloya dair her türlü tanım
Kuyruk sistemlerini kıyaslarken ‘iletim modeli’ ve ‘saklama modeli’ olarak ayrıştırabiliriz.
Yukarıdaki grafikte iletim modeline göre ayrımlarını görüyorsunuz. Saklama modeli; bellek, disk, bellek+disk (hibrit) olarak 3 tiptir. RabbitMQ önce belleğe sonra diske yazarak hibrit modelini desteklerken, Kafka doğrudan diske yazar.
Kafka Ekosistemi
Kafka dağıtılmış bir sistemdir; yani birden fazla aracıdan (broker
) oluşan (bir tane lider ve n tane takipçi - follower) kümeli hizmet sunar. Veriler, kümedeki herhangi bir aracıda olabilecek belirli bir bölüm (partition) için LİDER’den okunur ve LİDER’e yazılır. Takipçiler liderin verilerini kendilerine kopyalar. Böylece sistemin dayanıklılığı (durability) artar.
Bir istemci (üretici/tüketici) işe koyulduğunda, seçtiği bölüm için hangi aracının lider olduğu hakkında meta veri ister ve bunu herhangi bir broker
’dan edinebilir. Döndürülen meta veriler, söz konusu bölüm için Lider broker için mevcut uç noktaları içerecek ve istemci daha sonra bu uç noktaları kullanarak gerektiğinde veri okumak/yazmak üzere broker’a bağlanacaktır.
Kafkanın gücü ekosisteminden geliyor gelin bu ekosistemin bileşenlerini görelim:
Kafka Broker
- Brokerlar, Kafka’nın temel yapı taşlarıdır ve üreticilerden gelen mesajları depolar ve tüketicilere dağıtan sunuculardır.
- Yeni brokerlar ekleyerek yatay olarak ölçeklenebilir. Bu, veri yükünün daha fazla broker arasında dağıtılmasıyla yüksek performanslı mesaj aktarımı sağlar.
num.io.threads
Parametresi I/O işlemleri için kullanılacak thread sayısını belirler. Bu değer artırıldığında, daha fazla kaynak etkin bir şekilde kullanılabilir. - Veri güvenliği için her bir konu, birden fazla parçaya (partition) bölünür ve birden fazla kopya (replikasyon) halinde birden fazla broker üzerinde çoğaltılırak saklar. Bu parçalar, brokerlar arasında dağıtılarak yük dengelemesi yapılır. (aşağıda bir topic için partition sayısı girildiğini ve her kafka broker’ı üzerinde okuma+yazma ve okuma olacak şekilde broker’lara görevler yüklenerek bu konuların işlendiğini göreceksiniz)
Yukarıdaki çizimde 3 broker var ve konu için 1 bölüm tercih edilmiş.
------ TOPIC ---- BROKER 101 ---- BROKER 102 ---- BROKER 103
Partition 0 ---- Lider ---- Takipçi ---- Takipçi
Controller (process.roles)
Controller rolü küme içinde bir broker’a verilir. Nedir controller? Kafka kümesinin yönetiminden sorumlu broker ‘controller’ olur. Daha geniş tanımla Controller, Kafka kümelerinin metadata yönetimini (örneğin, yeni topic oluşturma, partition replica’ların yönetimi vb.) üstlenen bileşendir. Topic yaratıldığıdan partition liderlerini atar, yeni broker’ları küme yapısına ekler ve mevcut broker’ların durumunu izler. Eğer bir lider broker arızalanırsa, controller rolünü alan broker yeni bir lideri seçer. Bir Kafka düğümünün rolünün broker ve/veya controller olduğunu KAFKA_PROCESS_ROLES
ortam değişkeniyle tayin edebiliriz.
KRaft modu, Kafka’nın Zookeeper’a bağımlılığını ortadan kaldıran yeni bir moddur. KRaft modunda, controller’lar Raft protokolü kullanarak metadata’yı yönetir.
Tek bir Kafka broker ayaklandırırsak doğal olarak controller da kendisi olur :)
Konu (Topic)
Önce biraz Türkçe ;)
Kategori ne demek? Aralarında herhangi bir bakımdan ilgi veya benzerlik bulunan şeylerin tamamı, grup.
Kafka’da konu (topic), mesajların mantıki olarak gruplandığı (kategori), üreticilerin (producers) veri gönderdiği ve tüketicilerin (consumers) okuduğu bir mesaj kuyruğudur.
Tek parçalı bir konu:
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic tekbroker.1parcali.baslik
Created topic tekbroker.1parcali.baslik.
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic tekbroker.1parcali.baslik
Topic: tekbroker.1parcali.baslik TopicId: W2tE37sDSdiD9NuYekcJwA PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: tekbroker.1parcali.baslik Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
Konu içinde toplanan verilerin parça parça (part by part, partition — parçalı/bölümlenmiş) saklanarak verilerin paralel işlenmesini, yük dağılımını (load balance) sağlayabiliriz.
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic tekbroker.6parcali.baslik --partitions 6
Created topic tekbroker.6parcali.baslik.
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic tekbroker.6parcali.baslik
Topic: tekbroker.6parcali.baslik TopicId: RYAs59tvSvuzadGK1Iyfdw PartitionCount: 6 ReplicationFactor: 1 Configs: cleanup.policy=delete
Topic: tekbroker.6parcali.baslik Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
Topic: tekbroker.6parcali.baslik Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
Topic: tekbroker.6parcali.baslik Partition: 2 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
Topic: tekbroker.6parcali.baslik Partition: 3 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
Topic: tekbroker.6parcali.baslik Partition: 4 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
Topic: tekbroker.6parcali.baslik Partition: 5 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
Ayrıca, parçalayarak sakladığımız verileri yedek kopyalar (replica) halinde çoklayarak yani farklı aracı sunucularda kopyalarını saklayarak hata toleransı (fault tolerans) sağlayabiliriz.
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic 3broker.6parcali.baslik \
--partitions 6 --replication-factor 3
Created topic 3broker.6parcali.baslik
$ docker exec -it kafka-broker-1 kafka-topics.sh --bootstrap-server localhost:9093 \
--describe --topic 3broker.6parcali.baslik
Topic: 3broker.6parcali.baslik TopicId: TJ_6qBU8RT-4QVEdUuVrtQ PartitionCount: 6 ReplicationFactor: 3 Configs: cleanup.policy=delete
Topic: 3broker.6parcali.baslik Partition: 0 Leader: 11 Replicas: 11,12,13 Isr: 11,13,12 Elr: LastKnownElr:
Topic: 3broker.6parcali.baslik Partition: 1 Leader: 12 Replicas: 12,13,11 Isr: 11,12,13 Elr: LastKnownElr:
Topic: 3broker.6parcali.baslik Partition: 2 Leader: 13 Replicas: 13,11,12 Isr: 11,12,13 Elr: LastKnownElr:
Topic: 3broker.6parcali.baslik Partition: 3 Leader: 13 Replicas: 13,11,12 Isr: 11,12,13 Elr: LastKnownElr:
Topic: 3broker.6parcali.baslik Partition: 4 Leader: 11 Replicas: 11,12,13 Isr: 11,12,13 Elr: LastKnownElr:
Topic: 3broker.6parcali.baslik Partition: 5 Leader: 12 Replicas: 12,13,11 Isr: 11,12,13 Elr: LastKnownElr:
Çok aracı üzerinde parçalı yapıda sakladığımızda her bir parçanın verilerinin ilk yazılıp+okunduğu aracıdaki bölümüne lider (leader), kopyalarının saklandığı aracıya (broker)(follower) denir ve liderin verilerini sürekli eşleştirirler. Bu sayede hataya dayanıklı (failover) ve yüksek erişilebilirlik (high availability) sağlanır. Yukarıdaki çıktıda 0
numaralı parçanın lideri (yani verilerin ilk yazılıp ve hep okunduğu aracı) KAFKA_NODE_ID
değeri 11
olan aracıda, yedekleri ise 12
ve 13
ID’li aracılarda (Partition: 0 Leader: 11 Replicas: 11,12,13 Isr: 11,13,12
).
Farklı Bölümlere sahip olmanın amacı yükü paylaşmak ve Kafka kümesinin performansını iyileştirmektir.
Veri bir bölüme (partition) yazıldıktan sonra artık değiştirilemez (immutable) !
Aşağıdaki ekran görüntüsünde hiç topic
yokken /tmp/kafka-logs/
dizininde konulara ait dosyanın oluşmadığını görüyoruz. deneme
İsimli konu yaratılınca partitions
sayısı kadar dizinin oluştuğunu görüyoruz.
Dizin içinde dosya yapısı şöyle:
a6bcec4ecd02:/$ ll /tmp/kafka-logs/deneme-0
total 16
drwxr-xr-x 2 appuser appuser 4096 Jan 30 18:40 .
drwxr-xr-x 6 appuser appuser 4096 Jan 30 18:48 ..
-rw-r--r-- 1 appuser appuser 10485760 Jan 30 18:40 00000000000000000000.index
-rw-r--r-- 1 appuser appuser 0 Jan 30 18:40 00000000000000000000.log
-rw-r--r-- 1 appuser appuser 10485756 Jan 30 18:40 00000000000000000000.timeindex
-rw-r--r-- 1 appuser appuser 8 Jan 30 18:40 leader-epoch-checkpoint
-rw-r--r-- 1 appuser appuser 43 Jan 30 18:40 partition.metadata
Ayrıntılı olarak deneme
konusuna bakalım:
a6bcec4ecd02:/$ kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic deneme
Topic: deneme TopicId: VGgUTsy2TvuKA2C4czogWg PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: deneme Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
Bir üretici, key
değeri olmaksızın aşağıdaki yapıda 30 farklı mesajı yazmak istesin. Anahtarsız gelen mesajlar round-robin
yöntemiyle bölümlere ( Partition 0
, Partition 1
, Partition 2
, Partition 3
) sırayla yazılır.
mesaj-1
Sırayla bölümlere ( Partition 0
,Partition 1
,Partition 2
) yazılacak. Bunu Önce lider broker’da yer alan Partition 0
sonra BROKER 1
’in takipçileri olan diğer Partition 0
tutan brokerlara ( BROKER 2
ve BROKER 3
).
mesaj-2
geldiğinde bir sonraki bölüm olan Partition 1
in lider brokerı olan BROKER 2
nin Partition 1
bölümüne yazılacak. Kopyalarını tutan takipçilerin broker’ları BROKER 1
ve BROKER 3
ün Partition 1
bölümlerine yazılacak.
Varsayılan olarak 7 gün ama ayarlandığında sonsuza dek veri saklanabilir.
Offset sadece özel bir bölüm (partition) için anlamlıdır. Örneğin partition 0 üzerinde 3. offset ile partition 1 üzerindeki 3. offset aynı veri değildir. Önceki değer silinse dahi offset değeri tekrar kullanılmaz çünkü Kafka Topic’e gönderilen her mesaj ile offset değeri 1 artacaktır.
Mesajların sırası belirli bir bölüm için garanti edilirken diğer bölümler için aynı sırada olmaları garanti edilmez.
Üretici (Producer)
- Producer, hangi konuya (topic), hangi bölüme (partition), hangi anahtar (key) ve mesajı (value) göndereceğini seçip bu verileri Kafka broker’ına sync/async şekilde ileten uygulamadır.
- Mesajların sıkıştırılması (hiç sıkıştırmayabilir veya
gzip
,snappy
,lz4
veyazstd
kullanabilir) ve partitioning stratejilerini uygular (aşağıda sıkıştırma bilgisini mesajın içinde göreceksiniz). - Performans odaklı, yüksek veri akışının olduğu bir durumda, örneğin metriklerin aktarımında mesajların sırası önemli olmayıp ulaştırılması öncelikliyse, ASYNC olacak şekilde mesajlar arka planda gönderilir ve sonuçları daha sonra işlenir.
acks=0
Olarak üretici başlatılır.
const { Kafka } = require('kafkajs')
const kafka = new Kafka({ brokers: ['localhost:9092'] })
// Async (performans odaklı)
const asyncProducer = kafka.producer({
acks: 0
})
await asyncProducer.connect()
// Onay beklemeden hızlıca mesaj gönderir
asyncProducer.send({
topic: 'test-topic',
messages: [{ value: 'Senkron mesaj' }]
}).then(result => {
// Arka planda sonucu işler
console.log('Mesaj gönderildi:', result)
}).catch(error => {
console.error('Gönderim hatası:', error)
})
- Bir producer, mesajların belli bir sırayla gönderilmesini (senkron) isterse, mesajın Kafa tarafından (tüm broker’lar=sync, lider broker=semi-sync) başarıyla alındığını doğrulamak için gönderim işlemini SYNC yapar.
// Semi-Sync (sadece lider broker onaylı güvenilirlik odaklı)
const semiSyncProducer = kafka.producer({
acks: 1
})
// Sync (tüm brokerların onayıyla tam güvenilirlik odaklı)
const syncProducer = kafka.producer({
acks: 'all'
})
Bir veri üretici yazılım, mesajını hangi bölüme ve konuya yazacağını Kafka Broker’ına bildirir. Broker gelen mesajı belirtilen bölüme yazar ve çoklama mevcutsa diğer Broker’lara da mesajın yazılması için gönderir. Böylece lider bölüm (partition) çökerse yedek yazdığı bölümler arasından bir lider seçerek ayakta kalmaya devam eder.
Aşağıda göreceğiniz şekilde üreticinin gönderdiği mesajın key
alanı boş ise (key == null
) round robin
yöntemiyle bölümler arasında sırayla gelen mesajları kaydeder (gelen mesajları önce ilk bölüme sonra ikinci, üçüncü vs. ve tekrar ilk bölüme …).
Eğer key
boşsa ancak üreticinin Kafka’ya bağlanırken partitioner
ayarı sticky
olarak ayarlanmışsa mesajların, bölümler arasında eşit dağıtımı yapılacaktır.
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
// Sticky partitioner ile üretici oluştur
const producer = kafka.producer({ partitioner: 'sticky' });
const sendMessages = async () => {
// Üreticiyi bağla
await producer.connect();
// Null anahtar ile mesaj gönderimi
for (let i = 0; i < 10; i++) {
await producer.send({
topic: 'my-topic',
messages: [{ value: `Mesaj ${i}`, key: null }]
});
console.log(`Gönderildi: Mesaj ${i}`);
}
await producer.disconnect(); // Üreticiyi kapat
};
sendMessages().catch(console.error); // Hata durumunda konsola yazdır
Eğer key
değeri doluysa ( key != null
) bu değeri hash
fonksiyonundan ( murmurhash2
) geçirerek ilgili bölüme yazar. key
ve value
ikilileri üretici tarafından serileştirilerek 2 tabanlı değerlere dönüştürülür ve istenirse sıkıştırılarak Kafka broker’a gönderilir. Elbette tüketici (consumer) ikili değerler halinde alacağı mesajı deserialize ederek kullanır ;)
Tüketici (Consumer)
Kafka’da tüketici, belirli konulardan (topics) mesajları okuyan ve işleyen uygulamadır. Her tüketici, okuduğu mesajların konumunu takip eden bir “offset” ile çalışır. Bu offset, hangi mesajların okunduğunu ve hangi mesajların henüz okunmadığını belirler. Tüketiciye gönderilecek en yeni mesajı bu sayede Kafka’dan alabilir.
Tüketici grupları, birden fazla tüketicinin (consumer) bir araya gelerek aynı konu (topic) üzerindeki mesajları paralel olarak işleyebilmesini sağlayan bir yapıdır. Her tüketici, bir veya daha fazla konuya abone olabilir ve bu sayede mesaj akışını paralel olarak işleyebilir. Her tüketici grubu bir grup kimlik değerine (group.id) sahip olur.
Her tüketici bir veya daha fazla partition’a üye olabilir
Yukarıdaki gösterimde Topic-A
konusu 3 adet bölümden oluşuyor ve bir tüketici grubunda bölüm sayısı kadar tüketici varsa ( consumer-group-application-2
) her birine bir bölüm düşerken, tüketici grubunda bir tüketici varsa (consumer-group-3
) tüm bölümleri okuyabiliyor.
Aşağıdaki çizimde
- ‘Consumer Group B’ nin 2 tüketici uygulaması var ama 3 partition olduğu için bir tüketici 2 tanesinden veri çekiyor.
- ‘Consumer Group A’ nın 4 tüketici uygulaması var. 3 Partiton olduğu için her bir tüketici bir partition’dan veri çekiyor ancak boşta bir partition kalmadığından dolayı 4. tüketici ‘inactive’ bekliyor (bunlar hep kısmet işi).
Ayrıca her tüketicinin kaçıncı mesajda kaldığı (offset) kafka tarafında yahut tüketi tarafında tutulur.
Tüketici grupları sayesinde şunları elde edebiliriz:
- Paralel işleme, aynı grup içindeki tüketiciler, bir konudaki mesajları farklı tüketicilere dağıtarak paralel işleme imkanı sunar. Bu sayede yük dengelemesi yapar; yani her mesaj tüketici grubundaki yalnızca bir tüketici tarafından işlenir. Bu, sistemin verimliliğini artırır ve mesajların işlenme süresini kısaltır.
Kullanım senaryosu; bir kullanıcı kaydının ardından eposta adresinin doğrulanması için onaylama iletisi göndermek ve telefon numarasının doğrulanması için SMS onay kodunu girmesini isteyelim. Her mesajı ancak bir tüketici işleyebileceği için birinci tüketici grubunda e-posta onayını ikinci gruptaki tüketici ile SMS onayını işleyebileceğiz.
- Offset, bir partition’daki mesajların sıralı numarasıdır ve her tüketici grubu için ayrı olarak yönetilir. Tüketici grubu ilk kez bir topic’e abone olduğunda, offset başlangıç değeri genellikle sıfırdır (0). Ancak daha önceki bir tüketici grubu varsa, offset ya en baştan (başlangıç —
earliest
) ya da en son (bitiş ,latest
) noktadan başlar. Okunan her mesajdan sonra, tüketici offset’i günceller. Kafka, offset’leri__consumer_offsets
adında özel bir topic'te saklar. Bu topic, her tüketici grubunun hangi partition'dan hangi offset'e kadar okuduğunu izler.
RabbitMQ’da ise genellikle her mesaj birden fazla tüketiciye yönlendirilir ve bu durum, mesajların birden fazla kez işlenmesine yol açabilir. Kafka’nın kullanıcı grubu yapısı, her mesajın yalnızca bir tüketici tarafından işlenmesini garanti eder, bu da veri okuma sürecinde daha iyi bir kontrol ve tutarlılık sağlar. Bu farklılık, Kafka’nın yüksek verimlilik ve ölçeklenebilirlik sunan mimarisinin temel unsurlarından biridir.
Zookeeper
- Kafka kümesinin koordinasyon, senkronizasyon ve yönetim hizmetini sağlar
- Broker’ların durumunu izler, lider seçimi (leader election) ve küme konfigürasyonlarını yöneterek dağıtık sistemin güvenilirliğini artırır
- Kafka gibi uygulamaların metadata’larını (konfigürasyon bilgileri, broker bilgileri, topic bilgileri vb.) yönetir. Hangi brokerların aktif olduğunu ve topic’lerin hangi brokerlarda bulunduğunu kaydeder.
ZooKeeper’ın bağımlılığı, Kafka kümesinin kurulumu ve yönetimini karmaşık hale getirdiği, önce zookeeper ve sonra kafka kümesinin kurulup yönetilmesi, özellikle büyük ve karmaşık sistemlerde zorlayıcıdır. Kafka’nın 2.8 sürümünden itibaren Zookeeper yerine KRaft ile kurulumlar denenebilir ve 3.3 sürümünden sonra üretimde kullanılabilir hale gelip 4.0 itibarıyla Zookeeper desteklenmeyecek.
Kafka Connect
Veritabanları, mesajlaşma sistemleri ve bulut hizmetleri gibi dış sistemlerle Kafka arasında veri aktarımını hazır entegrasyon bağlayıcıları (connector) ile kolaylaştırır. ETL (Extract, Transform, Load) hatlarınızın bir parçası olarak çalışır ve ölçeklenebilir.
- Kafka Streams
- Gerçek zamanlı akış işleme kütüphanesidir
- Mesajları anlık olarak filtreleme, dönüştürme ve zenginleştirme imkanı sağlar
- Mikroservis mimarileri için ideal
- Yerleşik pencere ve JOIN işlemleri destekler
- Schema Registry
- Kafka mesajlarının şema yönetimini sağlar
- Mesaj formatlarının uyumluluğunu kontrol eder
- Avro, Protobuf gibi şema formatlarını destekler
- Veri bütünlüğünü korur
- Kafka Mirror Maker
- Kafka kümelerini çoğaltma (replication) imkanı sağlar
- Farklı veri merkezleri arasında veri aktarımını kolaylaştırır
- Yedekleme ve olağanüstü durum kurtarma senaryolarında kullanılır
- Confluent Control Center
- Confluent’in sunduğu bu araç, Kafka’nın performansını izlemek ve yönetmek için kapsamlı bir arayüz sağlar. Kullanıcılar, konuları, tüketici gruplarını ve akışları kolayca yönetebilmemizi sağlayan grafik arayüz.
- Performans metriklerini görselleştirir
- Küme sağlığını ve mesaj akışını takip eder
Bu bileşenler, Apache Kafka’nın yüksek performanslı, ölçeklenebilir ve güvenilir bir mesaj aktarım platformu olmasını sağlar.
Zero Copy ile Yüksek Performans
Normal bir veri transferi sürecinde veriler genellikle birden fazla kez kopyalanır. Mesaj, ağ kartından belleğe, bellekten diske veya başka bir depolama alanına kopyalanır ve okuma işlemi sırasında da bu adımların tersi takip edilir. Bu durum, hem sistem performansını düşürür hem de gecikmelere neden olur. Zero Copy, alışılmış veri akışında bir köprü görevi görerek, verilerin gereksiz yere kopyalanmasını önler. Bu sayede, hem işlemci yükü azalır ve bellek kullanımı azalır hem de veri transferi hızlanır, sistemin cevap verme süresi iyileşir.
Normal Süreç:
Uygulama Belleği -> Kernel Belleği -> Ağ Kartı -> Ağ
Zero Copy:
Uygulama Belleği -> (Kernel Belleği'ni atlar) -> Ağ Kartı -> Ağ
Zero Copy’nin temel çalışma prensibi:
- Bellek Sayfaları: İşletim sistemleri, belleği daha küçük parçalara böler ve bu parçalara sayfa adı verilir. Zero Copy, bu sayfaları kullanarak doğrudan veri transferi yapar.
- Direkt Bellek Erişimi (DMA): Zero Copy, DMA (Direct Memory Access) özelliğini kullanarak, CPU’nun müdahalesi olmadan verinin bellekle ağ kartı arasında doğrudan transferini sağlar. Ana kart yonga kümesinin bir parçası olan Doğrudan Bellek Erişimi (Direct Memory Access; DMA), CPU’dan bağımsız olarak, çevre birimlerin RAM bloğuna erişebilmesini sağlayan bir özelliktir. Böylece verilerimizi çevre birimden (peripheral) hafızaya, hafızadan çevre birime ve hafızadan hafızaya olacak şekilde hızlıca kaynak adresten hedefe aktarır.
Her ne kadar transferi CPU başlatsa da, o gerçekleştirmez. DMA transferi esas olarak, bir bellek bloğunu bir cihazdan diğerine kopyalar.
- Veri Kopyalanmaz: Veri, bir yerden başka bir yere fiziki olarak taşınmaz. Bunun yerine, işletim sistemi, verinin bulunduğu bellekteki sayfaların konumunu günceller ve bu sayfalara erişim yetkilerini ayarlar.
Kafka Ayaklandıralım *
Kafka 3.3 sürümünden itibaren KRaft (Kafka Raft) sayesinde artık Zookeeper’a ihtiyaç duyulmamasıyla Kafka’nın dağıtımı büyük ölçüde basitleştirildi.
Bir istemciyi (producer/consumer) çalıştırdığınızda, ona ilettiğiniz brokerın kümedeki brokerlar hakkındaki meta verileri alacağı yer olmasıdır. Verileri okumak/yazmak için bağlanacağı asıl ana bilgisayar ve IP, brokerın bu ilk bağlantıda istemciye cevap olarak döndüğü verilere dayanır (kafka brokerınız tek bir düğümden oluşsa bile bu durum aynıdır).
Kafka’ya brokerların birbirlerine nasıl ulaşabileceklerini anlatmanız, ancak aynı zamanda istemcilerin (üreticiler/tüketiciler) ihtiyaç duydukları brokera nasıl ulaşabileceklerinden de emin olmanız gerekir. Bu iletişimi brokerlar listener
üzerinden sağlar. Kafka’da “listener” kavramı, broker’ın farklı amaçlar için farklı portlar üzerinden iletişim kurmasını sağlamak amacıyla ortaya çıkmıştır. Basitçe listener
şu bilgilerden oluşur: protokol://adres_veya_ip:port
.
Broker’ın aynı zamanda hem iç (küme içindeki diğer broker’lar ile) hem de dış (istemciler ile) iletişim kurması gerekebilir. Bu durumlarda, iç ve dış iletişim için farklı listener’lar tanımlanarak güvenlik (örneğin, dışarıdan gelen bağlantılar için SSL kullanılırken, küme içindeki broker’lar arası iletişim için PLAINTEXT
kullanılabilir) ve yönlendirme sağlanabilir.
Docker resmi sitesinde nasıl ayaklandırılacağına dair bu adresi kullanabilirsiniz.
# Ayaklandırmak için
docker run -d --name=kafka -p 9092:9092 apache/kafka
# Apache/kafka imajı, /opt/kafka/bin dizininde birkaç yararlı
# betikle birlikte gelir. Kümenin çalışır durumda olduğunu doğrulamak r4:npExK.M@69-O7r4:npExK.M@69-O7
# ve küme kimliğini almak için aşağıdaki komutu çalıştırın:
docker exec -ti kafka /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server :9092
# Aşağıdaki gibi bir çıktı alırsınız:
Cluster ID: 5L6g3nShT-eMCtK--X86sw
KAFKA_LISTENERS
(docker yerine server.properties
içinde listeners
), Broker'ın hangi adres ve portlardan dinleme yapacağını belirtir.
KAFKA_ADVERTISED_LISTENERS
(docker yerine server.properties
içinde advertised.listeners
), İstemcilere (consumer, producer) broker'a hangi adres ve porttan bağlanacaklarını bildirir. Bu genellikle KAFKA_LISTENERS
ile aynıdır, ancak bazı durumlarda farklılık gösterebilir. Örneğin, docker container'ı içindeki hostname farklı, dışarıdan erişilen hostname farklı olabilir.
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
(docker yerine server.properties
içinde listener.security.protocol.map
), Hangi listener'ın hangi güvenlik protokolünü kullanacağını belirtir.
KAFKA_INTER_BROKER_LISTENER_NAME
(docker yerine server.properties
içinde inter.broker.listener.name
), Broker'lar arası iletişim için hangi listener'ın kullanılacağını belirtir.
Ortam Değişkenleri
KAFKA_NODE_ID
, Kafka kümesindeki her bir düğümün (node, controller veya broker her düğümün) benzersiz kimliğini belirten -1'den büyük sayıdır. Bu kimlik, Kafka’nın küme içindeki düğümleri birbirinden ayırt etmesini sağlar.KAFKA_PROCESS_ROLES
, Kafka'nın hangi rol[leri] yerine getireceğini (broker
veyacontroller
) belirler. Bu ortam değişkeni Kafka’nın Zookeeper’sız çalışmasını sağlar. Broker, üreticilerden gelen mesajları depolayan ve tüketicilere sunan bileşendir. Topic’ler ve partition’lar bu rol tarafından yönetilir. Controller, Kafka kümelerinin metadata yönetimini (örneğin, yeni topic oluşturma, partition replica’ların yönetimi vb.) üstlenen bileşendir. KRaft modunda, controller’lar Raft protokolü kullanarak metadata’yı yönetir. Bu değerler virgülle ayrılarak birlikte kullanılabilir. Örneğin, bir düğüm hem broker hem de controller olarak çalışabilir (KAFKA_PROCESS_ROLES: broker,controller
). Bu durumda, düğüm hem mesajları işleyen bir broker olarak hem de küme yönetim görevlerini üstlenen bir controller olarak çalışacaktır.LISTENERS
, Kafka’nın bağlandığı arayüzlerdir.ADVERTISED_LISTENERS
ise istemcilerin nasıl bağlanacağını belirler.KAFKA_LISTENERS
(controller.listener.names
), broker'ın hangi dinleyici adı, adres ve port üzerinden gelen bağlantıları dinleyeceğini belirten iç ağda aracılar arasındaki iletişimde, dış ağda ise istemciler (üretici/tüketici) ile iletişimde kullanılacak bağlantıdır (<Listener_Adı>://<IP_veya_Host>:<Port>
).<Listener_Adı>
: Dinleyiciyi tanımlayan bir isimdir (örneğin,DENETCI
,ARACI
,PLAINTEXT
,SASL_SSL
gibi) ve bağlantı türlerini ayırt etmeyi kolaylaştırır. Her dinleyici, farklı bir güvenlik protokolü kullanabilir (örneğin,PLAINTEXT
,SSL
,SASL_PLAINTEXT
) veKAFKA_LISTENER_SECURITY_PROTOCOL_MAP
ile hangi güvenlik protokolünün kullanılacağı eşleştirilir (örneğin:KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DENETCI:PLAINTEXT
) eğer eşleştirme yapılamazsaPLAINTEXT
protokolü kullanılır. Bu durumda,DENETCI
dinleyicisiPLAINTEXT
protokolünü kullanır.<IP_veya_Host>
: Dinleyicinin bağlanacağı IP adresi veya hostname. Boş bırakılırsa, Kafka varsayılan olarak tüm ağ arabirimlerini (0.0.0.0
) dinler. Dinleyicinin amacını veya kullanım senaryosunu yansıtan isimler seçin (INTERNAL
: broker'lar arası iletişim için,EXTERNAL
: dış istemciler için,CONTROLLER
: controller iletişimi için vb.).
Bir controller için örnek ayar verelim. Kafka düğümümüze controller
rolünde, NODE_ID
değeri 1, dinleyici adı CONTROLLER
olan, 9093 portunda ve şifreleme olmayan protokolde (PLAINTEXT
) iletişim kurulacak bir listener tanımlıyoruz :
KAFKA_NODE_ID=1
KAFKA_PROCESS_ROLES=controller
KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka-controller:9093
KAFKA_LISTENERS=CONTROLLER://controller:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
Aynı şekilde broker ayarı yapalım.
KAFKA_NODE_ID=2
KAFKA_PROCESS_ROLES=broker
KAFKA_CONTROLLER_QUORUM_VOTERS=1@controller:9093
KAFKA_LISTENERS=INTERNAL://broker1:9093,EXTERNAL://broker1:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:SSL
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
(listener.security.protocol.map
),KAFKA_CONTROLLER_QUORUM_VOTERS
, Kümeye dahil olan tüm controller düğümleri burada belirtilmeli.KAFKA_CONTROLLER_LISTENER_NAMES
, değeri Kafka’nın KRaft modunda controller ve broker’lar arasındaki iletişimi nasıl yönettiğini anlamak açısından önemli. Bu değercontroller
rolü için hangi dinleyici(ler) üzerinden iletişim kuracağını belirtir. Bu, KRaft modunda controller’ın broker’lar ile iletişim kurmasını sağlar. Kafka’nın KRaft modunda Zookeeper olmadığı için bir veya birden fazla brokercontroller
rolünü üstlenir ve küme yönetiminden sorumludur. Bu ayar, hangi listener’ın Controller ile iletişim kuracağını belirlemek için kullanılır.
Bu dinleyiciyi controller
içinde önce KAFKA_LISTENERS
değerinde tanımlamış olmamız gerekir ve sonra KAFKA_CONTROLLER_LISTENER_NAMES
değeri olarak yer almalıdır. Ve KAFKA_CONTROLLER_QUORUM_VOTERS
değerinde kaç controller varsa hepsinin listener değerleri virgüllerle ayrılmış olarak bu alanda tutulmalı:
Örnek:
- Controller 1:
KAFKA_LISTENERS: CONTROLLER://:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS:1@controller1:9092,2@controller2:9093 - Controller 2:
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: @controller1:9092,2@controller2:9093 - Broker:
KAFKA_CONTROLLER_QUORUM_VOTERS: @controller1:9092,2@controller2:9093
(broker'lar için bu ayar gerekli değildir, ancak KRaft modunda zorunludur).
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
Şimdi broker
rolü için anlamına bakalım: burada yer alan dinleyici(ler), controller’ın kendisini tanımladığı bir dinleyicidir ve broker’lar bu dinleyici üzerinden controller ile iletişim kurar. Bu dinleyici, KAFKA_LISTENERS
ayarında tanımlanmaz. Yani, controller'ın dinleyicisi, broker'ların KAFKA_LISTENERS
ayarında yer almaz. Broker’lar, controller ile iletişim kurarken KAFKA_CONTROLLER_QUORUM_VOTERS
ayarında belirtilen controller adresini kullanır. Bu adres, controller’ın KAFKA_LISTENERS
ayarında tanımladığı dinleyiciye karşılık gelir.
KAFKA_INTER_BROKER_LISTENER_NAME
(veyainter.broker.listener.name
), Broker’ların birbirleriyle iletişim kurarken hangi dinleyiciyi kullanacağını belirtir. Bu, genellikleBROKER
dinleyicisi olarak tanımlanır.KAFKA_ADVERTISED_LISTENERS
(veyaadvertised.listeners
), broker’ın kendisini dış dünyaya nasıl tanıttığını belirler. İstemcilere (üretici/tüketici) ve diğer brokerlara kendilerini bu değer üzerinden tanıtırlar. Bu ayar, istemcilerin ve diğer broker’ların broker’a nasıl bağlanacağını belirler. Ancak, bu ayarınKAFKA_INTER_BROKER_LISTENER_NAME
(veyainter.broker.listener.name
) ile ilişkisi vardır. Kafka,inter.broker.listener.name
ayarının,advertised.listeners
ayarında tanımlı bir dinleyiciyi referans almasını zorunlu kılar. Bu, Kafka'nın broker'lar arası iletişimi doğru şekilde yönetmesini sağlar.- listener.security.protocol.map
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
, Kafka’da tüketiciler (consumers), mesajları nerede bıraktıklarını takip etmek için offset bilgilerini merkezi bir yerde saklar. Bu bilgi__consumer_offsets
adlı özel bir Kafka konusu içinde tutulur. Bu parametre,__consumer_offsets
adlı özel konunun kaç broker arasında çoklanarak saklanacağını belirler (1 değeri tek broker’da, 3 değeri 3 broker arasında yedekli saklar).KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
parametresi, Kafka’nın birim işlem (transactional) mesajlarını takip etmek için kullandığı__transaction_state
adlı özel konunun (topic) replikasyon faktörünü belirler. Eğer bir işlem (transaction
) başarılı olursa, tüm mesajlar işlenir; başarısız olursa, hiçbiri işlenmez. Bunu takip edebilmek için Kafka,__transaction_state
adlı özel bir konu (topic) kullanır. İşteKAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
, bu özel konunun kaç broker arasında yedekleneceğini belirler.KAFKA_NUM_PARTITIONS
, Kafka’da yeni oluşturulan konular (topics
) için varsayılan bölüm (partition
) sayısını belirler. Tek bölüm ile bir konu oluşturursanız mesajlar sırayla yazılır ve okunur. Bölüm sayısı arttıkça paralel işlem yapılabilir hale gelir performans artar, koordinasyon maliyeti artar ancak sıralı yazma ortadan kalkar. Bir konu için bölüm sayısı arttırılabilir ancak yaratıldığı sayıdan aşağı düşürülemez.
version: '3.8'
services:
kafka-controller:
image: apache/kafka:latest
container_name: kafka-controller
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller:9092
KAFKA_CONTROLLER_QUORUM_ELECTION_TIMEOUT_MS: 1000
KAFKA_CONTROLLER_QUORUM_FETCH_TIMEOUT_MS: 1000
ports:
- "9092:9092"
kafka-broker-1:
image: apache/kafka:latest
container_name: kafka-broker-1
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: INTERNAL://:9094,EXTERNAL://:9095
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-broker-1:9094,EXTERNAL://kafka-broker-1:9095
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
ports:
- "9095:9095"
kafka-broker-2:
image: apache/kafka:latest
container_name: kafka-broker-2
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: INTERNAL://:9096,EXTERNAL://:9097
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-broker-2:9096,EXTERNAL://kafka-broker-2:9097
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
ports:
- "9097:9097"
kafka-ui:
image: ghcr.io/kafbat/kafka-ui:latest
container_name: kafka-ui
environment:
KAFKA_CLUSTERS_0_NAME: local-kafka
# KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS ayarı güncellendi.
# Broker'ların dış dinleyicileri (EXTERNAL) kullanılacak şekilde ayarlandı.
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-broker-1:9095,kafka-broker-2:9097
ports:
- "8080:8080"
depends_on:
- kafka-controller
- kafka-broker-1
- kafka-broker-2
kcat (eski kafkacat)
Kafkacat, Kafka mesajları üretmek ve tüketmek için bir komut satırı aracıdır. Ek olarak, küme veya konular hakkında meta verileri görüntüleyebilirsiniz.
Kafkacat’ın oldukça fazla parametresi var ve hepsini öğrenmek korkutucu görünebilir, ancak (çoğu) parametre mantıklı ve hatırlaması kolay. En önemlisiyle başlayalım: modlar. Kafkacat’a çağrı yaparken, her zaman sahip olduğu dört moddan birini kullanacaksınız. Tüm modlar büyük harf kullanır:
- -P = Produce data
- -C = Consume data
- -L = List metadata
- -Q = Query
- L
Listelemek için (-b kafka-broker-2:9097 -L
) bir broker üzerinden kcat komutunu aynı ağ içindeki konteynerden çalıştıralım. Listelediğimizde 2 aracının dahil olduğu bir küme ve ‘isimler’ adında bir konunun 2 bölüm ve 2 ISR olacak şekilde listelendiğini görüyoruz.
$ docker run -it --network=kafka_clusterkafka_ui_default edenhill/kcat:1.7.1 \
-b kafka-broker-2:9097 -L
Metadata for all topics (from broker 3: kafka-broker-2:9097/3):
2 brokers:
broker 2 at kafka-broker-1:9095
broker 3 at kafka-broker-2:9097 (controller)
1 topics:
topic "isimler" with 2 partitions:
partition 0, leader 2, replicas: 2,3, isrs: 2,3
partition 1, leader 3, replicas: 3,2, isrs: 3,2
Diyelim ki brokerlardan bir tanesi bize yakın Frankfurt veri merkezinde duruyor ve okumalarımızı bu replika üzerinden yapalım (-b kafka-broker-2:9097 -t isimler -C -e
):
$ docker run -it --network=kafka_clusterkafka_ui_default edenhill/kcat:1.7.1 \
-b kafka-broker-2:9097 -t isimler -C -e
Salih
Ali
Veli
% Reached end of topic isimler [1] at offset 1
% Reached end of topic isimler [0] at offset 2: exiting
-b
ile hangi broker üzerinden soracağımızı,-t
ile hangi topic bilgisini istediğimizi-C
ile kayıtları çeken bir CONSUMER (tüketici) olduğumuzu-e
ile başarıyla tamamlandıysa komuttan çıkmak istediğimizi
Şimdi isimlere yenisini ekleyelim bir üretici (Producer) gibi (-b kafka-broker-2:9097 -t isimler -P -e
) :
$ docker run -it --network=kafka_clusterkafka_ui_default edenhill/kcat:1.7.1 \
-b kafka-broker-2:9097 -t isimler -P -e
Cem
$ docker run -it --network=kafka_clusterkafka_ui_default edenhill/kcat:1.7.1 \
-b kafka-broker-2:9097 -t isimler -C -e
Salih
Ali
Veli
Cem
% Reached end of topic isimler [1] at offset 1
% Reached end of topic isimler [0] at offset 3: exiting
Tekrar verileri çekmek istediğimizde Cem adını görüyoruz mesajlar içinde.
Mesaj Yapısını İnceleyelim
PLAINTEXT
, Kafka'nın şifrelenmemiş (unencrypted) iletişim kullandığını gösterir. Bu, mesajların ağ üzerinde düz metin olarak iletileceği anlamına gelir. PLAINTEXT
güvenlik açısından zayıftır çünkü veri şifrelenmez. Kafka'da güvenli iletişim sağlamak için SSL://
ya da SASL_SSL://
gibi şifreli protokoller kullanılabilir.
PLAINTEXT://
: Güvenli değildir, genelde yerel ağlar veya test ortamlarında kullanılır.SSL://
: Veri şifrelenir, güvenli ortam sağlar.SASL_SSL://
: Hem şifreleme (SSL) hem de kimlik doğrulama (SASL) destekler.
PLAINTEXT
Bir mesajı inceleyelim:
Producer’dan Broker’a Gönderim:
Üretici, mesajı broker’a gönderirken topic ve partition bilgisi protokol başlıklarında yer alır:
Protokol Başlığı:
+-------------+------------------------+---------------+
| RequestType | Topic: "user-activity" | Partition: 1 |
+-------------+------------------------+---------------+
Üretici Mesajının Fiziki Yapısı:
+---------+---------------------------------------------------+
| Key | Value |
+---------+---------------------------------------------------+
| user123 | {"action": "login", "timestamp": "2025-01-19"} |
+---------+---------------------------------------------------+
Mesaj, Kafka Broker’a iletilirken Header bilgileri de eklenir. Broker, bu başlık bilgilerini kontrol ederek mesajı doğru yere yerleştirir. Broker bu mesajı partition’a yazar:
Başlık bilgisiyle birlikte mesajın tam hali:
+-----------------------------------------+------------------------------------+
| METADATA HEADER | PAYLOAD |
+------------+-----------+-------+--------+------------------------------------+
| MAGIC BYTE | TIMESTAMP | ATTR. | OFFSET | KEY | VALUE |
+------------+-----------+-------+--------+------------------------------------+
| 0x02 | 167... | 0x00 | 42 | user123 | {"action": "login", ...} |
+------------+-----------+-------+--------+------------------------------------+
Bu bölüm mesajın iletimi ve düzenlenmesi için gereken kontrol bilgilerini içerir. Başlıca bileşenler şunlardır:
- Magic Byte: Kafka’nın kullandığı protokol sürümünü belirten bir bayt.
Örn:0x02
Kafka 2.0 için. - Attributes: Mesajın sıkıştırma algoritması (gzip, snappy) veya özel bayraklar gibi bilgiler içerir.
Örn:0x00
(sıkıştırma yok). - Timestamp: Mesajın gönderildiği zaman damgası (milisaniye cinsinden).
- Offset: Mesajın bir topic-partition içinde konumunu belirler.
Örn:Offset = 5
bu mesajın bir topic'teki altıncı mesaj olduğunu gösterir.
Tüketici (Consumer) mesajı Broker’dan Okur:
Protokol başlığı ve mesajın içeriği:
Protokol Bilgisinde:
+-------------+------------------------+
| RequestType | Topic: "user-activity" |
+-------------+------------------------+
Alınan Mesajda:
+--------------------- Kafka Message -------------------------+
| Header (Meta) |
| +------------+-----------+--------+---------+------------+ |
| | Magic Byte | Timestamp | Attr. | Offset | Partition | |
| +------------+-----------+--------+---------+------------+ |
+-------------------------------------------------------------+
| Payload (Data) |
| +---------+-----------------------------------------------+ |
| | Key | Value | |
| +---------+-----------------------------------------------+ |
+-------------------------------------------------------------+
Tüketici, Kafka’dan mesaj okuduğunda başlık ve gövdesinde şu bilgileri alır:
// ------ Başlık Bilgisi/Header -----
{
"key": "user123",
"offset": 42,
"timestamp": 1672509600000,
"headers": {
"eventType": "login",
"source": "web-app"
},
"topic": "user-activity",
"partition": 1,
"compression": "gzip"
}
// ------ Gövdesi/Payload -----
{
"userId": "user123",
"action": "login",
"timestamp": "2025-01-19T12:00:00Z"
}
- Topic: user-activity
- Partition: 0
- Offset: 42
- Key: “user123”
- Value: {“action”: “login”, “timestamp”: “2025–01–19T10:00:00Z”}
Tüketicinin okuduğu:
+--------+---------+---------------------------------------------------+
| Offset | Key | Value |
+--------+---------+---------------------------------------------------+
| 42 | user123 | {"action": "login", "timestamp": "2025-01-19"} |
+--------+---------+---------------------------------------------------+
Tüketicinin aldığı mesajın tam yapısı:
+----------------------------+--------------------------------------+
| Header | Payload |
+----------------------------+--------------------------------------+
| Key: user123 | { |
| Offset: 42 | "userId": "user123", |
| Timestamp: 1672509600000 | "action": "login", |
| Topic: user-activity | "timestamp": "2025-01-19T12:00:00Z"|
| Partition: 1 | } |
| Headers: | |
| eventType: login | |
| source: web-app | |
| Compression: gzip | |
+----------------------------+--------------------------------------+
Komut Satırı Komutları
Konu (topic) İşlemleri
Sunucuya bağlanmak için --command-config
bayrağını kullanarak ayarları komut satırından komut çalıştırabiliriz *. Önce sunucu ayarını içeren dosyanın içindeki alanları öğrenelim:
- Simple Authentication and Security Layer (SASL-Basit Kimlik Doğrulama ve Güvenlik Katmanı), uygulamaların ve sistemlerin kullanmalarını için kimlik doğrulama ve güvenlik mekanizmalarını standart bir şekilde bir çerçevedir (framework). Temel amacı, kimlik doğrulama yöntemlerini uygulama protokolünden ayırmaktır. Başka bir deyişle, bir uygulama (örneğin, Kafka, RabbitMQ, SMTP, IMAP, POP3, PostgreSQL, MySQL sunucusu) kimlik doğrulama işlemini nasıl yapacağını SASL’a devredebilir ve SASL aracılığıyla farklı kimlik doğrulama mekanizmalarını destekleyebilir.
security.protocol
: SSL (Secure Sockets Layer) veya güncel versiyonu olan TLS (Transport Layer Security) protokolü kullanılır. Bu protokol, iletişimi şifreler ve veri bütünlüğünü sağlar. Bu protokol seçildiğinde ek bilgilere gerek olacaktır:ssl.truststore.location
: Güvenilecek sertifikaları içeren truststore dosyasının yolu.ssl.truststore.password
: Truststore parolası.ssl.keystore.location
: Sunucu veya istemci sertifikalarını içeren keystore dosyasının yolu.ssl.keystore.password
: Keystore parolası. SASL_PLAINTEXT, iletişim şifrelenmediği için SASL mekanizmasıyla kullanıcı bilgilerini koruyabiliriz. SASL_SSL (veya SASL_TLS), iletişimin hem güvenli olmasını hem de kimlik doğrulamasının yapılmasını sağlar. GSSAPI (veya KERBEROS), kurumda Kerberos altyapısının zaten mevcut olduğu durumlarda, ağ üzerinde kimlik doğrulama için kullanılan yaygın bir protokoldür.- SASL Mekanizmaları * (
sasl.mechanism
):
PLAIN: Kullanıcı adı ve parola tabanlı, basit kimlik doğrulamadır. Kullanıcı adı ve şifresi şifrelenmeden (yani Base64 ile kodlayarak) gönderdiği için yalnızca SSL/TLS şifrelemesi ile birlikte (yanisecurity.protocol=SASL_SSL
) kullanılması gerekir.
SCRAM-SHA-256 ve SCRAM-SHA-512 (Salted Challenge Response Authentication Mechanism) : Parolayı doğrudan göndermek yerine, paroladan türetilen anahtarlar ve rastgele sayılar (salt) kullanarak kimlik doğrulaması yapar. Parola hiçbir zaman doğrudan gönderilmez ve olası bir şifreleme sızıntısında bile parolalar ele geçirilemez. Ayrıca, SCRAM'in güvenlik açıkları ve kimlik hırsızlıklarına karşı direnci daha yüksektir.
GSSAPI (Kerberos): Kerberos tabanlı kimlik doğrulama.
ANONYMOUS: Kimlik doğrulama gerektirmeyen anonim erişim.
OAUTHBEARER: Bir OAuth sunucusuyla entegre edilmiş ve token tabanlı kimlik doğrulamasının kullanılması gereken durumlarda tercih edilir. Özellikle mikroservis mimarilerinde ve farklı platformlar arasında kimlik doğrulaması yapılması gerektiğinde kullanılır. Bu mekanizma, merkezi bir kimlik doğrulama ve yetkilendirme hizmeti kullanmak isteyen uygulamalar için uygundur. - SASL Nasıl Çalışır:
Müzakere (Negotiation): İstemci ve sunucu, destekledikleri SASL mekanizmalarını müzakere eder. Bu süreçte, her iki tarafın da desteklediği en uygun mekanizma seçilir.
Kimlik Doğrulama (Authentication): Seçilen SASL mekanizmasına göre kimlik doğrulama işlemi yapılır. Bu işlem, kullanıcı adı/parola gönderme, bir anahtar değişimi veya token gönderme gibi farklı şekillerde olabilir.
Yetkilendirme (Authorization): Kimlik doğrulama başarılı olursa, sistem kimliği doğrulanmış kullanıcının yetkilerini kontrol edebilir.
Güvenlik Katmanları: SASL, isteğe bağlı olarak veri gizliliği (şifreleme) ve veri bütünlüğü (bütünlük kontrolü) gibi güvenlik katmanlarını da sağlayabilir.
Buraya kadar security.protocol
ve sasl.mechanism
alanlarını tanıdık, şimdi ayar dosyamızı görüp son satırı öğrenelim:
security.protocol=SASL_SSL veya SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="client" \
password="client-secret";
sasl.jaas.config
: Bu parametre, Kafka istemcisinin SASL kimlik doğrulama modülünü (org.apache.kafka.common.security.plain.PlainLoginModule
), Kafka'nın PLAIN mekanizması için kullanılan kimlik doğrulama modülüdür.
required
: Kimlik doğrulama modülünün başarılı olmasının zorunlu olduğunu belirtir. Eğer kimlik doğrulama başarısız olursa, bağlantı kurulamaz.
username="client"
: Kafka broker’ına bağlanmak için kullanılacak kullanıcı adını belirtir.
password="client-secret"
: Kafka broker’ına bağlanmak için kullanılacak şifreyi belirtir.
Şu komutları çalıştıralım: create, list, describe, delete
Genel komutlar:
kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic deneme
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic deneme
list topics
Broker üzerinde tanımlı konuları listelemek için --list
kullanıyoruz ancak hiç topic yaratılmadığı için boş dönüyor listemiz:
a6bcec4ecd02:/$ kafka-topics.sh --bootstrap-server localhost:9092 --list
a6bcec4ecd02:/$
create topic
a6bcec4ecd02:/$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic deneme
Created topic deneme.
a6bcec4ecd02:/$
a6bcec4ecd02:/$ kafka-topics.sh --bootstrap-server localhost:9092 --list
deneme
a6bcec4ecd02:/$ ll /tmp/kafka-logs/
total 40
drwxr-xr-x 4 appuser appuser 4096 Jan 30 19:17 .
drwxrwxrwt 1 root root 4096 Jan 30 18:22 ..
-rw-r--r-- 1 appuser appuser 0 Jan 30 18:22 .lock
drwxr-xr-x 2 appuser appuser 4096 Jan 30 18:22 __cluster_metadata-0
-rw-r--r-- 1 appuser appuser 249 Jan 30 18:22 bootstrap.checkpoint
-rw-r--r-- 1 appuser appuser 4 Jan 30 19:15 cleaner-offset-checkpoint
drwxr-xr-x 2 appuser appuser 4096 Jan 30 19:15 deneme-0
-rw-r--r-- 1 appuser appuser 4 Jan 30 19:17 log-start-offset-checkpoint
-rw-r--r-- 1 appuser appuser 122 Jan 30 18:22 meta.properties
-rw-r--r-- 1 appuser appuser 15 Jan 30 19:17 recovery-point-offset-checkpoint
-rw-r--r-- 1 appuser appuser 15 Jan 30 19:17 replication-offset-checkpoint
replication-factor
3 Kopyalı (replica) topic yaratacağız : --replication-factor=3
değerini vereceğiz ama Kafka kümesinde sadece bir broker olduğu için hata alıyoruz:
a6bcec4ecd02:/$ kafka-topics.sh --bootstrap-server localhost:9092 \
> --create --topic deneme \
> --replication-factor 3
Error while executing topic command: Unable to replicate the partition 3 time(s):
The target replication factor of 3 cannot be reached
because only 1 broker(s) are registered.....
Diyelim ki Kafka kümemizde 20 tane broker var ve 3 kopyalı bir konu yaratacağız (yukarıdaki komut). Artık 20 broker olduğu için 1 lider 2 takipçi toplam 3 kopya In-Sync-Replicas (ISR) yaratabilecek. Yani neymiş; broker sayımızdan fazla ISR sayımız olamaz!
Producer topic1-part1
konusunu 3 bölüm (partition) ve 3 kopya (replica) olacak şekilde yaratıyor. Bölümler sayı yeterli olduğu için broker’lar arasında dağıtılıyor. broker1
Lider oluyor, broker2
ve broker3
takipçi oluyor. Bu durumda ISR değerini 1,2,3 göreceğiz. Aynı şekilde topic1-part2
için ISR değerini 2,3,4 olurken, topic1-part3
için ISR 3,1,4 ve topic1-part4
için ISR 4,1,2 değerini alacak.
Diyelim ki lider çöktü! Takipçilerden birisi lider seçilir hayat devam eder. Eski lider tekrar ayaklanırsa kopyaların arasında yerini bir takipçi olarak alır ve sadece okumalar için tüketiciler eski lider yeni takipçiye gider. Peki eski lider hiç ayaklanamazsa, bu kez bir lider ve bir takipçi olarak topic erişilebilir olacaktır.
Diyelim ki kopyalarımız farklı ülkelerdeki veri merkezlerinde tutuluyor. Tüketici olarak bize en yakın broker’dan sadece OKUMA yapabiliriz çünkü yazma işini LİDER’den yapmak zorundayız, seçim konusu değildir!