Wednesday, August 28, 2013

An Example Java MapReduce Program - NYSE Yearly Analysis

High-Level MapReduce Execution in Brief:



At a high level, a MapReduce job is split into a Map phase and a Reduce phase.
The map part of a job is distributed to the cluster nodes that hold the blocks of the input files used in the job. Each node can then spawn multiple mappers (for example, one per core of a quad-core CPU) to process the data residing on that node, and all of this happens in parallel. The mapper class mainly performs data filtering and transformation.
The reducers run on separate nodes, and the number of reducers is defined by the programmer in the job configuration. The JobTracker selects the nodes on which to run the reducers based on the availability of resources.

The process that a MapReduce job goes through to completion is as follows:
  • The number of map and reduce functions is defined by the job you are running. Each map processes its input data, sorts it to group the keys, and writes it to disk. The job defines how many reduce functions you wish to apply to the output of the maps.
  • Each reduce needs to see all the data for a given key. If a single reduce runs for the job, all the outputs from each map are sent to the node in the cluster that is running that reduce. Before the reduce runs, the data from each map is merged to group all the keys.
  • With multiple reducers, the maps partition their output, creating one partition per reduce. The partitions are sent to the correct reduce, which ensures that all the data for a given key is processed by a single reduce.
  • To cut down the amount of data that has to be sent over the network, a combine function can be applied to the output of a map. This has the effect of running a reduce on the output of the map, minimizing the amount of data that needs to be transferred to the reducers and speeding up the execution time of the overall job.

Putting the complete MapReduce model together: the output of the mappers is processed by the combiners, which perform local aggregation to cut down the number of intermediate key-value pairs. The partitioner determines which reducer will be responsible for processing a particular key, and the execution framework uses this information to copy the data to the right location during the shuffle and sort phase. A complete MapReduce job therefore consists of code for the mapper, reducer, combiner, and partitioner, along with job configuration parameters. The execution framework handles everything else.
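As a concrete illustration (not part of the job implemented later in this post), a custom partitioner for the old MapRed API could route all records for a given year to the same reducer. The sketch below assumes the NYSESymbolYearWritable and NYSEWritable types defined further down, and would be registered with job.setPartitionerClass(YearPartitioner.class):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/* Hypothetical partitioner sketch: all keys with the same stock_year are sent
 * to the same reduce task. */
public class YearPartitioner implements
  Partitioner<NYSESymbolYearWritable, NYSEWritable> {

 @Override
 public int getPartition(NYSESymbolYearWritable key, NYSEWritable value,
   int numPartitions) {
  // Mask the sign bit so the result is always a valid partition index
  return (key.getStock_year() & Integer.MAX_VALUE) % numPartitions;
 }

 @Override
 public void configure(JobConf job) {
  // No job-specific configuration is needed for this sketch
 }
}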

Our Example MapReduce Program

Input Datasets:

The NYSE Daily Prices file has the following columns, separated by commas (CSV file):
-    exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close
Sample Record: NYSE,AEA,2010-02-08,4.42,4.42,4.21,4.24,205500,4.24

The NYSE Daily Dividends file has the following columns, separated by commas (CSV file):
-    exchange, stock_symbol, date, dividends
Sample record: NYSE,AIT,2009-11-12,0.15

The NYSE Companies file lists each stock symbol along with its company name. This is a tab-separated file with the following columns:
-    symbol, company_name
Sample Record: ACAS AMERICAN CAPITAL LTD

You can download these input data files and put them on your HDFS cluster using the put or copyFromLocal command; an example is shown below.
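For example, assuming local file names of nyse_prices.csv, nyse_dividends.csv, and NYSESymbolCompany, and the /user/hduser/NYSE directories used later in the code (adjust the paths to your own layout), the upload could look roughly like this:

hadoop fs -mkdir /user/hduser/NYSE/prices
hadoop fs -mkdir /user/hduser/NYSE/dividends
hadoop fs -mkdir /user/hduser/NYSE/companies
hadoop fs -put nyse_prices.csv /user/hduser/NYSE/prices/
hadoop fs -put nyse_dividends.csv /user/hduser/NYSE/dividends/
hadoop fs -put NYSESymbolCompany /user/hduser/NYSE/companies/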

Requirements:

Now let's consider generating reports, split by year, that show the fields below:
stock_symbol, company_name, the maximum stock price for the year for the company, the minimum stock price for the year for the company, and the average dividend for the year for the company. Reports for different years have to go to different output files.

Design Considerations:

Point 1: As in most MapReduce programs, our mapper(s) will do the filtering and transformation of the input data, and our reducer will do the aggregation and generate the desired output.

Point 2: Since I use data from multiple files, I have to join the input files to generate the desired report. Since the dividends and prices files are huge, I am going to use the MultipleInputs class on them and create two mappers, one for each file. The two mappers generate similar key-value pairs, and an identifier in the value tells the reducer which file each key-value pair came from. This program is thus a classic example of joining input files on a key.

Point 3: Since I am dealing with multiple inputs and outputs, it is easier to implement the MapReduce job using the deprecated (old) MapRed API.

Point 4: The NYSE Companies file is not a huge one and thus qualifies to be used as a distributed cache file to map the stock symbol to the company name. (The code below simply opens this file from HDFS in the reducer's configure() method; a sketch of using the DistributedCache API instead is shown below.)
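A minimal sketch of that alternative (error handling omitted; the HDFS path matches the one hard-coded in loadMap() below) could look roughly like this:

// Hypothetical sketch only -- the job in this post instead opens the file
// directly from HDFS in the reducer's configure() method (see loadMap below).

// In the driver, before JobClient.runJob(job):
org.apache.hadoop.filecache.DistributedCache.addCacheFile(
  new java.net.URI("/user/hduser/NYSE/companies/NYSESymbolCompany"), job);

// In the reducer's configure(JobConf job), read the local copy of the cached file:
org.apache.hadoop.fs.Path[] cacheFiles =
  org.apache.hadoop.filecache.DistributedCache.getLocalCacheFiles(job);
java.io.BufferedReader br =
  new java.io.BufferedReader(new java.io.FileReader(cacheFiles[0].toString()));
// ...then populate the symbol -> company name HashMap exactly as loadMap() does below.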

Point 5: I create two customized Hadoop datatypes (objects) to be passed from the mappers to the reducer as key-value pairs - NYSEWritable to be used as the value and NYSESymbolYearWritable to be used as the key. Since it is used as the key, NYSESymbolYearWritable has to implement WritableComparable, while NYSEWritable implements the Writable interface. From this example you can learn how to create and use Hadoop user-defined data types.

Point 6: I use the MultipleTextOutputFormat class to split the reducer output into files in different directories based on the year. This class is registered via job.setOutputFormat. This is another important feature of the MapReduce framework that can be learned from this example.

Code:

NYSEWritable Class:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/* Customized hadoop datatype created to be used as the value from mapper to reducer.
 * Refer to Point 5 in Design Considerations */

public class NYSEWritable implements Writable {

 String file_identifier;
 String stock_exchange;
 String stock_symbol;
 String stock_date;
 double stock_price_open;
 double stock_price_high;
 double stock_price_low;
 double stock_price_close;
 long stock_volume;
 double stock_price_adj_close;
 double stock_dividend;

 @Override
 public void readFields(DataInput in) throws IOException {
  file_identifier = in.readUTF();
  stock_exchange = in.readUTF();
  stock_symbol = in.readUTF();
  stock_date = in.readUTF();
  stock_price_open = in.readDouble();
  stock_price_high = in.readDouble();
  stock_price_low = in.readDouble();
  stock_price_close = in.readDouble();
  stock_volume = in.readLong();
  stock_price_adj_close = in.readDouble();
  stock_dividend = in.readDouble();
 }

 @Override
 public void write(DataOutput out) throws IOException {
  out.writeUTF(file_identifier);
  out.writeUTF(stock_exchange);
  out.writeUTF(stock_symbol);
  out.writeUTF(stock_date);
  out.writeDouble(stock_price_open);
  out.writeDouble(stock_price_high);
  out.writeDouble(stock_price_low);
  out.writeDouble(stock_price_close);
  out.writeLong(stock_volume);
  out.writeDouble(stock_price_adj_close);
  out.writeDouble(stock_dividend);
 }

 public String getFile_identifier() {
  return file_identifier;
 }

 public void setFile_identifier(String file_identifier) {
  this.file_identifier = file_identifier;
 }

 public String getStock_exchange() {
  return stock_exchange;
 }

 public String getStock_symbol() {
  return stock_symbol;
 }

 public String getStock_date() {
  return stock_date;
 }

 public double getStock_price_open() {
  return stock_price_open;
 }

 public double getStock_price_high() {
  return stock_price_high;
 }

 public double getStock_price_low() {
  return stock_price_low;
 }

 public double getStock_price_close() {
  return stock_price_close;
 }

 public long getStock_volume() {
  return stock_volume;
 }

 public double getStock_price_adj_close() {
  return stock_price_adj_close;
 }

 public double getStock_dividend() {
  return stock_dividend;
 }

 public void setStock_exchange(String stock_exchange) {
  this.stock_exchange = stock_exchange;
 }

 public void setStock_symbol(String stock_symbol) {
  this.stock_symbol = stock_symbol;
 }

 public void setStock_date(String stock_date) {
  this.stock_date = stock_date;
 }

 public void setStock_price_open(double stock_price_open) {
  this.stock_price_open = stock_price_open;
 }

 public void setStock_price_high(double stock_price_high) {
  this.stock_price_high = stock_price_high;
 }

 public void setStock_price_low(double stock_price_low) {
  this.stock_price_low = stock_price_low;
 }

 public void setStock_price_close(double stock_price_close) {
  this.stock_price_close = stock_price_close;
 }

 public void setStock_volume(long stock_volume) {
  this.stock_volume = stock_volume;
 }

 public void setStock_price_adj_close(double stock_price_adj_close) {
  this.stock_price_adj_close = stock_price_adj_close;
 }

 public void setStock_dividend(double stock_dividend) {
  this.stock_dividend = stock_dividend;
 }

}
NYSESymbolYearWritable Class:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/* Customized hadoop datatype created to be used as the key from mapper to reducer.
 * Refer to Point 5 in Design Considerations */

public class NYSESymbolYearWritable implements WritableComparable<NYSESymbolYearWritable> {
 String stock_symbol;
 int stock_year;
 @Override
 public void readFields(DataInput in) throws IOException {
  stock_symbol = in.readUTF();
  stock_year = in.readInt();
 }

 @Override
 public void write(DataOutput out) throws IOException {
  out.writeUTF(stock_symbol);
  out.writeInt(stock_year);
 }

 @Override
 // The objects of this class are sorted by stock_year and then by stock_symbol
 public int compareTo(NYSESymbolYearWritable other) {
  if (this.stock_year == other.stock_year) {
   return this.stock_symbol.compareTo(other.stock_symbol);
  } else {
   return (this.stock_year > other.stock_year) ? 1 : -1;
  }
 }
 

 @Override
 public int hashCode() {
  final int prime = 31;
  int result = 1;
  result = prime * result
    + ((stock_symbol == null) ? 0 : stock_symbol.hashCode());
  result = prime * result + stock_year;
  return result;
 }

 @Override
 public boolean equals(Object obj) {
  if (this == obj)
   return true;
  if (obj == null)
   return false;
  if (getClass() != obj.getClass())
   return false;
  NYSESymbolYearWritable other = (NYSESymbolYearWritable) obj;
  if (stock_symbol == null) {
   if (other.stock_symbol != null)
    return false;
  } else if (!stock_symbol.equals(other.stock_symbol))
   return false;
  if (stock_year != other.stock_year)
   return false;
  return true;
 }

 @Override
 public String toString() {
  return "stock_symbol=" + stock_symbol
    + ", stock_year=" + stock_year + ",";
 }

 public String getStock_symbol() {
  return stock_symbol;
 }

 public int getStock_year() {
  return stock_year;
 }

 public void setStock_symbol(String stock_symbol) {
  this.stock_symbol = stock_symbol;
 }

 public void setStock_year(int stock_year) {
  this.stock_year = stock_year;
 }

}
NYSEYearlyAnalysis Class:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class NYSEYearlyAnalysis {

 /*
  * This mapper processes the NYSE Prices file. Refer to Point 2 in Design
  * Considerations
  */

 public static class PricesMapper extends MapReduceBase implements
   Mapper<LongWritable, Text, NYSESymbolYearWritable, NYSEWritable> {
  NYSESymbolYearWritable mapOutKey = new NYSESymbolYearWritable();
  NYSEWritable mapOutValue = new NYSEWritable();

  @SuppressWarnings("deprecation")
  @Override
  public void map(
    LongWritable mapInKey,
    Text mapInValue,
    OutputCollector<NYSESymbolYearWritable, NYSEWritable> collector,
    Reporter reporter) throws IOException {
   String[] mapInFieldsArray = mapInValue.toString().split(",");
   DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
   if (mapInFieldsArray.length == 9
     && !mapInFieldsArray[0].trim().equals("exchange")) {
    /* Set the Map Output Key Variables */
    mapOutKey.setStock_symbol(mapInFieldsArray[1]);
    try {
     /*
      * Note that the getYear method returns the year subtracting
      * 1900 from the year value. And hence we need to add 1900
      * to output year
      */
     mapOutKey.setStock_year(df.parse(mapInFieldsArray[2])
       .getYear() + 1900);
    } catch (ParseException e) {
     System.out.println("Not a valid date : "
       + mapInFieldsArray[2]);
     e.printStackTrace();
    }
    /* Set the Map Output Values Variables */
    mapOutValue.setFile_identifier("prices");
    mapOutValue.setStock_date(mapInFieldsArray[2]);
    mapOutValue.setStock_dividend(0.0);
    mapOutValue.setStock_exchange(mapInFieldsArray[0]);
    mapOutValue.setStock_price_adj_close(Double
      .valueOf(mapInFieldsArray[8]));
    mapOutValue.setStock_price_close(Double
      .valueOf(mapInFieldsArray[6]));
    mapOutValue.setStock_price_high(Double
      .valueOf(mapInFieldsArray[4]));
    mapOutValue.setStock_price_low(Double
      .valueOf(mapInFieldsArray[5]));
    mapOutValue.setStock_price_open(Double
      .valueOf(mapInFieldsArray[3]));
    mapOutValue.setStock_symbol(mapInFieldsArray[1]);
    mapOutValue.setStock_volume(Integer
      .valueOf(mapInFieldsArray[7]).longValue());
    collector.collect(mapOutKey, mapOutValue);
   }
  }
 }

 /*
  * This mapper processes the NYSE Dividend file. Refer to Point 2 in Design
  * Considerations
  */

 public static class DividendMapper extends MapReduceBase implements
   Mapper<LongWritable, Text, NYSESymbolYearWritable, NYSEWritable> {
  NYSESymbolYearWritable mapOutKey = new NYSESymbolYearWritable();
  NYSEWritable mapOutValue = new NYSEWritable();

  @SuppressWarnings("deprecation")
  @Override
  public void map(
    LongWritable mapInKey,
    Text mapInValue,
    OutputCollector<NYSESymbolYearWritable, NYSEWritable> collector,
    Reporter reporter) throws IOException {
   String[] mapInFieldsArray = mapInValue.toString().split(",");
   DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
   if (mapInFieldsArray.length == 4
     && !mapInFieldsArray[0].trim().equals("exchange")) {
    /* Set the Map Output Key Variables */
    mapOutKey.setStock_symbol(mapInFieldsArray[1]);
    try {
     mapOutKey.setStock_year(df.parse(mapInFieldsArray[2])
       .getYear() + 1900);
    } catch (ParseException e) {
     System.out.println("Not a valid date : "
       + mapInFieldsArray[2]);
     e.printStackTrace();
    }
    /* Set the Map Output Values Variables */
    mapOutValue.setFile_identifier("dividends");
    mapOutValue.setStock_date(mapInFieldsArray[2]);
    mapOutValue.setStock_dividend(Double
      .valueOf(mapInFieldsArray[3]));
    mapOutValue.setStock_exchange(mapInFieldsArray[0]);
    mapOutValue.setStock_price_adj_close(0.0);
    mapOutValue.setStock_price_close(0.0);
    mapOutValue.setStock_price_high(0.0);
    mapOutValue.setStock_price_low(0.0);
    mapOutValue.setStock_price_open(0.0);
    mapOutValue.setStock_symbol(mapInFieldsArray[1]);
    mapOutValue.setStock_volume(0);
    collector.collect(mapOutKey, mapOutValue);
   }
  }
 }

 public static class MyReducer extends MapReduceBase implements
   Reducer<NYSESymbolYearWritable, NYSEWritable, Text, Text> {
  Text reduceOutKey = new Text();
  Text reduceOutValue = new Text();
  Map<String, String> companiesMap = new HashMap<String, String>();

  /*
   * Read the companies file and load it to a HashMap to be used as
   * Distributed Cache. The HashMap will have the Symbol as the Key and
   * Full company name as Value. See Point 4 in Design Considerations.
   */
  public void loadMap(JobConf job) throws IOException {
   Path companiesFilePath = new Path(
     "/user/hduser/NYSE/companies/NYSESymbolCompany");
   FileSystem fs = FileSystem.get(job);
   FSDataInputStream iStream = fs.open(companiesFilePath);
   InputStreamReader iStreamReader = new InputStreamReader(iStream);
   BufferedReader br = new BufferedReader(iStreamReader);
   String line = "";
   while ((line = br.readLine()) != null) {
    String[] companiesArr = line.split("\t");
    if (companiesArr.length == 2) {
     companiesMap.put(companiesArr[0].trim(), companiesArr[1]);
    }
   }
   /*
    * for (Map.Entry<String, String> mE : companiesMap .entrySet()) {
    * System.out.println("atom : key - " + mE.getKey() + " value - " +
    * mE.getValue()); }
    */
  }

  /*
   * In the old MapRed API, the configure method is run one time at the
   * beginning of Map/Reduce Task. In the New API setup() method takes its
   * place
   */

  public void configure(JobConf job) {
   try {
    loadMap(job);
   } catch (IOException e) {
    System.out.println(" Error calling loadMap");
    e.printStackTrace();
   }
  }

  @Override
  public void reduce(NYSESymbolYearWritable reduceInKey,
    Iterator<NYSEWritable> reduceInValues,
    OutputCollector<Text, Text> collector, Reporter reporter)
    throws IOException {

   double maxStockPrice = 0;
   double minStockPrice = Double.MAX_VALUE;
   double sumDividends = 0;
   NYSEWritable reduceInValue = new NYSEWritable();
   StringBuilder sb = new StringBuilder("");
   int count = 0;
   reduceOutKey.set(reduceInKey.toString());
   while (reduceInValues.hasNext()) {
    reduceInValue = reduceInValues.next();
    if (reduceInValue.getFile_identifier().equals("prices")) {
     maxStockPrice = (maxStockPrice > reduceInValue
       .getStock_price_high()) ? maxStockPrice
       : reduceInValue.getStock_price_high();
     minStockPrice = (minStockPrice < reduceInValue
       .getStock_price_low()) ? minStockPrice
       : reduceInValue.getStock_price_low();
    } else if (reduceInValue.getFile_identifier().equals(
      "dividends")) {
     sumDividends = sumDividends
       + reduceInValue.getStock_dividend();
     count++;
    }
   }
   sb.append("companyName=");
   sb.append(companiesMap.get(reduceInKey.getStock_symbol().trim()));
   sb.append(",maxStockPrice=");
   sb.append(maxStockPrice);
   sb.append(",minStockPrice=");
   sb.append(minStockPrice);
    sb.append(",avgDividend=");
    // Guard against division by zero when a symbol has no dividend records for the year
    sb.append(count > 0 ? sumDividends / count : 0.0);
   reduceOutValue.set(sb.toString());
   collector.collect(reduceOutKey, reduceOutValue);
  }
 }

 /*
  * Customized class output file format class to split the output file of
  * reducer based on the Year. Refer to Point 6 in Design Considerations.
  */
 public static class MyMultipleOutputFileFormat extends
   MultipleTextOutputFormat<Text, Text> {
  public String generateFileNameForKeyValue(Text key, Text value,
    String name) {
   String[] keyArr = key.toString().split("=");
   String outfolder = keyArr[2].substring(0, 4);
   return new Path(outfolder, name).toString();
  }
 }

 /* Main Driver Method */
 public static void main(String[] args) throws IOException,
   ClassNotFoundException, InterruptedException {
  Configuration conf = new Configuration();
  JobConf job = new JobConf(conf);

  job.setJobName("NYSE Yearly Analysis");

  job.setJarByClass(NYSEYearlyAnalysis.class);
  // Define Reducer Class. Mapper Classes are defined later for two diff
  // input files.
  job.setReducerClass(MyReducer.class);

  // Output Key-Value Data types
  job.setMapOutputKeyClass(NYSESymbolYearWritable.class);
  job.setMapOutputValueClass(NYSEWritable.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  // Inform Input/Output Formats and Directory Locations

  /*
   * Since there are two mappers here for two different files, You need to
   * define which mapper processes which input file. Refer to Point 2 in
   * Design Considerations.
   */
  MultipleInputs.addInputPath(job, new Path(args[0]),
    TextInputFormat.class, PricesMapper.class);
  MultipleInputs.addInputPath(job, new Path(args[1]),
    TextInputFormat.class, DividendMapper.class);
  FileOutputFormat.setOutputPath(job, new Path(args[2]));
  job.setOutputFormat(MyMultipleOutputFileFormat.class);

  // Inform termination criteria
  JobClient.runJob(job);

 }

}
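
Execution: package the classes above into a jar and run it with the hadoop jar command, passing the prices input directory, the dividends input directory, and the output directory as the three arguments expected by the driver. The jar name and HDFS paths below are placeholders (substitute your own), and note that the output directory must not already exist:

hadoop jar nyse-yearly-analysis.jar NYSEYearlyAnalysis /user/hduser/NYSE/prices /user/hduser/NYSE/dividends /user/hduser/NYSE/output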
