Hadoop MapReduce Örnek Uygulama
Bu yazımızda Cloudera CDH3 Hadoop Kurulumu yazımızın devamı olarak Hadoop üzerinde Java dili ile örnek bir MapReduce uygulamasının nasıl yazıldığını incelemeye çalışacağız.
Hadoop platformu da Java dili ile geliştirildiği için MapReduce uygulamaları da temelde Java ile geliştiriliyor. Ancak Hadoop streaming özelliği sayesinde C, Python gibi dillerin yanı sıra Pig diliyle de yüksek seviyede MapReduce uygulamaları yazmak mümkün.
Örnek olarak artık MapReduce’ün “Hello World!” örneği haline gelen WordCount örneğini kullanacağız. Bu uygulama kendisine verilen bir metin dosyası içindeki kelimeleri sayıyor. Örnek size çok anlamlı gelmeyebilir, Java ile bunu çok kolay yazılabilir diyebilirsiniz fakat içerisindeki kelimeleri saymanız gereken metin dosyasının 1 Petabyte olduğu bir durumda bunu normal uygulama ile yapmanız çok çok çok sıkıntılı olabilir. Oysa Hadoop MapReduce ile yazacağınız uygulama Hadoop kümesi üzerindeki çalışarak çok rahat bir şekilde bunu yapabilir.
Uygulamayı geliştirmek için pom.xml dosyasına Hadoop için gereken dependency bilgisi şu şekilde:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>0.20.2</version> <scope>compile</scope> </dependency>
Map ve Reduce fonksiyonlarını yazmak için Mapper ve Reducer sınıflarından türeyen birer sınıf yazmamız gerekiyor. Bir de uygulamayı çalıştıracak Tool sınıfından türeyen bir sınıf yazıyoruz. Kolaylık olması açısından bu üç sınıfı tek bir sınıf içerisinde yazmayı tercih edebilirsiniz. Öteki türlü kodunuz içerisinde bir sürü sınıf olacağından yönetmekte zorluk çekebilirsiniz.
Tool sınıfından türeteceğimiz uygulamamızın main metodunu içeren sınıf uygulamamızın çalışması için gereken giriş parametrelerinin alınması ile başlayıp devamında uygulamanın konfigüre edilmesi işlevini yerine getiriyor.
Uygulama çalıştırıldığı zaman giriş dosyası olarak verilen metin dosyası satır satır okunarak Mapper sınıfının map metoduna value parametresi ile aktarılıyor. map metodu bu value değerini kelimelere ayırarak context objesine anahtar kelime olacak şekilde 1 değerini veriyor. Yani örnek ile anlatmak gerekirse, value nesnesinde “bugün hava çok güzel bugün” gibi bir veri olsaydı bunun map metodundan çıktısı şu şekilde olacaktı:
bugün 1
hava 1
çok 1
güzel 1
bugün 1
Map işlemi sona erdikten sonra veriler shuffle edilerek aynı anahtar değerli olan veriler bir araya getirilerek Reducer sınıfının reduce metoduna aktarılıyor. Yani bu kez her kelime tek tek reduce sınıfına key nesnesinde aktarılıyor, values nesnesinde ise bu kelimenin karşılığındaki değer listesi aktarılıyor. Örneğin Mapper örneğinde kullandığımız cümledeki “bugün” kelimesi reduce metoduna geldiğinde values nesnesi içerisinde 2 adet 1 değerindeki sayı bulunur. reduce metodu da bu değerleri toplar ve context nesnesine yazar. Reduce aşaması da sona erdiğinde çıktı dosyasının içeriği şu şekilde olacaktır:
bugün 2
çok 1
hava 1
güzel 1
Yukarıda saydığımız işlemlerin tamamı Hadoop kümesi üzerinde dağıtık olarak çalıştığı için kümedeki sunucu sayısı ve kümenin ayarlarına göre aynı anda bir çok işlem gerçekleştirilmiş oluyor. Aşağıdaki şemaya gözatabilirsiniz:
WordCount örneğinin tamamı şu şekilde:
package com.devveri.hadoop.mapreduce; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class WordCount extends Configured implements Tool { public int run(String[] args) throws Exception { if (args.length != 2) { System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Job job = new Job(); job.setJarByClass(WordCount.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new WordCount(), args); System.exit(exitCode); } /** * Mapper */ static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString().toLowerCase(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { Text word = new Text(tokenizer.nextToken()); context.write(word, new IntWritable(1)); } } } /** * Reducer */ static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key, new IntWritable(sum)); } } }
Uygulamada kullanmak üzere örnek dosyayı oluşturup HDFS üzerine şöyle kopyalayabiliriz:
$echo "hello hadoop, hello mapreduce!" > test.txt $hadoop dfs -put test.txt test.txt
Uygulamayı jar dosyası haline getirdikten sonra komut satırından aşağıdaki şekilde çalıştırıyoruz. hadoop jar komutundan sonra jar dosyamızın ismi, ardından uygulamamızın sınıf adı, sonrasında işleme sokulacak giriş dosyası ve son olarak çıktının yazılacağı klasörün adı:
$ hadoop jar devveri-mapreduce-0.0.1-SNAPSHOT.jar com.devveri.hadoop.mapreduce.WordCount test.txt devveri 12/07/09 23:44:41 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 12/07/09 23:44:41 INFO input.FileInputFormat: Total input paths to process : 1 12/07/09 23:44:41 INFO mapred.JobClient: Running job: job_201204231254_5522 12/07/09 23:44:42 INFO mapred.JobClient: map 0% reduce 0% 12/07/09 23:44:56 INFO mapred.JobClient: map 100% reduce 0% 12/07/09 23:45:08 INFO mapred.JobClient: map 100% reduce 100% 12/07/09 23:45:13 INFO mapred.JobClient: Job complete: job_201204231254_5522 12/07/09 23:45:13 INFO mapred.JobClient: Counters: 29 12/07/09 23:45:13 INFO mapred.JobClient: Job Counters 12/07/09 23:45:13 INFO mapred.JobClient: Launched reduce tasks=1 12/07/09 23:45:13 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=11984 12/07/09 23:45:13 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 12/07/09 23:45:13 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 12/07/09 23:45:13 INFO mapred.JobClient: Launched map tasks=1 12/07/09 23:45:13 INFO mapred.JobClient: Data-local map tasks=1 12/07/09 23:45:13 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10033 12/07/09 23:45:13 INFO mapred.JobClient: File Output Format Counters 12/07/09 23:45:13 INFO mapred.JobClient: Bytes Written=31 12/07/09 23:45:13 INFO mapred.JobClient: FileSystemCounters 12/07/09 23:45:13 INFO mapred.JobClient: FILE_BYTES_READ=61 12/07/09 23:45:13 INFO mapred.JobClient: HDFS_BYTES_READ=160 12/07/09 23:45:13 INFO mapred.JobClient: FILE_BYTES_WRITTEN=43497 12/07/09 23:45:13 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=31 12/07/09 23:45:13 INFO mapred.JobClient: File Input Format Counters 12/07/09 23:45:13 INFO mapred.JobClient: Bytes Read=31 12/07/09 23:45:13 INFO mapred.JobClient: Map-Reduce Framework 12/07/09 23:45:13 INFO mapred.JobClient: Map output materialized bytes=61 12/07/09 23:45:13 INFO mapred.JobClient: Map input records=1 12/07/09 23:45:13 INFO mapred.JobClient: Reduce shuffle bytes=0 12/07/09 23:45:13 INFO mapred.JobClient: Spilled Records=8 12/07/09 23:45:13 INFO mapred.JobClient: Map output bytes=47 12/07/09 23:45:13 INFO mapred.JobClient: CPU time spent (ms)=1850 12/07/09 23:45:13 INFO mapred.JobClient: Total committed heap usage (bytes)=220266496 12/07/09 23:45:13 INFO mapred.JobClient: Combine input records=0 12/07/09 23:45:13 INFO mapred.JobClient: SPLIT_RAW_BYTES=129 12/07/09 23:45:13 INFO mapred.JobClient: Reduce input records=4 12/07/09 23:45:13 INFO mapred.JobClient: Reduce input groups=3 12/07/09 23:45:13 INFO mapred.JobClient: Combine output records=0 12/07/09 23:45:13 INFO mapred.JobClient: Physical memory (bytes) snapshot=254259200 12/07/09 23:45:13 INFO mapred.JobClient: Reduce output records=3 12/07/09 23:45:13 INFO mapred.JobClient: Virtual memory (bytes) snapshot=4877574144 12/07/09 23:45:13 INFO mapred.JobClient: Map output records=4
Uygulama herhangi bir hata vermeden çalışıp bittikten sona erdikten sonra uygulama ile ilgili istatistikleri de konsola basıyor. Oluşan dosyaları görmek için hadoop ls komutunu çalıştırıyoruz:
$ hadoop dfs -ls devveri Found 3 items -rw-r--r-- 2 hadoop supergroup 0 2012-07-09 23:45 /user/hadoop/devveri/_SUCCESS drwxr-xr-x - hadoop supergroup 0 2012-07-09 23:44 /user/hadoop/devveri/_logs -rw-r--r-- 2 hadoop supergroup 31 2012-07-09 23:45 /user/hadoop/devveri/part-r-00000
Analizin sonucu part-r-00000 dosyasında bulunuyor. Bu dosyanın içeriğini ekrana basmak için cat komutunu kullanıyoruz:
$ hadoop dfs -cat /user/hadoop/devveri/part-r-00000 hadoop, 1 hello 2 mapreduce! 1
Java ile MapReduce uygulamaları yazmak oldukça zahmetli. Bu yüzden Hive, Pig gibi yan projeler geliştirilmiş. Pig kendine özgü bir syntax’a sahipken Hive projesi bildiğimiz SQL cümlelerini kullanıyor. İlerleyen zamanlarda bunlar ile ilgili yazılara da yer vermeye çalışacağız.
Apache Cassandra Astyanax Hadoop 1.0.3 Kurulumu Adım Adım