Skip to content

Latest commit

 

History

History
165 lines (136 loc) · 5.43 KB

mapreduce_1_2_maptask.md

File metadata and controls

165 lines (136 loc) · 5.43 KB

#MapTask的数据处理流程


MapTask mapper map之间的关系

  • MapTask类似于OS中一个进程
  • mapper仅仅是其中一个专门处理数据的对象,那么还需要输入数据和输出数据的对象
  • map就是mapper如何处理数据的一个方法

###MapTask的流程


#####MapTask的run方法

(来自org.apache.hadoop.mapred.MapTask)
  public void run(...) {
    //...一些初始化工作
    runNewMapper(...);
    //...精简代码
  }
  • MapTask就是做一些初始化工作,然后调用runNewMapper方法
  • 和名字一样,runNewMapper就是启动mapper
  • runNewMapper中的New是指使用了新的API

#####runNewMapper方法的几个作用

(来自org.apache.hadoop.mapred.MapTaskrunNewMapper(...){
        //初始化输入输出操作等对象
        mapper.run(...);//处理数据
        //清理工作
    }
  • 初始化各种对象
  • 输入
  • 输出
  • mapper
  • 运行mapper来处理数据
  • 最后清理工作

###mapper运行的核心逻辑


(来自org.apache.hadoop.mapreduce.Mapperpublic void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}
    默认的map方法
    protected void map(KEYIN key, VALUEIN value, 
                         Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }

#####流程如下:

  • setup方法做了一些配置,默认是空。
  • 不断的读取下一个(K,V),并交给map逐个处理KV
  • 默认的map方法就是什么都不做,输入和输出时一样的
  • 这里使用的getCurrentKey()是对RecordReader的getCurrentKey()的封装。
  • 这里使用的getCurrentValue()是对RecordReader的getCurrentValue()对应封装。
  • write方法是对输出对象的write方法的封装

#####以WordCount来举例

  • 类LineReader可以从流中读取一行,类LineRecordReader将这一行变成一个键值对。
  • 之后如何获取这些键值对并且处理呢? 注意: map()只处理一个键值对
  • 但是每个split中肯定不止一行,那么是如何不断的读取每一行,生成一个个的键值对来处理呢?
  • 答案就是这里,mapper在运行的时候不断的获得键值对,不断的交给map去处理。

###输入的实例化


#####InputFormat和RecordReader的区别

  • InputFormat和RecordReader的分别初始化,感觉有点奇怪
  • 因为InputFormat关注于文件如何分割,所以有isSplitable、getSplits方法
  • 而RecordReader关注于将一个文件中的内容转换成了键值对

#####以WordCount来举例

  • FileInputFormat将一个文件分成几个split
  • LineRecordReader将文本封装为(K,V),K是偏移字节数,V是每行的内容。

#####InputFormat

(来自org.apache.hadoop.mapred.MapTaskrunNewMapper()部分代码
    // make the input format
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  • 这个主要是如何从一个文件分为split,生成一个输入对象
  • 可以看出来,调用了getInputFormatClass,这样可以获取到我们自己设置的输入类
  • 默认输入方法是TextInputFormat

#####RecordReader

runNewMapper()中初始化RecordReader
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
          (split, inputFormat, reporter, job, taskContext);
  • NewTrackingRecordReader<INKEY,INVALUE>其实是对获得(K,V)的方法又进行了封装,增加了一些 记录。
  • 这个输入对象才能够读取文本,封装了键值对,提供nextKeyValue(),getCurrentkey(),getCurrentValue()等方法。

###输出的实例化


  • map处理后的输出是放在缓冲区中

####RecordWriter

(来自org.apache.hadoop.mapred.MapTaskrunNewMapper()中初始化RecordWriter
       // get an output object
      if (job.getNumReduceTasks() == 0) {
         output =
           new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
      } else {
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
      }

#####NewOutputCollector中的write()

  • 可以从map的运行中看出,调用的输出对象context的write方法,其实就是out的write方法
  public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }

其中

MapOutputCollector<K,V> collector;
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
  • 这里可以知道了最终是使用的MapOutputBuffer中的collect(),直接写入了缓存中
  • 收集的数据就是(key,value,partition)三个数据作为一个逻辑单元。至于为什么要加入partition,之后再解释。
  • 现在整体的架构已经清楚了,那么我们之后重点关注(K,V)在经过map处理之后又发生了什么。

###总结

  • MapTask负责初始化各种输入、处理、输出的对象
  • InputFormat对象和RecordReader对象负责将数据封装好输入
  • mapper对象中不断的读取数据,通过map方法去处理
  • map处理之后的数据,缓冲在MapOutputBuffer对象中