Pig üzerinden Hadoop ile MongoDB Entegrasyonu
Açık kaynaklı büyük veri teknolojilerinin en beğendiğim özelliği, birden fazla projeyi birlikte kullanabilmek. Bu sayede, farklı projelerin güçlü olduğu taraflardan faydalanarak ortaya daha büyük bir değer çıkartmak mümkün oluyor. Bu yazıda MongoDB ile Hadoop‘u Pig üzerinden entegre ederek nasıl kullanabileceğimize bir örnek vermeye çalışacağız.
Hazırlık
MongoDB’nin geliştirdiği connector sayesinde, MongoDB’deki verileri Hadoop üzerine taşımak ya da Hadoop MapReduce işleri ile MongoDB’deki verileri analiz etmek mümkün oluyor. Bunun için öncelikle connector projesini indirip build etmemiz gerekiyor:
$ git clone https://github.com/mongodb/mongo-hadoop
Yalnız build etmeden önce hangi Hadoop versiyonu ile entegre edecekseniz bunu build.sbt dosyasında belirtmeniz gerekiyor. Ben örneği Hortonworks Sandbox içerisinde çalıştırdığım için build.sbt dosyası içerisindeki ThisBuild satırını 2.2 olacak şekilde, aşağıdaki gibi düzenledim:
name := "mongo-hadoop" organization := "org.mongodb" hadoopRelease in ThisBuild := "2.2"
Hangi Hadoop versiyonu için ne yazmanız gerektiğini GitHub sayfasında bulabilirsiniz. Daha sonrasında projeyi build ediyoruz:
$ ./sbt package
Build işlemi başarılı bir şekilde tamamlandığında aşağıdaki jar dosyalarının oluşması gerekiyor:
$ find . | grep jar ./core/target/mongo-hadoop-core-1.2.0.jar ./core/target/mongo-hadoop-core_2.2.0-1.2.0.jar ./flume/target/mongo-flume-1.2.0.jar ./hive/target/mongo-hadoop-hive-1.2.0.jar ./hive/target/mongo-hadoop-hive_2.2.0-1.2.0.jar ./pig/target/mongo-hadoop-pig-1.2.0.jar ./pig/target/mongo-hadoop-pig_2.2.0-1.2.0.jar ./target/mongo-hadoop-1.2.0.jar ./target/mongo-hadoop_2.2.0-1.2.0.jar ./testing/mus_tests/helpers.jar
Son olarak MongoDB Java Driver kütüphanesinin ve build ettiğimiz jar dosyalarından gerekli olanları Hadoop’un lib dizini içerisine kopyalamamız gerekiyor. Benim kurulumumda bu klasör /usr/lib/hadoop/lib şeklindeydi. Aşağıdaki dosyaların bu dizinde olması gerekiyor:
mongo-hadoop_2.2.0-1.2.0.jar mongo-hadoop-core_2.2.0-1.2.0.jar mongo-hadoop-pig_2.2.0-1.2.0.jar mongo-java-driver-2.11.1.jar
MongoDB tarafında örnek olarak oluşturduğum users tablosu içerisindeki veriyi mongo istemcisi üzerinden kontrol ediyorum:
$ mongo MongoDB shell version: 2.4.9 connecting to: test > use test switched to db test > show collections system.indexes user > db.user.find() { "_id" : ObjectId("52e52e3349008a3f31dea285"), "id" : 1, "name" : "test1", "email" : "test1@test.com" } { "_id" : ObjectId("52e52e5b49008a3f31dea286"), "id" : 2, "name" : "test2", "email" : "test2@test.com" }
MongoDB’den Veriyi Okuma
Amacım kullanıcı verisini Pig kullanarak sorgulamak. Bunu yapmamız için gereki olan Pig kodu şu şekilde:
/* Kullanacagimiz kutuphaneleri ayni zamanda Pig tarafindan da erisilebilir bir yere yuklenmeli */ register /tmp/udfs/mongo-hadoop-core_2.2.0-1.2.0.jar register /tmp/udfs/mongo-hadoop-pig_2.2.0-1.2.0.jar register /tmp/udfs/mongo-java-driver-2.11.1.jar set default_parallel 1 /* MongoLoader sinifi icin kisaltma */ define MongoLoader com.mongodb.hadoop.pig.MongoLoader; /* Veriyi yukle */ a = load 'mongodb://localhost:27017/test.user' using MongoLoader() as (user: map[]); b = foreach a generate user#'id', user#'name', user#'email'; dump b;
Bu kodu açıklamaya çalışırsak; Öncelikle register komutuyla gerekli kütüphaneleri Pig’e tanıtıyoruz. Tabii gerekli olan bu kütüphaneleri HDFS üzerinde Pig tarafından erişilebilir bir yere upload edilmiş olması gerekiyor. Ben /tmp/udfs klasörüne yükledim.
Sonrası oldukça kolay; MongoLoader sınıfı ile belirttiğimiz adresteki MongoDB’ye bağlanıp, test şemasındaki user tablosunu yüklemesini sağlıyoruz. Yüklenen bu veri Pig tarafında Map tipine dönüştürülüyor. Bu veri içerisinden istediğimiz kolonları alıyoruz ve dump komutu ile ekrana basıyoruz.
Bu yöntem ile JS kullanarak MapReduce yazmak yerine Pig’in avantajlarını kullanabiliriz.
Hadoop’tan Veriyi MongoDB’ye Aktarma
Tam tersi şekilde, Hadoop üzerindeki veriyi de MongoDB’ye aktarabiliriz. Örneğin Hadoop üzerinde içerisinde ürün numarası, kullanıcı numarası ve zaman bilgisi içeren bir dosyamız olduğunu düşünelim:
1 111 1386704549204 2 222 1386704549325 3 333 1386704550446 4 444 1386704551837 5 444 1386704554568 6 666 1386704554613
Bu veriyi MongoDB’ye aktarmak için şu Pig kodunu kullanabiliriz:
register /tmp/udfs/mongo-hadoop-core_2.2.0-1.2.0.jar register /tmp/udfs/mongo-hadoop-pig_2.2.0-1.2.0.jar register /tmp/udfs/mongo-java-driver-2.11.1.jar set default_parallel 1 /* Tekrar eden kayit olmamasi icin Speculative Execution ozelligini devre disi birakiyoruz */ set mapred.map.tasks.speculative.execution false set mapred.reduce.tasks.speculative.execution false /* MongoInsertStorage sinifi icin kisaltma */ define MongoInsertStorage com.mongodb.hadoop.pig.MongoInsertStorage; /* dosyayi yukle */ a = load '/user/hue/useritems' using PigStorage() as (userId:int, memberId:int, time:long); /* memberId'ye gore grupla */ b = group a by memberId; c = foreach b generate group as memberId, $1 as items; /* Mongo'ya kaydet */ store c into 'mongodb://localhost:27017/test.useritem' using MongoInsertStorage('items:bag', 'memberId');
MongoDB’ye verileri kaydederken herhangi bir problem yaşamamak için Hadoop’un Speculative Execution özelliğini devre dışı bırakmak gerekiyor. Burada useritems dosyasını yükleyip, verileri memberId alanına göre grupluyoruz. Sonrasında bu veriyi grupladığımız haliyle MongoDB’ye MongoInsertStorage sınıfını kullanarak test şeması içindeki useritem tablosuna kaydediyoruz.
Kodun çalışması sonucunda MongoDB’de aşağıdaki veriler girilmiş bulunuyor; 444 numaralı memberId kaydının birden fazla item bilgisi olduğunu görebilirsiniz:
> db.useritem.find() { "_id" : ObjectId("52e55730e4b0c7d85da7ecd7"), "memberId" : 111, "items" : [ { "userId" : 1, "memberId" : 111, "time" : NumberLong("1386704549204") } ] } { "_id" : ObjectId("52e55730e4b0c7d85da7ecd8"), "memberId" : 222, "items" : [ { "userId" : 2, "memberId" : 222, "time" : NumberLong("1386704549325") } ] } { "_id" : ObjectId("52e55730e4b0c7d85da7ecd9"), "memberId" : 333, "items" : [ { "userId" : 3, "memberId" : 333, "time" : NumberLong("1386704550446") } ] } { "_id" : ObjectId("52e55730e4b0c7d85da7ecda"), "memberId" : 444, "items" : [ { "userId" : 5, "memberId" : 444, "time" : NumberLong("1386704554568") }, {"userId" : 4, "memberId" : 444, "time" : NumberLong("1386704551837") } ] } { "_id" : ObjectId("52e55730e4b0c7d85da7ecdb"), "memberId" : 666, "items" : [ { "userId" : 6, "memberId" : 666, "time" : NumberLong("1386704554613") } ] }
Daha fazla bilgi için aşağıdaki sayfaları inceleyebilirsiniz:
https://github.com/mongodb/mongo-hadoop
http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/
http://hortonworks.com/blog/pig-as-connector-part-one-pig-mongodb-and-node-js/
Apache Flume ile Veri Toplama Hive JDBC Bağlantısı