Apache Flume ile Veri Toplama
Devveri, big data, büyük ölçekli veri analizi sözkonusu olduğunda artık Hadoop hızlıca konu başlığı olur durumda. Nerdeyse konu sadece Hadoop ile ilgili, “devveri” başlığı ise sadece nezaketen söylenen bir genelleme gibi. Hadoop dendiğinde ise konu doğrudan MapReduce ve bazen HDFS oluyor. Durum, tarihsel oluşum sebebiyle normal olmakla beraber devveri konusunun sadece MapReduce ve HDFS’ten ibaret olmadığını hatırlamak gerekiyor. Bu yazıda doğrudan Hadoop ekosistemine dahil olmayan ancak konuyla yakından ilgili başka bir sistemden bahsetmeye çalışacağım: Flume.
Bir çok yerde okuyabileceğiniz üzere, HDFS büyük verilerin kaydedildiği yer, MapReduce ise bunların işlendiği programlama yapısı. Genellemeleri bir süreliğine kenara bırakıp pratik örneklerden ilerlersek, web sunucularından alınan accesslog’lar HDFS’te kaydediliyor, MapReduce kullanarak da sitenin belli sayfalarının hangi saatlerde daha aktif olduğu hesaplanıyor. Bu cümledeki bir çok eksik noktadan biri, accesslog’ların HDFS’e nasıl kaydedileceği. İlk akla gelen çözüm HDFS’i normal bir dosya sistemi gibi web sunucuya mount edip logların oraya yazılmasını sağlamak olabilse de endüstride bu tip bir kullanım görmek çok olası değil. Flume’un da bir üyesi olduğu “veri aktarma sistemleri” diyebileceğimiz araçlar ise bu boşluğu doldurmaya çalışıyor. Alternatifleri arasında scribe ve chukwa gibi ürünler var, ancak bunlar hakkında çok bir bilgi sahibi olmadığım için karşılaştırma yapamayacağım.
Flume çalışma yapısı
Flume, agent adında bir java uygulaması şeklinde çalışıyor ve birbirine bağlı 3 yapıya sahip:
- Source (kaynak): Logların flume’a geldiği yer
- Channel (kanal): Logların flume içinde bulunacağı yer
- Sink (hedef): Logların yazılacağı yer
olarak özetlenebilir.
En temel haliyle 1 source, 1 channel ve 1 sink ile ayarlanan bir agent, logları tek bir noktadan alıp tek bir yere yazacak şekilde çalışıyor. Birden fazla channel/sink ve/veya birden fazla agent çalıştırarak karmaşık modeller kurmak mümkün. Bunlara yazının sonunda kısaca değineceğim.
Az önceki örnek ile bu yapıları bağdaştıracak olursak, web sunucunun accesslog’ları (misal syslog ile) göndereceği yer agent konfigürasyonunda source kısmında ayarlanıyor. HDFS’e yazılacak yer ise sink kısmında ayarlanıyor. Flume konfigürasyonu şuna benziyor:
# 'weblog' agent adi, kullanilacak yapilar tanimlaniyor weblog.sources = source1 weblog.channels = channel1 weblog.sinks = hdfs1 # 'source1' adli source ayarlari weblog.sources.source1.type = syslogudp weblog.sources.source1.port = 514 weblog.sources.source1.channels = channel1 # 'channel1' adli channel ayarlari weblog.channels.channel1.type = memory # 'hdfs1' adli sink ayarlari weblog.sinks.hdfs1.type = hdfs weblog.sinks.hdfs1.channel = channel1 weblog.sinks.hdfs1.hdfs.path = hdfs:///flume/weblog
Bu konfigürasyonu çalıştıran bir makina kabaca UDP 514 portunu dinlemeye başlayacak, ve oraya gelen syslog olaylarının verisini HDFS’teki `/flume/weblog` dizinine yazacak. Tarihlere göre bölme, dosya formatları, dosyaların hangi aralıklarda roll edileceği gibi bir çok ek ayar dökümantasyonda detaylıca anlatılmış.
Kritik detaylar
Daha önce de bahsettiğim gibi flume, devveri analizi için gerekli ama önemsiz bir detayı gibi göründüğünden bu yukarıdaki gibi basit bir konfigürasyonla bile arada bir programı restart ederek hayata devam etmek gayet mümkün. Ancak yapılan işler kritikleşmeye başladığında bu restart’ların maliyeti ciddi boyutlara ulaşabilir. Bu durumda birkaç küçük ayar durumu toparlayabilir.
Flume ile benim en çok karşılaştığım hata:
org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
şeklinde bir mesaj. Dikkatlice öğrenildiğinde gayet açıklayıcı olan ama bir çok zaman olduğu gibi ezbere yapılan kurulumlarda çok da anlam ifade bu mesaj, özetle “HDFS’te bir problem oldu ben yazamadım, şimdi de yetiştiremiyorum.” demek. Flume’daki channel yapısı, bu tip hatalar için buffer gibi çalışır, ancak normal değeri 100 olan channel kapasitesi bir çok kurulum için yeterli değildir. Çok kaba bir hesapla web sunucu saniyede 1000 istek karşılıyor ve log üretiyorsa, 100 olay kapasiteli bir channel en fazla 100 ms’lik hataları tolere edebilir. Problem durumu bir kaç saniye sürüyorsa buffer hızlıca dolacak ve yukarıda görülen hata oluşacaktır. Daha kötüsü bu durum geçene kadar olan log’lar hiçbir yere yazılamadığı için kaybolacak ve analizlerde eksikler oluşmasına sebep olacaktır. `weblog.channels.channel1.capacity` gibi bir değeri artırmak bu sorunu azaltacaktır. Bu 100 olan değerin sonuna birkaç sıfır koymak çok da yanlış bir yaklaşım değildir.
Bir diğer durum ise gerçekten yoğun aktiviteden kaynaklı HDFS’in yetişememesi durumu olabilir. Yine yukarıdaki gibi saniyede 1000 event üreten bir sistem için her eventin HDFS’e ayrıca yazılması pek çalışabilecek bir senaryo değildir. Bu yüzden bir çok veri sistemi gibi yazma işlemleri paketler (batch) halinde yapılır. `weblog.sinks.hdfs1.hdfs.batchSize` değeri normal paket boyutunu 100 olarak tanımlamıştır ve kabaca 1000 gibi bir değere çıkarmak yazma hızını ciddi miktarda artıracaktır.
Ekstralar
En sık kullanımı yukarıdaki gibi olsa da Flume bunlarla sınırlı değil. Olayları alabilmek için syslog yanında doğrudan dizinden okuma, netcat, HTTP, avro, thrift gibi farklı yapıları kullanmak mümkün. Channel’lar için ise hafıza yanında, dosya, JDBC gibi kalıcı yerler kullanılabiliyor. Event’leri yazmak içinse HDFS dışında normal dosyalar, başka yerlerdeki avro, thrift alıcıları ya da HBase, Solr, ElasticSearch gibi veritabanları kullanılabilir. Ayrıca birden fazla channel/sink ayarlayarak replication, multiplexing, failover ya da load-balancing senaryoları tasarlanabilir.
Flume olay/log taşıma dışında, bir kısım iş mantığını da halledebilir. Bu yazının yazılmasına da vesile olan yapıların adı interceptor‘ler. Source’lar üzerine tanımlanan küçük kod parçacıkları gelen olayların filtrelenmesi, değiştirilmesi ya da anlamlandırılması gibi işleri yapabiliyor. Hatta birden fazla interceptor zincirleme bağlanarak birinde yapılan iş diğerinde yorumlanabiliyor. Bu konu benim için de gayet yeni olduğundan çok basit bir örnek verebileceğim:
Web sunucularından ya da yük dengeleyiciden alınan olayların hepsi tek bir sistem için anlamlı olmayabilir. Örneğin sitenin web içeriği ile medya içeriği aynı yük-dengeleyiciden geçiyor ancak analizler ayrı ayrı yapılıyorsa misal `/static` şeklinde başlayan medya istekleri başka bir channel’a, oradan da başka bir HDFS dizinine yönlendirilebilir. Böylece web istekleri için yapılacak analizler ciddi miktarda kolaylaşacaktır. Ya da bazı tip istekler tamamen anlamsız olabilir ve filtrelenmek istenebilir. Bu durumda:
weblog.sources.source1.interceptors = i1 weblog.sources.source1.interceptors.i1.type = regex_filter weblog.sources.source1.interceptors.i1.regex = GET\\s/crossdomain.xml weblog.sources.source1.interceptors.i1.excludeEvents = true
gibi bir ayar `/crossdomain.xml` şeklinde bilinen flash’ın otomatik gönderdiği istekleri filtreleyecektir. Utanarak kabul ediyorum ki daha yakın zamanda servislerimizden birinde bunun gibi küçük bir ayarla kaydettiğimiz/işlediğimiz veri miktarında ciddi tasarruf sağladık.
Regex işlemlerinin yanısıra UUID, timestamp ve morphline gibi farklı tip interceptor’leri bağlayarak çok daha akıllıca işler yapmak mümkün. Hatta çok tavsiye edemesem de bazı girişimler sadece interceptor’ler kullanılarak real-time analiz yapılabileceğini gösteriyor.
Son olarak tüm bu yapılar kolaylıkla modifiye edilebiliyor ve yeni tip source/channel/sink/interceptor/vs. tanımlanabiliyor. Bu sayede Flume istenilen tüm işlere tüm sistemlere uygun hale getirilebiliyor. Bu konuyu ayrı bir yazıya bırakıyorum.
Data Scientist kimdir? Pig üzerinden Hadoop ile MongoDB Entegrasyonu