DevVeri.com

Boğulacaksan büyük veride boğul!

Sqoop ile Veritabanı Hadoop Arasında Veri Aktarımı

apache-hadoop-sqoop1

Hadoop platformunun en büyük özelliklerinden birisi de farklı kaynaklardan farklı formatlarda gelen verilerin saklanması ve analiz edilebilmesini sağlaması.

İlişkisel veritabanında saklanan verilerin Hadoop üzerinde efektif biçimde işlenebilmesi için bu verilerin HDFS üzerine aktarılması gerekiyor. Sqoop, ilişkisel veritabanları ile Hadoop arasında veri aktarımı için tasarlanmış bir açık kaynaklı bir araç olarak karşımıza çıkıyor. Zaten ismi de Sql-to-Hadoop kelimelerinden türetilmiş.

Sqoop genel olarak MySQL, PostgreSQL, Oracle, SQL Server ve DB2 gibi popüler veritabanlarının hepsini desteklemekle beraber Hadoop gibi Java ile geliştirilmiş bir proje olduğu için JDBC ile erişilebilen tüm veritabanlarıyla da çalışabiliyor.

Sqoop ile hem import hem export yapılabiliyor. Veritabanından HDFS üzerine, direkt Hive tablosu olarak ya da HBase’e veri aktarmak mümkün. Sqoop’un avantajı ise veri aktarım işlemlerini MapReduce görevleri ile paralel olarak yaparak aktarımı çok daha hızlı tamamlamak. Ayrıca MySQL ve PostgreSQL için JDBC kullanmadan (mysqldump gibi) daha düşük seviyeli ve performanslı veri aktarımı da yapılabiliyor.

Import işlemi sırasında Sqoop basitçe meta verilerden faydalanarak tablonun birincil anahtarını bulup minimum ve maksimum değerlerini alarak eşit olarak Map sayısına uygun olarak bölerek farklı düğümler üzerinde bu verileri paralel olarak aktarıyor. Bu yüzden sonuç klasör içinde birden fazla dosyaya yazılıyor. Aktarım sırasında veritabanına yeni kayıt geliyorsa bunlar aktarılmayabilir, veri tutarlılığına dikkat etmek gerekir.

Export işlemi sırasında ise aksi belirtilmediği sürece verileri binerli gruplar halinde veritabanına INSERT ediyor. Bu işlem de paralel olarak yapıldığı için aktarım sırasında veritabanında yük oluşturabilir. Her bir grubun yazılması kendi başına bir transaction olduğu için burada da veri tutarlılığına dikkat etmek gerekir. Eğer kayıtların tamamı aktarıldıktan sonra aktif olması isteniyorsa ara tablo kullanımını sağlayan –staging-table parametresi kullanılabilir. Bu tablonun yaratılması ve temizlenmesi de otomatik yapılmaz, elle yapmak gerekir.

Şimdi Sqoop ile veritabanından Hive’a verilerin aktarılması, Hive’da yapılan bir sorgu sonucu oluşan verilerin de tekrar veritabanına aktarılmasını örnek olarak inceleyelim. (Hive ile ilgili bilgi için ilgili yazımızı inceleyebilirsiniz)

Örnekleri MySQL üzerinde hazırladım ve internetten bulabileceğimiz örnek verileri kullandım. Verilerimizi internetten indirip veritabanına şu şekilde aktarabiliriz:

$ wget https://launchpad.net/test-db/employees-db-1/1.0.6/+download/employees_db-full-1.0.6.tar.bz2
$ tar -xjf employees_db-full-1.0.6.tar.bz2
$ cd employees_db/
$ mysql -h localhost -u test -P -t < employees.sql

Veriler hazır olduğuna göre şimdi Sqoop’u kuralım. Örnekleri Hadoop’un 1.0.3 sürümünde yaptığım için bu sürüme uygun olan son sürümü indirip kurdum:

$ cd /usr/java/
$ wget http://www.eu.apache.org/dist/sqoop/1.4.3/sqoop-1.4.3.bin__hadoop-1.0.0.tar.gz
$ gzip -dc sqoop-1.4.3.bin__hadoop-1.0.0.tar.gz | tar xf -
$ ln -s /usr/java/sqoop-1.4.3.bin__hadoop-1.0.0 sqoop

Sqoop’un çalışması için Hadoop’un kurulu olduğu dizin ile ilgili tanımlamaları yapmamız gerekiyor:

export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME

MySQL JDBC sürücüsünü de indirip Sqoop kurulumunun buluğunduğu yerin altındaki lib klasörüne aktarmamız gerekiyor. Güncel sürücüyü burdan indirebilirsiniz.

Kurulumlar ve verilerimiz hazır olduğunu anlayabilmek için Sqoop ile tablolarımızı listeleyebiliriz:

$ sqoop list-tables --connect jdbc:mysql://localhost:3306/employees --username test -P
Warning: /usr/lib/hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Enter password:
13/07/07 19:59:31 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
departments
dept_emp
dept_manager
employees
employees_with_titles
employees_with_titles_ex
salaries
titles

Bu örnekte HBase ile ilgili herhangi bir ayar yapmadığımız için bir uyarı alıyoruz. Eğer tablo isimleri listelendiyse ayarlarımız doğru demektir.

Sqoop komut satırından parametrik olarak çalıştırılan bir araç, genel parametreleri şu şekilde:

$ sqoop help
Warning: /usr/lib/hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
usage: sqoop COMMAND [ARGS]

Available commands:
  codegen            Generate code to interact with database records
  create-hive-table  Import a table definition into Hive
  eval               Evaluate a SQL statement and display the results
  export             Export an HDFS directory to a database table
  help               List available commands
  import             Import a table from a database to HDFS
  import-all-tables  Import tables from a database to HDFS
  job                Work with saved jobs
  list-databases     List available databases on a server
  list-tables        List available tables in a database
  merge              Merge results of incremental imports
  metastore          Run a standalone Sqoop metastore
  version            Display version information

See 'sqoop help COMMAND' for information on a specific command.

Bu komutlardan belli başlıları şöyle; codegen parametresi ile tablolara karşılık gelen Java sınıflarının oluşturulması sağlanıyor. create-hive-table ile Hive üzerinde tabloların yaratılması sağlanıyor. import komutu ile veritabanından Hadoop’a, export komutu ile Hadoop’dan veritabanına veri aktarılıyor. Komutlar ile ilgili daha fazla detay için, örneğin import için, sqoop import help yazabilirsiniz.

Şimdi employees tablosunu HDFS üzerine şu şekilde aktarabiliriz:

$ sqoop import --connect jdbc:mysql://localhost:3306/employees --username test -P --table employees
Warning: /usr/lib/hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Enter password:
13/07/07 17:49:30 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
13/07/07 17:49:30 INFO tool.CodeGenTool: Beginning code generation
13/07/07 17:49:30 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
13/07/07 17:49:30 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
13/07/07 17:49:30 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/java/hadoop
Note: /tmp/sqoop-haqen/compile/53e09041098415156dfde1b76932ac43/employees.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
13/07/07 17:49:32 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-haqen/compile/53e09041098415156dfde1b76932ac43/employees.jar
13/07/07 17:49:32 WARN manager.MySQLManager: It looks like you are importing from mysql.
13/07/07 17:49:32 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
13/07/07 17:49:32 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
13/07/07 17:49:32 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
13/07/07 17:49:32 INFO mapreduce.ImportJobBase: Beginning import of employees
13/07/07 17:49:33 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`emp_no`), MAX(`emp_no`) FROM `employees`
13/07/07 17:49:34 INFO mapred.JobClient: Running job: job_201307071405_0001
13/07/07 17:49:35 INFO mapred.JobClient: map 0% reduce 0%
13/07/07 17:49:55 INFO mapred.JobClient: map 25% reduce 0%
13/07/07 17:49:58 INFO mapred.JobClient: map 50% reduce 0%
13/07/07 17:50:01 INFO mapred.JobClient: map 75% reduce 0%
13/07/07 17:50:04 INFO mapred.JobClient: map 100% reduce 0%
13/07/07 17:50:09 INFO mapred.JobClient: Job complete: job_201307071405_0001
13/07/07 17:50:09 INFO mapred.JobClient: Counters: 18
13/07/07 17:50:09 INFO mapred.JobClient: Job Counters
13/07/07 17:50:09 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=37151
13/07/07 17:50:09 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/07/07 17:50:09 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/07/07 17:50:09 INFO mapred.JobClient: Launched map tasks=4
13/07/07 17:50:09 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
13/07/07 17:50:09 INFO mapred.JobClient: File Output Format Counters
13/07/07 17:50:09 INFO mapred.JobClient: Bytes Written=13821993
13/07/07 17:50:09 INFO mapred.JobClient: FileSystemCounters
13/07/07 17:50:09 INFO mapred.JobClient: HDFS_BYTES_READ=464
13/07/07 17:50:09 INFO mapred.JobClient: FILE_BYTES_WRITTEN=122484
13/07/07 17:50:09 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=13821993
13/07/07 17:50:09 INFO mapred.JobClient: File Input Format Counters
13/07/07 17:50:09 INFO mapred.JobClient: Bytes Read=0
13/07/07 17:50:09 INFO mapred.JobClient: Map-Reduce Framework
13/07/07 17:50:09 INFO mapred.JobClient: Map input records=300024
13/07/07 17:50:09 INFO mapred.JobClient: Physical memory (bytes) snapshot=387411968
13/07/07 17:50:09 INFO mapred.JobClient: Spilled Records=0
13/07/07 17:50:09 INFO mapred.JobClient: CPU time spent (ms)=29100
13/07/07 17:50:09 INFO mapred.JobClient: Total committed heap usage (bytes)=343277568
13/07/07 17:50:09 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1597562880
13/07/07 17:50:09 INFO mapred.JobClient: Map output records=300024
13/07/07 17:50:09 INFO mapred.JobClient: SPLIT_RAW_BYTES=464
13/07/07 17:50:09 INFO mapreduce.ImportJobBase: Transferred 13.1817 MB in 37.2724 seconds (362.1456 KB/sec)
13/07/07 17:50:09 INFO mapreduce.ImportJobBase: Retrieved 300024 records.

Bu aktarım işlemini –direct parametresi ile yapsaydık JDBC yerine paralel olarak mysqldump kullanarak tabloyu aktarması 33.5059 sn (402.8555 KB/sec) sürecekti. Şimdi HDFS üzerinde dosyaların oluşup oluşmadığını kontrol edebilirsiniz.

Sqoop ile tablonun tamamını aktarmak zorunda değiliz, –query parametresi ile verdiğimiz sorgu sonucunun aktarılmasını sağlayabiliriz. Bunun dışında varsayılan olarak metin dosyası haline aktarılan verilerin alan ve satır ayraçlarını belirleyebiliriz, metin dosyaları dışında Hadoop’un binary formatı olan SequenceFile veya Avro formatında da veriler aktarılabiliriz.

Örneğimize devam etmek için veritabanındaki tüm tabloları –hive-import komutunu da kullanarak direkt olarak Hive’a aktarmak için şu komutu çalıştırıyoruz:

$ sqoop import-all-tables --connect jdbc:mysql://localhost:3306/employees --username test -P --direct --hive-import

Aktarım tamamlandıktan sonra hive komutu ile tabloların düzgün bir şekilde aktarılıp aktarılmadığını kontrol edebilirsiniz:

hive> show tables;
OK
departments
dept_emp
dept_manager
employees
salaries
titles
Time taken: 3.899 seconds

hive> select * from employees limit 10;
OK
10001 1953-09-02 Georgi Facello M 1986-06-26
10002 1964-06-02 Bezalel Simmel F 1985-11-21
10003 1959-12-03 Parto Bamford M 1986-08-28
10004 1954-05-01 Chirstian Koblick M 1986-12-01
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12
10006 1953-04-20 Anneke Preusig F 1989-06-02
10007 1957-05-23 Tzvetan Zielinski F 1989-02-10
10008 1958-02-19 Saniya Kalloufi M 1994-09-15
10009 1952-04-19 Sumant Peac F 1985-02-18
10010 1963-06-01 Duangkaew Piveteau F 1989-08-24
Time taken: 0.631 seconds

Şimdi Hive üzerinde bir tablo oluşturarak, employees tablosu ile titles tablosunu join işlemiyle birleşiminin sonucunu bu tabloya aktaralım:

hive> create table employees_with_titles (emp_no int, first_name string, last_name string, title string) row format delimited fields terminated by '\t';

hive> insert overwrite table employees_with_titles select e.emp_no, e.first_name, e.last_name, t.title from employees e join titles t on t.emp_no = e.emp_no where t.to_date = '9999-01-01';

hive> select * from employees_with_titles limit 10;
OK
10001 Georgi Facello Senior Engineer
10002 Bezalel Simmel Staff
10003 Parto Bamford Senior Engineer
10004 Chirstian Koblick Senior Engineer
10005 Kyoichi Maliniak Senior Staff
10006 Anneke Preusig Senior Engineer
10007 Tzvetan Zielinski Senior Staff
10009 Sumant Peac Senior Engineer
10010 Duangkaew Piveteau Engineer
10012 Patricio Bridgland Senior Engineer
Time taken: 0.151 seconds

Son olarak da Hive üzerindeki bu verileri MySQL’e aktaralım. Burda dikkat edilmesi gereken konu, export işlemi sırasında import işleminde olduğu gibi tablolar otomatik olarak oluşturulmuyor. Bunun sebebi ise her veritabanında veri tiplerinin farklılık gösteriyor olması. Örneğin Hive’da string formatında bulunan veri veritabanına char, varchar, nvarchar gibi farklı formatlarda aktarılabilir. Bu yüzden export işlemi öncesinde tablo yaratılma işlemini elle biz yapıyoruz.

delimiter $$
CREATE TABLE `employees_with_titles` (
`emp_no` int(11) NOT NULL,
`first_name` varchar(50) NOT NULL,
`last_name` varchar(50) NOT NULL,
`title` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8$$

Sqoop ile Hive’dan MySQL’e verileri aktararak örneğimizi tamamlıyoruz:

$ sqoop export --connect jdbc:mysql://localhost:3306/employees --username root -P --table employees_with_titles --export-dir /user/hive/warehouse/employees_with_titles --input-fields-terminated-by '\t'
Warning: /usr/lib/hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Enter password:
13/07/07 18:58:21 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
13/07/07 18:58:21 INFO tool.CodeGenTool: Beginning code generation
13/07/07 18:58:21 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees_with_titles` AS t LIMIT 1
13/07/07 18:58:21 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees_with_titles` AS t LIMIT 1
13/07/07 18:58:21 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/java/hadoop
Note: /tmp/sqoop-haqen/compile/d0f66a6a6613a7a744d66e9779d8a0ec/employees_with_titles.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
13/07/07 18:58:22 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-haqen/compile/d0f66a6a6613a7a744d66e9779d8a0ec/employees_with_titles.jar
13/07/07 18:58:22 INFO mapreduce.ExportJobBase: Beginning export of employees_with_titles
13/07/07 18:58:24 INFO input.FileInputFormat: Total input paths to process : 1
13/07/07 18:58:24 INFO input.FileInputFormat: Total input paths to process : 1
13/07/07 18:58:24 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/07/07 18:58:24 WARN snappy.LoadSnappy: Snappy native library not loaded
13/07/07 18:58:24 INFO mapred.JobClient: Running job: job_201307071405_0026
13/07/07 18:58:25 INFO mapred.JobClient: map 0% reduce 0%
13/07/07 18:58:43 INFO mapred.JobClient: map 50% reduce 0%
13/07/07 18:58:52 INFO mapred.JobClient: map 100% reduce 0%
13/07/07 18:58:57 INFO mapred.JobClient: Job complete: job_201307071405_0026
13/07/07 18:58:57 INFO mapred.JobClient: Counters: 18
13/07/07 18:58:57 INFO mapred.JobClient: Job Counters
13/07/07 18:58:57 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=38842
13/07/07 18:58:57 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/07/07 18:58:57 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/07/07 18:58:57 INFO mapred.JobClient: Launched map tasks=4
13/07/07 18:58:57 INFO mapred.JobClient: Data-local map tasks=4
13/07/07 18:58:57 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
13/07/07 18:58:57 INFO mapred.JobClient: File Output Format Counters
13/07/07 18:58:57 INFO mapred.JobClient: Bytes Written=0
13/07/07 18:58:57 INFO mapred.JobClient: FileSystemCounters
13/07/07 18:58:57 INFO mapred.JobClient: HDFS_BYTES_READ=8453800
13/07/07 18:58:57 INFO mapred.JobClient: FILE_BYTES_WRITTEN=121908
13/07/07 18:58:57 INFO mapred.JobClient: File Input Format Counters
13/07/07 18:58:57 INFO mapred.JobClient: Bytes Read=0
13/07/07 18:58:57 INFO mapred.JobClient: Map-Reduce Framework
13/07/07 18:58:57 INFO mapred.JobClient: Map input records=240124
13/07/07 18:58:57 INFO mapred.JobClient: Physical memory (bytes) snapshot=417497088
13/07/07 18:58:57 INFO mapred.JobClient: Spilled Records=0
13/07/07 18:58:57 INFO mapred.JobClient: CPU time spent (ms)=25760
13/07/07 18:58:57 INFO mapred.JobClient: Total committed heap usage (bytes)=388956160
13/07/07 18:58:57 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1607860224
13/07/07 18:58:57 INFO mapred.JobClient: Map output records=240124
13/07/07 18:58:57 INFO mapred.JobClient: SPLIT_RAW_BYTES=596
13/07/07 18:58:57 INFO mapreduce.ExportJobBase: Transferred 8.0622 MB in 34.3876 seconds (240.077 KB/sec)
13/07/07 18:58:57 INFO mapreduce.ExportJobBase: Exported 240124 records.

, , ,

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

This site uses Akismet to reduce spam. Learn how your comment data is processed.