DevVeri.com

Boğulacaksan büyük veride boğul!

Spark SQL

spark-logoApache Spark projesinin yeteneklerinden birisi de SQL ve HiveQL arayüzleri ile veri işleyebilmesi. Yazıyı yayınlamadan önce bu proje Shark olarak isimlendiriliyordu, ancak ismi değiştirilerek Spark SQL olarak adlandırıldı ve mevcut projeye dahil edildi.

Spark SQL Hive, Impala, Drill projelerine bir alternatif oluşturuyor. JSON, Parquet vs gibi populer tüm veri tiplerini destekliyor. Ayrıca Hive metadata’sını da kullanabildiği için Hive tablolarını Spark SQL ile sorgulamak mümkün.

Spark yazımızda verilerin RDD’ler üzerinden işlendiğinden bahsetmiştik. SQL arayüzü için de SchemaRDD (Java için JavaSchemaRDD) tipini kullanıyoruz. Veriyi RDD olarak yükledikten sonra tablo olarak register ediyoruz, ardından bu tablo üzerinde SQL komutununu çalıştırabiliyoruz. SQLContext üzerinde sorgumuzu çalıştırdıktan sonra veri bize Row tipinde dönüyor.

Örnek amaçlı olarak daha önce de kullandığımız NYSE verilerini kullanacağız. Bu veriye karşılık gelen ExchangeRecord sınıfını yazdıktan sonra veriyi aşağıdaki gibi sorgulayabiliriz:

package com.devveri.spark.job;

import com.devveri.spark.model.ExchangeRecord;
import com.devveri.spark.util.SparkHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;

import java.util.List;

public class NYSEExample {

    public static void main(String[] args) throws Exception {
        final String appName = "NYSEExample " + System.currentTimeMillis();
        final String stockFile = "file:///Users/devveri/dev/test/NYSE-2000-2001.tsv";

        // get contexts
        JavaSparkContext jsc = new JavaSparkContext(SparkHelper.getDefaultConf()); 
        JavaSQLContext sqlContext = new JavaSQLContext(jsc);

        // load data and convert to java object
        JavaRDD exchanges = jsc.textFile(stockFile).map(new Function<String, ExchangeRecord>() {
            public ExchangeRecord call(String s) throws Exception {
                return new ExchangeRecord(s);
            }
        });

        // apply schema
        JavaSchemaRDD schemaExchange = sqlContext.applySchema(exchanges, ExchangeRecord.class);
        schemaExchange.registerAsTable("exchanges");

        // execute an sql query
        JavaSchemaRDD topExchanges = sqlContext.sql(
                "select stockSymbol, sum(stockPriceClose) as total_value " +
                "from exchanges " +
                "where stockVolume > 1000 " +
                "group by stockSymbol " +
                "order by total_value desc " +
                "limit 10");

        List result = topExchanges.map(new Function<Row, String>() {
            public String call(Row row) throws Exception {
                return String.format("%s\t%f",
                        row.getString(0),
                        row.getDouble(1));
            }
        }).collect();

        for (String s : result) {
            System.out.println(s);
        }
    }

}

SQLContext sadece basit SQL cümlelerini destekliyor. Daha kompleks SQL cümleleri gerekirse HiveContext kullanmak gerekiyor.

Spark SQL, Hive uyumluluğu, SerDe desteği, JSON, Parquet dosyalarında direk destek vermesi, farklı projelerle olan entegrasyonu (Cassanda gibi) oldukça yetenekli görünüyor. Özellikle SQL arayüzü desteklemeyen projelere entegre ederek çok başarılı sonuçlar elde edilebilir. Sadece benchmark’lara baktığımızda performans olarak benzer projelerin önüne geçemediğini görüyoruz.

Daha fazla detay için dökümantasyona buradan erişebilirsiniz.

, ,

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

This site uses Akismet to reduce spam. Learn how your comment data is processed.