This Banner is For Sale !!
Get your ad here for a week in 20$ only and get upto 15k traffic Daily!!!

Apache Kafka ile veri yazma ve okuma


Apache Kafka üzerinde konular (matter) vasıtası ile veriler gönderebilir ve bu verileri okuyabiliriz.

Bu işlem için bir çok programlama dilinde çözümler olduğu gibi örnek olması açısından Python3 üzerinde python3-kafka paketi ile denemelerimizi yapacağım.

Bir önceki yazımızdaki docker konteynırı üzerinde yapabileceğiniz gibi aşağıda belirli ayarları yaparak kendi sunucunuzla da iletime geçebilirsiniz.

Python için üzerinde bir çok kafka kütüphanesi olmasına rağmen en çok kullanılanlardan birisi olan python3-kafka paketini kullanacağım. Kurulum için aşağıdaki komutun yazılması gerekecektir

sudo apt set up python3-kafka
Enter fullscreen mode

Exit fullscreen mode

Bu adımdan sonra python üzerinde kafka kütüphanesini kullanabilir olacağız.

Bu adımda “uretici.py” isimli basit bir dosya oluşturabiliriz. İsmi istediğiniz gibi verebilirsiniz ama kavram olarak üretici (producer) Kafka anlatımlarında kullanıldığı için bu ismi kullandım. Bu dosya içeriği aşağıdaki gibi yapılabilir.

from kafka import KafkaProducer
import json

producer = KafkaProducer(
 bootstrap_servers='localhost:9092',
 value_serializer=lambda v: json.dumps(v).encode('ascii')
)

producer.ship(
 'ornekinsanlar',
 worth=
    {
     "title": "Ali Yazar",
     "yas": "12",
     "dogumyeri": "Zonguldak",
     "detay": "iyi bayramlar!"
     }
)
producer.flush()

Enter fullscreen mode

Exit fullscreen mode

Yazılan uygulamayı incelersek “ornekinsanlar” olarak yazılmış ifadenin Kafka üzerindeki bir konu (matter) olduğunu anlayabiliriz.

Üst kısımda KafkaProducer olarak da hangi sunucu adresine hangi port üzerinden erişilebileceğini sormakta. Docker üzerinde örnek yapıyorsanız aynı şekilde (localhost:9092) bırakmanız yeterli olacaktır.

Ayrıca Kafka’nın SSL desteği bulunduğu için regular şartlarda SSL ayarlarının da yapılması gerektiğini söylemem gerekir. Fakat örnek olması açısından bu ayarları es geçiyoruz.

Üretici uygulamamızın Kafka ile haberleşmesi sonrasında aşağıdaki gibi bir JSON formatının Kafka’ya gönderilmesi sağlanmakta.

    {
     "title": "Ali Yazar",
     "yas": "12",
     "dogumyeri": "Zonguldak",
     "detay": "iyi bayramlar!"
     }
Enter fullscreen mode

Exit fullscreen mode

Bu kısımdaki içerik uygulamanızın ihtiyaçlarını anlatmakta. İstediğiniz gibi oluşturabilirsiniz.

Dosyayı oluşturdu isek aşağıdaki gibi dosyanın içerisine yazdığımız json verisini Kafka’ya gönderebiliriz. İçeriğini değiştirerek birkaç kere çalıştırdığınızda peş peşe eklendiğini göreceksiniz.

python3 uretici.py
Enter fullscreen mode

Exit fullscreen mode

Yazdığınız verilerin okunması kısmı da oldukça basit olmaktadır.

Bu adımda “tuketici.py” isimli basit bir dosya oluşturabiliriz. İsmi istediğiniz gibi verebilirsiniz ama kavram olarak tüketici (client) Kafka anlatımlarında kullanıldığı için bu ismi kullandım. Bu dosya içeriği aşağıdaki gibi yapılabilir.

from kafka import KafkaConsumer
from pprint import pprint

if __name__ == '__main__':
    client = KafkaConsumer('ornekinsanlar', bootstrap_servers="localhost:9092",
                             enable_auto_commit=False, auto_offset_reset="earliest")
    pprint(client.metrics())
    for msg in client:
        pprint(msg)
Enter fullscreen mode

Exit fullscreen mode

Yazılan uygulamayı incelersek “ornekinsanlar” olarak yazılmış ifadenin Kafka üzerindeki bir konu (matter) olduğunu anlayabiliriz.

Benzer şekilde KafkaProducer olarak da hangi sunucu adresine hangi port üzerinden erişilebileceğini sormakta. Docker üzerinde örnek yapıyorsanız aynı şekilde (localhost:9092) bırakmanız yeterli olacaktır.

İlk olarak bağlantı kurduğumuz Kafka ile temel bilgileri çekmek için “client.metrics()” fonksiyonunu kullanıyoruz.

Daha sonrasında ise çektiğimiz konu (matter) içerisindeki verilerin for döngüsü içerisinde verilerinin tamamının çekileceğini görebiliriz.

python3 tuketici.py
Enter fullscreen mode

Exit fullscreen mode

Uygulamanın çıktısı aşağıdaki gibi olacaktır.

ConsumerRecord(matter="ornekinsanlar", partition=0, offset=0, timestamp=1657528880472, timestamp_type=0, key=None, worth=b'{"title": "Ali Bir", "yas": "21", "dogumyeri": "Ankara", "detay": "iyi bayramlar!"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=87, serialized_header_size=-1)
ConsumerRecord(matter="ornekinsanlar", partition=0, offset=1, timestamp=1657529032891, timestamp_type=0, key=None, worth=b'{"title": "Ali Yazar", "yas": "12", "dogumyeri": "Zonguldak", "detay": "iyi bayramlar!"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=87, serialized_header_size=-1)
Enter fullscreen mode

Exit fullscreen mode


Nam et ipsa scientia potestas est

The Article was Inspired from tech community site.
Contact us if this is inspired from your article and we will give you credit for it for serving the community.

This Banner is For Sale !!
Get your ad here for a week in 20$ only and get upto 10k Tech related traffic daily !!!

Leave a Reply

Your email address will not be published. Required fields are marked *

Want to Contribute to us or want to have 15k+ Audience read your Article ? Or Just want to make a strong Backlink?