Sqoop ile Veritabanı Hadoop Arasında Veri Aktarımı
Hadoop platformunun en büyük özelliklerinden birisi de farklı kaynaklardan farklı formatlarda gelen verilerin saklanması ve analiz edilebilmesini sağlaması.
İlişkisel veritabanında saklanan verilerin Hadoop üzerinde efektif biçimde işlenebilmesi için bu verilerin HDFS üzerine aktarılması gerekiyor. Sqoop, ilişkisel veritabanları ile Hadoop arasında veri aktarımı için tasarlanmış bir açık kaynaklı bir araç olarak karşımıza çıkıyor. Zaten ismi de Sql-to-Hadoop kelimelerinden türetilmiş.
Sqoop genel olarak MySQL, PostgreSQL, Oracle, SQL Server ve DB2 gibi popüler veritabanlarının hepsini desteklemekle beraber Hadoop gibi Java ile geliştirilmiş bir proje olduğu için JDBC ile erişilebilen tüm veritabanlarıyla da çalışabiliyor.
Sqoop ile hem import hem export yapılabiliyor. Veritabanından HDFS üzerine, direkt Hive tablosu olarak ya da HBase’e veri aktarmak mümkün. Sqoop’un avantajı ise veri aktarım işlemlerini MapReduce görevleri ile paralel olarak yaparak aktarımı çok daha hızlı tamamlamak. Ayrıca MySQL ve PostgreSQL için JDBC kullanmadan (mysqldump gibi) daha düşük seviyeli ve performanslı veri aktarımı da yapılabiliyor.
Import işlemi sırasında Sqoop basitçe meta verilerden faydalanarak tablonun birincil anahtarını bulup minimum ve maksimum değerlerini alarak eşit olarak Map sayısına uygun olarak bölerek farklı düğümler üzerinde bu verileri paralel olarak aktarıyor. Bu yüzden sonuç klasör içinde birden fazla dosyaya yazılıyor. Aktarım sırasında veritabanına yeni kayıt geliyorsa bunlar aktarılmayabilir, veri tutarlılığına dikkat etmek gerekir.
Export işlemi sırasında ise aksi belirtilmediği sürece verileri binerli gruplar halinde veritabanına INSERT ediyor. Bu işlem de paralel olarak yapıldığı için aktarım sırasında veritabanında yük oluşturabilir. Her bir grubun yazılması kendi başına bir transaction olduğu için burada da veri tutarlılığına dikkat etmek gerekir. Eğer kayıtların tamamı aktarıldıktan sonra aktif olması isteniyorsa ara tablo kullanımını sağlayan –staging-table parametresi kullanılabilir. Bu tablonun yaratılması ve temizlenmesi de otomatik yapılmaz, elle yapmak gerekir.
Şimdi Sqoop ile veritabanından Hive’a verilerin aktarılması, Hive’da yapılan bir sorgu sonucu oluşan verilerin de tekrar veritabanına aktarılmasını örnek olarak inceleyelim. (Hive ile ilgili bilgi için ilgili yazımızı inceleyebilirsiniz)
Örnekleri MySQL üzerinde hazırladım ve internetten bulabileceğimiz örnek verileri kullandım. Verilerimizi internetten indirip veritabanına şu şekilde aktarabiliriz:
$ wget https://launchpad.net/test-db/employees-db-1/1.0.6/+download/employees_db-full-1.0.6.tar.bz2 $ tar -xjf employees_db-full-1.0.6.tar.bz2 $ cd employees_db/ $ mysql -h localhost -u test -P -t < employees.sql
Veriler hazır olduğuna göre şimdi Sqoop’u kuralım. Örnekleri Hadoop’un 1.0.3 sürümünde yaptığım için bu sürüme uygun olan son sürümü indirip kurdum:
$ cd /usr/java/ $ wget http://www.eu.apache.org/dist/sqoop/1.4.3/sqoop-1.4.3.bin__hadoop-1.0.0.tar.gz $ gzip -dc sqoop-1.4.3.bin__hadoop-1.0.0.tar.gz | tar xf - $ ln -s /usr/java/sqoop-1.4.3.bin__hadoop-1.0.0 sqoop
Sqoop’un çalışması için Hadoop’un kurulu olduğu dizin ile ilgili tanımlamaları yapmamız gerekiyor:
export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_MAPRED_HOME=$HADOOP_HOME
MySQL JDBC sürücüsünü de indirip Sqoop kurulumunun buluğunduğu yerin altındaki lib klasörüne aktarmamız gerekiyor. Güncel sürücüyü burdan indirebilirsiniz.
Kurulumlar ve verilerimiz hazır olduğunu anlayabilmek için Sqoop ile tablolarımızı listeleyebiliriz:
$ sqoop list-tables --connect jdbc:mysql://localhost:3306/employees --username test -P Warning: /usr/lib/hbase does not exist! HBase imports will fail. Please set $HBASE_HOME to the root of your HBase installation. Enter password: 13/07/07 19:59:31 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset. departments dept_emp dept_manager employees employees_with_titles employees_with_titles_ex salaries titles
Bu örnekte HBase ile ilgili herhangi bir ayar yapmadığımız için bir uyarı alıyoruz. Eğer tablo isimleri listelendiyse ayarlarımız doğru demektir.
Sqoop komut satırından parametrik olarak çalıştırılan bir araç, genel parametreleri şu şekilde:
$ sqoop help Warning: /usr/lib/hbase does not exist! HBase imports will fail. Please set $HBASE_HOME to the root of your HBase installation. usage: sqoop COMMAND [ARGS] Available commands: codegen Generate code to interact with database records create-hive-table Import a table definition into Hive eval Evaluate a SQL statement and display the results export Export an HDFS directory to a database table help List available commands import Import a table from a database to HDFS import-all-tables Import tables from a database to HDFS job Work with saved jobs list-databases List available databases on a server list-tables List available tables in a database merge Merge results of incremental imports metastore Run a standalone Sqoop metastore version Display version information See 'sqoop help COMMAND' for information on a specific command.
Bu komutlardan belli başlıları şöyle; codegen parametresi ile tablolara karşılık gelen Java sınıflarının oluşturulması sağlanıyor. create-hive-table ile Hive üzerinde tabloların yaratılması sağlanıyor. import komutu ile veritabanından Hadoop’a, export komutu ile Hadoop’dan veritabanına veri aktarılıyor. Komutlar ile ilgili daha fazla detay için, örneğin import için, sqoop import help yazabilirsiniz.
Şimdi employees tablosunu HDFS üzerine şu şekilde aktarabiliriz:
$ sqoop import --connect jdbc:mysql://localhost:3306/employees --username test -P --table employees Warning: /usr/lib/hbase does not exist! HBase imports will fail. Please set $HBASE_HOME to the root of your HBase installation. Enter password: 13/07/07 17:49:30 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset. 13/07/07 17:49:30 INFO tool.CodeGenTool: Beginning code generation 13/07/07 17:49:30 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1 13/07/07 17:49:30 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1 13/07/07 17:49:30 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/java/hadoop Note: /tmp/sqoop-haqen/compile/53e09041098415156dfde1b76932ac43/employees.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 13/07/07 17:49:32 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-haqen/compile/53e09041098415156dfde1b76932ac43/employees.jar 13/07/07 17:49:32 WARN manager.MySQLManager: It looks like you are importing from mysql. 13/07/07 17:49:32 WARN manager.MySQLManager: This transfer can be faster! Use the --direct 13/07/07 17:49:32 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path. 13/07/07 17:49:32 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql) 13/07/07 17:49:32 INFO mapreduce.ImportJobBase: Beginning import of employees 13/07/07 17:49:33 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`emp_no`), MAX(`emp_no`) FROM `employees` 13/07/07 17:49:34 INFO mapred.JobClient: Running job: job_201307071405_0001 13/07/07 17:49:35 INFO mapred.JobClient: map 0% reduce 0% 13/07/07 17:49:55 INFO mapred.JobClient: map 25% reduce 0% 13/07/07 17:49:58 INFO mapred.JobClient: map 50% reduce 0% 13/07/07 17:50:01 INFO mapred.JobClient: map 75% reduce 0% 13/07/07 17:50:04 INFO mapred.JobClient: map 100% reduce 0% 13/07/07 17:50:09 INFO mapred.JobClient: Job complete: job_201307071405_0001 13/07/07 17:50:09 INFO mapred.JobClient: Counters: 18 13/07/07 17:50:09 INFO mapred.JobClient: Job Counters 13/07/07 17:50:09 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=37151 13/07/07 17:50:09 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 13/07/07 17:50:09 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 13/07/07 17:50:09 INFO mapred.JobClient: Launched map tasks=4 13/07/07 17:50:09 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0 13/07/07 17:50:09 INFO mapred.JobClient: File Output Format Counters 13/07/07 17:50:09 INFO mapred.JobClient: Bytes Written=13821993 13/07/07 17:50:09 INFO mapred.JobClient: FileSystemCounters 13/07/07 17:50:09 INFO mapred.JobClient: HDFS_BYTES_READ=464 13/07/07 17:50:09 INFO mapred.JobClient: FILE_BYTES_WRITTEN=122484 13/07/07 17:50:09 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=13821993 13/07/07 17:50:09 INFO mapred.JobClient: File Input Format Counters 13/07/07 17:50:09 INFO mapred.JobClient: Bytes Read=0 13/07/07 17:50:09 INFO mapred.JobClient: Map-Reduce Framework 13/07/07 17:50:09 INFO mapred.JobClient: Map input records=300024 13/07/07 17:50:09 INFO mapred.JobClient: Physical memory (bytes) snapshot=387411968 13/07/07 17:50:09 INFO mapred.JobClient: Spilled Records=0 13/07/07 17:50:09 INFO mapred.JobClient: CPU time spent (ms)=29100 13/07/07 17:50:09 INFO mapred.JobClient: Total committed heap usage (bytes)=343277568 13/07/07 17:50:09 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1597562880 13/07/07 17:50:09 INFO mapred.JobClient: Map output records=300024 13/07/07 17:50:09 INFO mapred.JobClient: SPLIT_RAW_BYTES=464 13/07/07 17:50:09 INFO mapreduce.ImportJobBase: Transferred 13.1817 MB in 37.2724 seconds (362.1456 KB/sec) 13/07/07 17:50:09 INFO mapreduce.ImportJobBase: Retrieved 300024 records.
Bu aktarım işlemini –direct parametresi ile yapsaydık JDBC yerine paralel olarak mysqldump kullanarak tabloyu aktarması 33.5059 sn (402.8555 KB/sec) sürecekti. Şimdi HDFS üzerinde dosyaların oluşup oluşmadığını kontrol edebilirsiniz.
Sqoop ile tablonun tamamını aktarmak zorunda değiliz, –query parametresi ile verdiğimiz sorgu sonucunun aktarılmasını sağlayabiliriz. Bunun dışında varsayılan olarak metin dosyası haline aktarılan verilerin alan ve satır ayraçlarını belirleyebiliriz, metin dosyaları dışında Hadoop’un binary formatı olan SequenceFile veya Avro formatında da veriler aktarılabiliriz.
Örneğimize devam etmek için veritabanındaki tüm tabloları –hive-import komutunu da kullanarak direkt olarak Hive’a aktarmak için şu komutu çalıştırıyoruz:
$ sqoop import-all-tables --connect jdbc:mysql://localhost:3306/employees --username test -P --direct --hive-import
Aktarım tamamlandıktan sonra hive komutu ile tabloların düzgün bir şekilde aktarılıp aktarılmadığını kontrol edebilirsiniz:
hive> show tables; OK departments dept_emp dept_manager employees salaries titles Time taken: 3.899 seconds hive> select * from employees limit 10; OK 10001 1953-09-02 Georgi Facello M 1986-06-26 10002 1964-06-02 Bezalel Simmel F 1985-11-21 10003 1959-12-03 Parto Bamford M 1986-08-28 10004 1954-05-01 Chirstian Koblick M 1986-12-01 10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 10006 1953-04-20 Anneke Preusig F 1989-06-02 10007 1957-05-23 Tzvetan Zielinski F 1989-02-10 10008 1958-02-19 Saniya Kalloufi M 1994-09-15 10009 1952-04-19 Sumant Peac F 1985-02-18 10010 1963-06-01 Duangkaew Piveteau F 1989-08-24 Time taken: 0.631 seconds
Şimdi Hive üzerinde bir tablo oluşturarak, employees tablosu ile titles tablosunu join işlemiyle birleşiminin sonucunu bu tabloya aktaralım:
hive> create table employees_with_titles (emp_no int, first_name string, last_name string, title string) row format delimited fields terminated by '\t'; hive> insert overwrite table employees_with_titles select e.emp_no, e.first_name, e.last_name, t.title from employees e join titles t on t.emp_no = e.emp_no where t.to_date = '9999-01-01'; hive> select * from employees_with_titles limit 10; OK 10001 Georgi Facello Senior Engineer 10002 Bezalel Simmel Staff 10003 Parto Bamford Senior Engineer 10004 Chirstian Koblick Senior Engineer 10005 Kyoichi Maliniak Senior Staff 10006 Anneke Preusig Senior Engineer 10007 Tzvetan Zielinski Senior Staff 10009 Sumant Peac Senior Engineer 10010 Duangkaew Piveteau Engineer 10012 Patricio Bridgland Senior Engineer Time taken: 0.151 seconds
Son olarak da Hive üzerindeki bu verileri MySQL’e aktaralım. Burda dikkat edilmesi gereken konu, export işlemi sırasında import işleminde olduğu gibi tablolar otomatik olarak oluşturulmuyor. Bunun sebebi ise her veritabanında veri tiplerinin farklılık gösteriyor olması. Örneğin Hive’da string formatında bulunan veri veritabanına char, varchar, nvarchar gibi farklı formatlarda aktarılabilir. Bu yüzden export işlemi öncesinde tablo yaratılma işlemini elle biz yapıyoruz.
delimiter $$ CREATE TABLE `employees_with_titles` ( `emp_no` int(11) NOT NULL, `first_name` varchar(50) NOT NULL, `last_name` varchar(50) NOT NULL, `title` varchar(50) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8$$
Sqoop ile Hive’dan MySQL’e verileri aktararak örneğimizi tamamlıyoruz:
$ sqoop export --connect jdbc:mysql://localhost:3306/employees --username root -P --table employees_with_titles --export-dir /user/hive/warehouse/employees_with_titles --input-fields-terminated-by '\t' Warning: /usr/lib/hbase does not exist! HBase imports will fail. Please set $HBASE_HOME to the root of your HBase installation. Enter password: 13/07/07 18:58:21 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset. 13/07/07 18:58:21 INFO tool.CodeGenTool: Beginning code generation 13/07/07 18:58:21 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees_with_titles` AS t LIMIT 1 13/07/07 18:58:21 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees_with_titles` AS t LIMIT 1 13/07/07 18:58:21 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/java/hadoop Note: /tmp/sqoop-haqen/compile/d0f66a6a6613a7a744d66e9779d8a0ec/employees_with_titles.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 13/07/07 18:58:22 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-haqen/compile/d0f66a6a6613a7a744d66e9779d8a0ec/employees_with_titles.jar 13/07/07 18:58:22 INFO mapreduce.ExportJobBase: Beginning export of employees_with_titles 13/07/07 18:58:24 INFO input.FileInputFormat: Total input paths to process : 1 13/07/07 18:58:24 INFO input.FileInputFormat: Total input paths to process : 1 13/07/07 18:58:24 INFO util.NativeCodeLoader: Loaded the native-hadoop library 13/07/07 18:58:24 WARN snappy.LoadSnappy: Snappy native library not loaded 13/07/07 18:58:24 INFO mapred.JobClient: Running job: job_201307071405_0026 13/07/07 18:58:25 INFO mapred.JobClient: map 0% reduce 0% 13/07/07 18:58:43 INFO mapred.JobClient: map 50% reduce 0% 13/07/07 18:58:52 INFO mapred.JobClient: map 100% reduce 0% 13/07/07 18:58:57 INFO mapred.JobClient: Job complete: job_201307071405_0026 13/07/07 18:58:57 INFO mapred.JobClient: Counters: 18 13/07/07 18:58:57 INFO mapred.JobClient: Job Counters 13/07/07 18:58:57 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=38842 13/07/07 18:58:57 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 13/07/07 18:58:57 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 13/07/07 18:58:57 INFO mapred.JobClient: Launched map tasks=4 13/07/07 18:58:57 INFO mapred.JobClient: Data-local map tasks=4 13/07/07 18:58:57 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0 13/07/07 18:58:57 INFO mapred.JobClient: File Output Format Counters 13/07/07 18:58:57 INFO mapred.JobClient: Bytes Written=0 13/07/07 18:58:57 INFO mapred.JobClient: FileSystemCounters 13/07/07 18:58:57 INFO mapred.JobClient: HDFS_BYTES_READ=8453800 13/07/07 18:58:57 INFO mapred.JobClient: FILE_BYTES_WRITTEN=121908 13/07/07 18:58:57 INFO mapred.JobClient: File Input Format Counters 13/07/07 18:58:57 INFO mapred.JobClient: Bytes Read=0 13/07/07 18:58:57 INFO mapred.JobClient: Map-Reduce Framework 13/07/07 18:58:57 INFO mapred.JobClient: Map input records=240124 13/07/07 18:58:57 INFO mapred.JobClient: Physical memory (bytes) snapshot=417497088 13/07/07 18:58:57 INFO mapred.JobClient: Spilled Records=0 13/07/07 18:58:57 INFO mapred.JobClient: CPU time spent (ms)=25760 13/07/07 18:58:57 INFO mapred.JobClient: Total committed heap usage (bytes)=388956160 13/07/07 18:58:57 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1607860224 13/07/07 18:58:57 INFO mapred.JobClient: Map output records=240124 13/07/07 18:58:57 INFO mapred.JobClient: SPLIT_RAW_BYTES=596 13/07/07 18:58:57 INFO mapreduce.ExportJobBase: Transferred 8.0622 MB in 34.3876 seconds (240.077 KB/sec) 13/07/07 18:58:57 INFO mapreduce.ExportJobBase: Exported 240124 records.
Introduction to MongoDB ElasticSearch ve Native Script Kullanımı