DevVeri.com

Boğulacaksan büyük veride boğul!

ElasticSearch ve Pig Entegrasyonu

elasticsearchPig ile sadece Hadoop üzerindeki verileri değil, MongoDB gibi farklı veri kaynaklarını da analiz edebileceğimizden bir yazımızda bahsetmiştik. Pig ile ElasticSearch üzerindeki verileri de analiz etmemiz mümkün. Aynı şekilde Hadoop üzerindeki verileri Pig aracılığı ile ElasticSearch üzerinde kolayca indekslemeniz ve analizler yapmanız oldukça kolay.

Hazırlık

Pig ile ElasticSearch entegrasyonu ile ilgili projeye GitHub üzerinden erişmeniz mümkün. Proje ile sadece Pig değil, Hive ve MapReduce entegrasyonu da gerçekleştirmek mümkün. Build ettikten sonra oluşan elasticsearch-hadoop.jar dosyasını Hadoop’un lib dizini içerisine kopyalamamız gerekiyor. Benim kurulumumda bu klasör /usr/lib/hadoop/lib şeklindeydi.

Maven ile kullanmak istediğinizde aşağıdaki ayarları kullanabilirsiniz.

<dependency>
 <groupId>org.elasticsearch</groupId>
 <artifactId>elasticsearch-hadoop</artifactId>
 <version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<repositories>
 <repository>
 <id>sonatype-oss</id>
 <url>http://oss.sonatype.org/content/repositories/snapshots</url>
 <snapshots><enabled>true</enabled></snapshots>
 </repository>
</repositories>

Pig ve Hive ile ElasticSearch verilerini analiz ederken “Speculative Execution” özelliğini devredışı bırakmamız gerekiyor. “Speculative Execution” özelliği MapReduce işlemleri sırasında bir job’un yavaş çalışması durumunda o işin bitmesini veya fail etmesini beklemek yerine aynı anda aynı işi yapacak ve belki daha önce tamamlayacak başka bir job’ın başlatılarak sonucu garantilemeyi amaçlayan bir özellik.

Örnek amaçlı olarak Hortonworks’ün paylaştığı Newyork borsasının 2000-2001 yılı verilerini kullanacağız. Verileri https://s3.amazonaws.com/hw-sandbox/tutorial1/NYSE-2000-2001.tsv.gz adresinden indirdikten sonra HDFS üzerine aktarmamız gerekiyor.

ElasticSearch’e Veriyi Aktarma

ElasticSearch üzerindeki verilere Pig üzerinden erişim için EsStorage sınıfını kullanıyoruz. Bu sınıfın constructor metoduna bağlanacağımız sunucuların adres ve portları, bağlantı timeout değerlerini, indeksin otomatik yaratılıp yaratılmayacağı gibi ayarları veriyoruz.

HDFS üzerinde duran borsa verisini Pig ile yükleyip ElasticSearch’e aktarmak için neredeyse hiçbirşey yapmamıza gerek yok. Pig bizim için herşeyi halletmiş. Tek yapmamız gereken Load komutunu PigStorage sınıfı ile HDFS üzerindeki veriyi yüklemek, daha sonra Store komutunu EsStorage sınıfı ile kullanarak veriyi indekslemek.

REGISTER elasticsearch-hadoop.jar;

/* disable speculative execution */
set mapred.map.tasks.speculative.execution false
set mapred.reduce.tasks.speculative.execution false

DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('es.nodes=localhost:9200');

/* load nyse data */
A = LOAD '$INPUT' USING PigStorage() AS (exchange:chararray, stock_symbol:chararray, stock_date:chararray, stock_price_open:double,
 stock_price_high:double, stock_price_low:double, stock_price_close:double, stock_volume:double, stock_price_adj_close:double);

/* write to elasticsearch */
STORE A INTO '$OUTPUT' USING EsStorage();

ElasticSearch’ten Veriyi Okuma

EsStorage sınıfı ile veriyi okurken indeks içerisinden istediğimiz alanları sanki veritabanından okuyormuş gibi seçebiliyoruz. Hatta sorgu vererek sadece bu sorgunun sonucunu Pig tarafında da analiz etme şansımız bulunuyor.

REGISTER elasticsearch-hadoop.jar;

/* disable speculative execution */
set mapred.map.tasks.speculative.execution false
set mapred.reduce.tasks.speculative.execution false

DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('es.nodes=localhost:9200', 'es.query=?q=*');

/* load all documents from elasticsearch and filter by a stock symbol */
A = LOAD '$INPUT' USING EsStorage() AS (exchange:chararray, stock_symbol:chararray, stock_date:chararray, stock_price_open:double,
                stock_price_high:double, stock_price_low:double, stock_price_close:double, stock_volume:double, stock_price_adj_close:double);
B = FILTER A BY stock_symbol == '$SYMBOL';
DUMP B;

Örnek kodumuzda borsa bilgilerini yükleyip içerisinden sadece istediğimiz sembole sahip kayıtları getiriyoruz. Bu entegrasyon sayesinde örneğin HDFS üzerindeki verileri, ElasticSearch üzerindeki verileri ve örneğin MongoDB üzerindeki verileri join’leyebilme ve her türlü analiz etme imkanımız oluyor.

Kaynak: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/pig.html

, , ,

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

This site uses Akismet to reduce spam. Learn how your comment data is processed.