Tuesday, April 3, 2018

Simple Sqoop


Sqoop

In the simple words of Tom White in Hadoop: The Definitive Guide (the Bible of the Hadoop ecosystem) - Apache Sqoop is an open source tool that allows users to extract data from a structured data store into Hadoop for further processing.

In simple language, Sqoop is a tool that scoops tabular data out of a regular RDBMS data store.

Sqoop connects to databases through JDBC connectors, which come with the Sqoop installation, and it can connect to any database that allows JDBC connections.

Some examples of JDBC connection strings for popular databases:

DB2
jdbc:db2://<Server DSN>:<PORT>/<DBNAME>:user=<USERNAME>;password=<PASSWORD>
Oracle
jdbc:oracle:thin:<USERNAME>/<PASSWORD>@<Server DSN>:<PORT>:<DBNAME>
Redshift
jdbc:postgresql://<Server DSN>:<PORT>/<DBNAME>?user=<USERNAME>&password=<PASSWORD>
PostgreSQL
jdbc:postgresql://<Server DSN>:<PORT>/<DBNAME>?user=<USERNAME>&password=<PASSWORD>
SAP
jdbc:sap://<Server DSN>:<PORT>/?databaseName=<DBNAME>&user=<USERNAME>&password=<PASSWORD>
SQL Server
jdbc:sqlserver://<Server DSN>:<PORT>;databaseName=<DBNAME>;userName=<USERNAME>;password=<PASSWORD>

 

Sqoop can do a full load or import data incrementally. Let us dissect an incremental-load Sqoop job, which is more involved than a full load.
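For contrast, a one-off full load needs far fewer options. The command below is only an illustrative sketch: the LAB.DEPT table and its target directory are hypothetical, it reuses the same DB2 connection string that appears in the job further down, and it runs a single mapper so no split column is needed.

# hypothetical one-off full import of LAB.DEPT with a single mapper
sqoop import --connect 'jdbc:db2://abc.dbms.xyz.com:446/EMPDAT:user=arka;password=India' --table LAB.DEPT --target-dir /LAB/PERSONAL/DEPT_DATA --fields-terminated-by '~' -m 1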

In the example below we are going to create a Sqoop job that extracts employee records from a DB2 source incrementally. The table holds roughly 3 million records.

sqoop job --create LAB_EMP_DATA -- import --connect 'jdbc:db2://abc.dbms.xyz.com:446/EMPDAT:user=arka;password=India' --target-dir /LAB/PERSONAL/EMP_DATA --fields-terminated-by '~' --null-non-string '' --null-string '' -m 8 --append
 
--options-file /data1/home/hdfsadmin/LAB/personal/option_files/EMP_DATA_QUERY.txt
 
--query
"SELECT RID(T1) AS EMP_KEY, EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO, T_STMP_UPD
,CURRENT DATE as D_INSRT_HDFS, CURRENT TIMESTAMP as T_STMP_INSRT_HDFS FROM LAB.EMP AS T1 WHERE $CONDITIONS with ur"
 
--split-by
 RID(T1)
 
--boundary-query
"SELECT MIN(RID(T1)), MAX(RID(T1)) FROM LAB.EMP AS T1 with ur"
 
--incremental
'lastmodified'
 
--check-column
T_STMP_UPD
 
--last-value
'2016-07-04 19:16:13.444677'

 

This whole Sqoop job can be pasted directly into the CLI, except for the options-file part. An options file is a file where you keep parameters and the query for reuse; one options file can be referenced by multiple Sqoop jobs. Here you can create EMP_DATA_QUERY.txt and put the rest of the parameters, from --query to --last-value, in that file. You can also put the same code inline without using an options file. The choice is yours, but I personally recommend creating options files.
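To make this concrete, here is what EMP_DATA_QUERY.txt could look like. This is only an illustrative sketch: a Sqoop options file takes one option or one value per line, lines beginning with # are treated as comments, and a quoted string such as the query must stay on a single line.

# EMP_DATA_QUERY.txt - reusable parameters for the LAB_EMP_DATA job
--query
"SELECT RID(T1) AS EMP_KEY, EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO, T_STMP_UPD, CURRENT DATE as D_INSRT_HDFS, CURRENT TIMESTAMP as T_STMP_INSRT_HDFS FROM LAB.EMP AS T1 WHERE $CONDITIONS with ur"
--split-by
RID(T1)
--boundary-query
"SELECT MIN(RID(T1)), MAX(RID(T1)) FROM LAB.EMP AS T1 with ur"
--incremental
lastmodified
--check-column
T_STMP_UPD
--last-value
'2016-07-04 19:16:13.444677'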

Now, decoding the command piece by piece:

sqoop job --create LAB_EMP_DATA -- import --connect 'jdbc:db2://abc.dbms.xyz.com:446/EMPDAT:user=arka;password=India' --target-dir /LAB/PERSONAL/EMP_DATA --fields-terminated-by '~' --null-non-string '' --null-string '' -m 8 --append
 

 

In the above code we are creating [--create] a Sqoop job named “LAB_EMP_DATA” which imports [import] data from a DB2 database, connected [--connect] through the JDBC connection string, which contains the username and password. The source database is EMPDAT. The target directory [--target-dir] is /LAB/PERSONAL/EMP_DATA, where the data will be stored as files. We set the field delimiter [--fields-terminated-by] to tilde (~) in the destination files. For null fields, whether string [--null-string] or non-string [--null-non-string], an empty string ('') will be written to the destination file. We use 8 mappers [-m], which is the number of parallel connections created to fetch the dataset. Finally, when new data arrives, we append [--append] it to the existing folder.
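A side note on credentials: the connection string above carries the password in plain text, and because it is part of the --connect value it is stored with the saved job definition. If that is a concern, Sqoop can read the password from a file instead via --password-file. The sketch below is only an assumption-laden example; the file path is hypothetical, and the password file should normally sit on HDFS with permissions restricted to the owner.

# hypothetical variant that keeps the password out of the connection string
sqoop import --connect 'jdbc:db2://abc.dbms.xyz.com:446/EMPDAT' --username arka --password-file /user/hdfsadmin/.secure/empdat.password --table LAB.EMP --target-dir /LAB/PERSONAL/EMP_DATA -m 8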

 

 

--query
"SELECT RID(T1) AS EMP_KEY, EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO, T_STMP_UPD
,CURRENT DATE as D_INSRT_HDFS, CURRENT TIMESTAMP as T_STMP_INSRT_HDFS FROM LAB.EMP AS T1 WHERE $CONDITIONS with ur"
 
--split-by
 RID(T1)
 
--boundary-query
"SELECT MIN(RID(T1)), MAX(RID(T1)) FROM LAB.EMP AS T1 with ur"
 

 

When performing parallel imports, Sqoop needs a criterion by which it can split the workload, and it uses the splitting column [--split-by] for this. By default Sqoop runs the query select min(<split-by>), max(<split-by>) from <table name> to find the boundaries for creating splits. In some cases this query is not optimal, so you can supply any arbitrary query that returns two numeric columns using the --boundary-query argument. Here we used RID (the row ID) of the table EMP as the splitting column. We also added a date and a timestamp column (D_INSRT_HDFS and T_STMP_INSRT_HDFS), which are not present in the source, to track when the data was loaded into Hadoop.

 

$CONDITIONS is a placeholder used by the Sqoop process; Sqoop replaces it internally with a unique condition expression for each split. Since we are running a parallel import, the map tasks execute our query with different values substituted for $CONDITIONS. For example, one mapper may execute "select empno from lab.emp where (RID >= 0 AND RID < 10000)", while the next executes "select empno from lab.emp where (RID >= 10000 AND RID < 20000)", and so on.
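To make the split mechanics concrete, suppose (hypothetically) the boundary query returns MIN(RID(T1)) = 0 and MAX(RID(T1)) = 240000. With -m 8, Sqoop divides that range into 8 roughly equal slices and substitutes one slice for $CONDITIONS in each mapper's copy of the query, so every mapper pulls a disjoint chunk of LAB.EMP:

mapper 1: ... WHERE ( RID(T1) >= 0      AND RID(T1) < 30000 )  with ur
mapper 2: ... WHERE ( RID(T1) >= 30000  AND RID(T1) < 60000 )  with ur
...
mapper 8: ... WHERE ( RID(T1) >= 210000 AND RID(T1) <= 240000 ) with ur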

 

--incremental
'lastmodified'
 
--check-column
T_STMP_UPD
 
--last-value
'2016-07-04 19:16:13.444677'

 

As this is an incremental load, we need to specify which column [--check-column] Sqoop should examine to find rows updated in the source since the last run. In the code above we use the lastmodified mode, so Sqoop loads only the rows whose T_STMP_UPD is later than the stored last value ('2016-07-04 19:16:13.444677'); after each successful run the saved job records the new last value for the next execution.

 

 

When you submit this command, it creates the LAB_EMP_DATA Sqoop job, which remains saved in the Hadoop system (the Sqoop metastore) so it can be run again later.

 

To review the saved Sqoop job definition, use

sqoop job --show  LAB_EMP_DATA

 

Now you can run the Sqoop job with

sqoop job --exec LAB_EMP_DATA

 

If you forget the name of the Sqoop job

sqoop job --list | grep -i EMP

 

Once the data is loaded into the target directory, we will see 8 files created in that folder, one per mapper. Remember, Sqoop only brings the data, not the column headers. You need to process it in Hive or another database to assign column names to it.
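For example, you could lay a Hive external table over the imported files to give the columns names. The sketch below is an assumption: the LAB_STG database, the table name and the column types are made up and should be adjusted to the real source schema; only the column order, the tilde delimiter and the HDFS location come from the job above.

# hypothetical Hive external table over the Sqoop output directory
hive -e "
CREATE DATABASE IF NOT EXISTS LAB_STG;
CREATE EXTERNAL TABLE IF NOT EXISTS LAB_STG.EMP_DATA (
  EMP_KEY BIGINT, EMPNO INT, ENAME STRING, JOB STRING, MGR INT, HIREDATE STRING,
  SAL DECIMAL(10,2), COMM DECIMAL(10,2), DEPTNO INT, T_STMP_UPD STRING,
  D_INSRT_HDFS STRING, T_STMP_INSRT_HDFS STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '~'
STORED AS TEXTFILE
LOCATION '/LAB/PERSONAL/EMP_DATA';"

After that, a quick SELECT * FROM LAB_STG.EMP_DATA LIMIT 10 confirms the columns line up with the data.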