Thursday, 19 March 2015
Wednesday, 18 March 2015
Extract Transform and Loading Data into a Relational Database using Hadoop ECO System Tools:
Hadoop frame work has the lot of flexibility in terms of changing any thing and make the application frame work for your requirement. Hadoop is all about how you can use it.
Today I am trying to Extract data from Oracle table using Sqoop store it in HDFS, then process the data using HIVE and then Load it in to a another RDBMS Database.
The above screen shot is a blue print to achieving ETL process. Here I would like to use Sqoop to import and export data from HDFS we can use PIG/HIVE to process the data and store the data in to HDFS.
In terms of source and destination terms it looks like this:
Source - Staging area - Target
Oracle -> HDFS -> MYSQL
Lets start with importing data from Oracle data base using Sqoop Import command. Here I am importing two tables
sqoop import --connect jdbc:oracle:thin:@hostname:1521/orcl --username SH --password Admin123 --table SH.SALES --split-by PROD_ID --warehouse-dir /sqoop/sales
sqoop import --connect jdbc:oracle:thin:@hostname:1521/orcl --username SH --password Admin123 --table SH.PRODUCTS --split-by PROD_ID -m 1 --target-dir /sqoop/product
Note1: Sqoop has one limitation when we are trying to connect Oracle database we need to Provide the User name and table name in Caps.
Note2: To connect Oracle from Sqoop we need to have a ojdbc6.jar which will be available in
http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html download it and place it under /usr/lib/sqoop/lib .
Now we have data available in HDFS, lets create HIVE External tables using the sqoop imported path.
To do this we have to connect HIVE and create external hive tables same list of columns and similar data type, so that we should not miss any imported data.
CREATE EXTERNAL TABLE IF NOT EXISTS HIVE_SALES ( PROD_ID FLOAT , CUST_ID FLOAT ,TIME_ID STRING ,CHANNEL_ID FLOAT ,PROMO_ID FLOAT ,QUANTITY_SOLD FLOAT , AMOUNT_SOLD FLOAT ) COMMENT 'This is SALES STREAMING data' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE location '/sqoop/sales';
Check if we are able to fetch data from newly created external hive table HIVE_SALES.
select * from hive_sales limit 10;
CREATE EXTERNAL TABLE IF NOT EXISTS HIVE_PRODUCTS ( PROD_ID FLOAT ,PROD_NAME STRING ,PROD_DESC STRING ,PROD_SUBCATEGORY STRING ,PROD_SUBCATEGORY_ID FLOAT ,PROD_SUBCATEGORY_DESC STRING ,PROD_CATEGORY STRING ,PROD_CATEGORY_ID FLOAT ,PROD_CATEGORY_DESC STRING ,PROD_WEIGHT_CLASS FLOAT ,PROD_UNIT_OF_MEASURE STRING,PROD_PACK_SIZE STRING ,SUPPLIER_ID FLOAT ,PROD_STATUS STRING ,PROD_LIST_PRICE FLOAT ,PROD_MIN_PRICE FLOAT ,PROD_TOTAL STRING ,PROD_TOTAL_ID FLOAT ,PROD_SRC_ID FLOAT,PROD_EFF_FROM STRING,PROD_EFF_TO STRING,PROD_VALID STRING) COMMENT 'This is PRODUCTS streaming data' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE location '/sqoop/product';
Check if we are able to fetch data from newly created external hive table HIVE_PRODUCTS.
select * from hive_products limit 10;
With this we have loaded data from Oracle to HDFS and able to access it via HIVE tables.
By using HIVE tables we can do analyze the data/process it and load it to another HIVE table as well. Now I am creating one more HIVE table to store the Analytical data coming out of HQL.
Let me create HIVE_SALES_ANL table first to store the below small analytical query output.
SELECT PROD_ID, SUM(QUANTITY_SOLD), SUM(AMOUNT_SOLD) FROM HIVE_SALES F join HIVE_PRODUCTS PR on F.PROD_ID = PR.PROD_ID GROUP BY PROD_ID;
CREATE TABLE HIVE_SALES_ANL ( PROD_ID int, QUANTITY_SOLD INT , AMOUNT_SOLD INT) location '/sqoop/Analytics';
INSERT OVERWRITE TABLE HIVE_SALES_ANL SELECT PR.PROD_ID, SUM(QUANTITY_SOLD), SUM(AMOUNT_SOLD) FROM HIVE_SALES F join HIVE_PRODUCTS PR on F.PROD_ID = PR.PROD_ID GROUP BY PR.PROD_ID;
Now we have data in HIVE_SALES_ANL table exporting it to MYSQL DB which has the same structured table.
Also we can create table in MYSQL DB using sqoop query command.
Final step is to create a similar HIVE_SALES_ANL table in MYSQL (SALES_ANL) and export the HIVE_SALES_ANL data to MYSQL table (SALES_ANL).
MYSQL Table creation script:
sqoop eval --connect jdbc:mysql://localhost/mysql --username root --password root --query 'CREATE TABLE SALES_ANL (PROD_NAME VARCHAR(100), QUANTITY_SOLD INT, AMOUNT_SOLD INT)'
Sqoop Export command:
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table SALES_ANL --export-dir /sqoop/Analytics --input-fields-terminated-by '\001' --lines-terminated-by '\n'
Check in MYSQL weather we have the new SALES_ANL created with data?
:-) We can see the data exported to Hive SALES_ANL table.
Monday, 16 March 2015
Filtering Data using MapReduce, PIG & HIVE:
I am just trying exporting how to filter data available in a file Using MapReduce, PIG & HIVE. Sample data looks like below. Now lets filter all the Bees in the file and get the id as output.
1,Bat
2,Bed
3,Bees
4,Beetles
5,Birch
6,Black
7,Bluegrass
8,Booklouse
9,Borers
10,Borer
11,Boxelder
12,Bristly
13,Brown
14,Budworms
15,Bumblebees
16,Butterflies
17,bugs
18,ticks
19,asparagus
20,bark
21,bean
Filtering data using Map-Reduce code:
I have updated the Map & reducer code to filter the data by using in Wordcount programme. It is always easy to modify/update the existing MapReduce code and the final code looks like below:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterMREx
{
static class FilterMapTask extends Mapper<Object, Text, DoubleWritable, Text>
{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
String[] cols = value.toString().split(",");
if (cols[1].equals("Bees"))
{
DoubleWritable i= new DoubleWritable(Integer.parseInt(cols[0]));
context.write( i, value );
}
}
}
static class FilterReduceTask extends Reducer<DoubleWritable, Text, DoubleWritable, NullWritable>
{
public void reduce(DoubleWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
for (Text value: values)
{
context.write(key, NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
if (args.length != 2) {
System.err.println("Usage: FilterMREx <input path> <output path>");
System.exit(-1);
}
Job job = new Job(conf,"KEYVALUE-JOB");
job.setJarByClass(FilterMREx.class);
job.setMapperClass(FilterMapTask.class);
job.setReducerClass(FilterReduceTask.class);
job.setOutputKeyClass(DoubleWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)? 0:1);
}
}
Exported the FilerMREx.jar and placed it in Hadoop_jar Folder.
Please the source file in hadoop by using put command.
Running the FilterMREx.jar using hadoop:
Lets see the output:
Got the required output we have only one Bees got the id as output from MapReduce.
Filter data using PIG:
Note: Pig will eat every thing which we are passing. It is our responsibility to take care of
1. Input path
2. What is the input file format
3. In which format the data should be stored
Connecting Pig in local mode and loading FilterMRExSampleData.csv as below. When we are connecting as a local mode the input path should be a local folder, I copied data to /home/training/input
Loading data using pig commands as below
got the required output with 4 line of pig code.
storing data filtered data in to local path
Filtering Data using HIVE:
As we have the data available in HDFS. I am creating an external hive table using
Subscribe to:
Comments (Atom)








