ElasticSearch ve Pig Entegrasyonu
Pig ile sadece Hadoop üzerindeki verileri değil, MongoDB gibi farklı veri kaynaklarını da analiz edebileceğimizden bir yazımızda bahsetmiştik. Pig ile ElasticSearch üzerindeki verileri de analiz etmemiz mümkün. Aynı şekilde Hadoop üzerindeki verileri Pig aracılığı ile ElasticSearch üzerinde kolayca indekslemeniz ve analizler yapmanız oldukça kolay.
Hazırlık
Pig ile ElasticSearch entegrasyonu ile ilgili projeye GitHub üzerinden erişmeniz mümkün. Proje ile sadece Pig değil, Hive ve MapReduce entegrasyonu da gerçekleştirmek mümkün. Build ettikten sonra oluşan elasticsearch-hadoop.jar dosyasını Hadoop’un lib dizini içerisine kopyalamamız gerekiyor. Benim kurulumumda bu klasör /usr/lib/hadoop/lib şeklindeydi.
Maven ile kullanmak istediğinizde aşağıdaki ayarları kullanabilirsiniz.
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>2.0.0.BUILD-SNAPSHOT</version> </dependency>
<repositories> <repository> <id>sonatype-oss</id> <url>http://oss.sonatype.org/content/repositories/snapshots</url> <snapshots><enabled>true</enabled></snapshots> </repository> </repositories>
Pig ve Hive ile ElasticSearch verilerini analiz ederken “Speculative Execution” özelliğini devredışı bırakmamız gerekiyor. “Speculative Execution” özelliği MapReduce işlemleri sırasında bir job’un yavaş çalışması durumunda o işin bitmesini veya fail etmesini beklemek yerine aynı anda aynı işi yapacak ve belki daha önce tamamlayacak başka bir job’ın başlatılarak sonucu garantilemeyi amaçlayan bir özellik.
Örnek amaçlı olarak Hortonworks’ün paylaştığı Newyork borsasının 2000-2001 yılı verilerini kullanacağız. Verileri https://s3.amazonaws.com/hw-sandbox/tutorial1/NYSE-2000-2001.tsv.gz adresinden indirdikten sonra HDFS üzerine aktarmamız gerekiyor.
ElasticSearch’e Veriyi Aktarma
ElasticSearch üzerindeki verilere Pig üzerinden erişim için EsStorage sınıfını kullanıyoruz. Bu sınıfın constructor metoduna bağlanacağımız sunucuların adres ve portları, bağlantı timeout değerlerini, indeksin otomatik yaratılıp yaratılmayacağı gibi ayarları veriyoruz.
HDFS üzerinde duran borsa verisini Pig ile yükleyip ElasticSearch’e aktarmak için neredeyse hiçbirşey yapmamıza gerek yok. Pig bizim için herşeyi halletmiş. Tek yapmamız gereken Load komutunu PigStorage sınıfı ile HDFS üzerindeki veriyi yüklemek, daha sonra Store komutunu EsStorage sınıfı ile kullanarak veriyi indekslemek.
REGISTER elasticsearch-hadoop.jar; /* disable speculative execution */ set mapred.map.tasks.speculative.execution false set mapred.reduce.tasks.speculative.execution false DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('es.nodes=localhost:9200'); /* load nyse data */ A = LOAD '$INPUT' USING PigStorage() AS (exchange:chararray, stock_symbol:chararray, stock_date:chararray, stock_price_open:double, stock_price_high:double, stock_price_low:double, stock_price_close:double, stock_volume:double, stock_price_adj_close:double); /* write to elasticsearch */ STORE A INTO '$OUTPUT' USING EsStorage();
ElasticSearch’ten Veriyi Okuma
EsStorage sınıfı ile veriyi okurken indeks içerisinden istediğimiz alanları sanki veritabanından okuyormuş gibi seçebiliyoruz. Hatta sorgu vererek sadece bu sorgunun sonucunu Pig tarafında da analiz etme şansımız bulunuyor.
REGISTER elasticsearch-hadoop.jar; /* disable speculative execution */ set mapred.map.tasks.speculative.execution false set mapred.reduce.tasks.speculative.execution false DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('es.nodes=localhost:9200', 'es.query=?q=*'); /* load all documents from elasticsearch and filter by a stock symbol */ A = LOAD '$INPUT' USING EsStorage() AS (exchange:chararray, stock_symbol:chararray, stock_date:chararray, stock_price_open:double, stock_price_high:double, stock_price_low:double, stock_price_close:double, stock_volume:double, stock_price_adj_close:double); B = FILTER A BY stock_symbol == '$SYMBOL'; DUMP B;
Örnek kodumuzda borsa bilgilerini yükleyip içerisinden sadece istediğimiz sembole sahip kayıtları getiriyoruz. Bu entegrasyon sayesinde örneğin HDFS üzerindeki verileri, ElasticSearch üzerindeki verileri ve örneğin MongoDB üzerindeki verileri join’leyebilme ve her türlü analiz etme imkanımız oluyor.
Kaynak: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/pig.html
Cloudera Apache Hadoop Geliştirici eğitimini başarıyla gerçekleştirdi Apache Spark