博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop源码篇---解读Mapprer源码outPut输出
阅读量:7075 次
发布时间:2019-06-28

本文共 11362 字,大约阅读时间需要 37 分钟。

一。前述

上次讲完MapReduce的输入后,这次开始讲MapReduce的输出。注意MapReduce的原语很重要:

相同”的key为一组,调用一次reduce方法,方法内迭代这一组数据进行计算!!!!!

二。代码

继续看MapTask任务。

private 
void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException { // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); // make a mapper org.apache.hadoop.mapreduce.Mapper
mapper = (org.apache.hadoop.mapreduce.Mapper
) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // make the input format org.apache.hadoop.mapreduce.InputFormat
inputFormat = (org.apache.hadoop.mapreduce.InputFormat
) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // rebuild the input split org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); LOG.info("Processing split: " + split); org.apache.hadoop.mapreduce.RecordReader
input = new NewTrackingRecordReader
(split, inputFormat, reporter, taskContext); job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); org.apache.hadoop.mapreduce.RecordWriter output = null; // get an output object if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter);源码解析一 } org.apache.hadoop.mapreduce.MapContext
mapContext = new MapContextImpl
(job, getTaskID(), input, output, committer, reporter, split); org.apache.hadoop.mapreduce.Mapper
.Context mapperContext = new WrappedMapper
().getMapContext( mapContext); try { input.initialize(split, mapperContext); mapper.run(mapperContext); mapPhase.complete(); setPhase(TaskStatus.Phase.SORT); statusUpdate(umbilical); input.close(); input = null; output.close(mapperContext); output = null; } finally { closeQuietly(input); closeQuietly(output, mapperContext); } }

解析一。构造OutPut对象:

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,                       JobConf job,                       TaskUmbilicalProtocol umbilical,                       TaskReporter reporter                       ) throws IOException, ClassNotFoundException {      collector = createSortingCollector(job, reporter);//对应解析源码1.2      partitions = jobContext.getNumReduceTasks();//分区数等于Reduce数,分区数大于分组的概念。      if (partitions > 1) {        partitioner = (org.apache.hadoop.mapreduce.Partitioner
) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);//对应源码1.1 } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner
() { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1;//用户不设置时默认框架一个reduce,并且分区号为0 } }; } }   @Override     public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,                         partitioner.getPartition(key, value, partitions));//上下文对象构造写出的值,放在collect缓存区中。     }

解析1.1

public Class
> getPartitionerClass()throws ClassNotFoundException {return (Class
>)conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);//当用户设置取用户的,没设置默认HashPartitioner 对应解析源码1.1.1

解析源码1.2createSortingCollector类的具体实现

private 
MapOutputCollector
createSortingCollector(JobConf job, TaskReporter reporter) throws IOException, ClassNotFoundException { MapOutputCollector.Context context = new MapOutputCollector.Context(this, job, reporter); Class
[] collectorClasses = job.getClasses( JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class); int remainingCollectors = collectorClasses.length; for (Class clazz : collectorClasses) { try { if (!MapOutputCollector.class.isAssignableFrom(clazz)) { throw new IOException("Invalid output collector class: " + clazz.getName() + " (does not implement MapOutputCollector)"); } Class
subclazz = clazz.asSubclass(MapOutputCollector.class); LOG.debug("Trying map output collector class: " + subclazz.getName()); MapOutputCollector
collector = ReflectionUtils.newInstance(subclazz, job); collector.init(context);//解析源码对应1.2.1 LOG.info("Map output collector class = " + collector.getClass().getName()); return collector; } catch (Exception e) { String msg = "Unable to initialize MapOutputCollector " + clazz.getName(); if (--remainingCollectors > 0) { msg += " (" + remainingCollectors + " more collector(s) to try)"; } LOG.warn(msg, e); } } throw new IOException("Unable to initialize any output collector"); }

 解析源码1.2.1 缓冲区collect的初始化

public void init(MapOutputCollector.Context context                    ) throws IOException, ClassNotFoundException {      job = context.getJobConf();      reporter = context.getReporter();      mapTask = context.getMapTask();      mapOutputFile = mapTask.getMapOutputFile();      sortPhase = mapTask.getSortPhase();      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);      partitions = job.getNumReduceTasks();      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();      //sanity checks      final float spillper =        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);//缓冲区溢写阈值,      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);//缓冲区默认单位是100M      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);      if (spillper > (float)1.0 || spillper <= (float)0.0) {        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +            "\": " + spillper);      }      if ((sortmb & 0x7FF) != sortmb) {        throw new IOException(            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);      }      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",            QuickSort.class, IndexedSorter.class), job);//Map从缓冲区往磁盘写文件的时候需要排序,用的快排。      // buffers and accounting      int maxMemUsage = sortmb << 20;      maxMemUsage -= maxMemUsage % METASIZE;      kvbuffer = new byte[maxMemUsage];      bufvoid = kvbuffer.length;      kvmeta = ByteBuffer.wrap(kvbuffer)         .order(ByteOrder.nativeOrder())         .asIntBuffer();      setEquator(0);      bufstart = bufend = bufindex = equator;      kvstart = kvend = kvindex;      maxRec = kvmeta.capacity() / NMETA;      softLimit = (int)(kvbuffer.length * spillper);      bufferRemaining = softLimit;      LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);      LOG.info("soft limit at " + softLimit);      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);  comparator = job.getOutputKeyComparator();//排序所使用的比较器 见源码解析1,2.1.1       keyClass = (Class
)job.getMapOutputKeyClass();       valClass = (Class
)job.getMapOutputValueClass();       serializationFactory = new SerializationFactory(job);       keySerializer = serializationFactory.getSerializer(keyClass);       keySerializer.open(bb);       valSerializer = serializationFactory.getSerializer(valClass);       valSerializer.open(bb); // combiner       final Counters.Counter combineInputCounter =         reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);       combinerRunner = CombinerRunner.create(job, getTaskID(), //map端的组合                                              combineInputCounter,                                              reporter, null);       if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =           reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);         combineCollector= new CombineOutputCollector
(combineOutputCounter, reporter, job);       } else {
        combineCollector = null;       }       spillInProgress = false;       minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);//小文件最少是3时,会合并小文件。       spillThread.setDaemon(true);//线程是另外一个线程负责写的 见解析源码1.2.1.2       spillThread.setName("SpillThread");       spillLock.lock();

总结:Mappper输出到缓冲区默认是100M,写到0.8时,会溢写!!!!这块可以调优。通过来回折半来调比如第一次调整50% 然后再80%中减小 70% 然后60%来回折半。

          Combine一定要注意,比如求平均值

 解析1,2.1.1排序比较器的实现

 

public RawComparator getOutputKeyComparator() {    Class
theClass = getClass( JobContext.KEY_COMPARATOR, null, RawComparator.class);字典排序 默认 if (theClass != null) return ReflectionUtils.newInstance(theClass, this); return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);//如果用户没有设置排序比较器,就是Key类型自己的比较器,所以Key必须实现序列化,反序列化,比较器。 }

 

总结:框架默认使用Key的比较器,字典排序 默认,用户也可以覆盖Key的比较器,自定义。!!!

 

解析源码1.2.1.2 溢写线程做的事
protected class SpillThread extends Thread {      @Override      public void run() {        spillLock.lock();        spillThreadRunning = true;        try {          while (true) {            spillDone.signal();            while (!spillInProgress) {              spillReady.await();            }            try {              spillLock.unlock();              sortAndSpill();//排序溢写            } catch (Throwable t) {              sortSpillException = t;            } finally {              spillLock.lock();              if (bufend < bufstart) {                bufvoid = kvbuffer.length;              }              kvstart = kvend;              bufstart = bufend;              spillInProgress = false;            }          }        } catch (InterruptedException e) {          Thread.currentThread().interrupt();        } finally {          spillLock.unlock();          spillThreadRunning = false;        }      }    }

总结:Map往缓冲区写入东西,线程把缓冲区中的内容做溢写,开始排序,溢写使用快排!!!Combine也在内存中,buffer也在内存,这些计算逻辑都在内存中,排序算法也在内存中,因为Map方法在内存中,这是第一次Combine,从Buffer产生一堆小文件的时候,然后一堆小文件在合并的时候还会执行一次Combine,这次有条件限制(小文件数量大于3)。

 

 

 

解析源码1.1.1

public class HashPartitioner
extends Partitioner
{ /** Use {
@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;!!! }
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;!!!重要取分区的写法!! 总结1.以上源码来源于 output = new NewOutputCollector(taskContext, job, umbilical, reporter);所以可得出在输出构造的时候需要构造一个分区器。要么是0的,要么是用户设置的,要么是默认的。 总结2.在输出构造中,有缓冲区的设置。 总结3,以上方法都是OutPut的初始化。 总结4.Map输出的K,V变成K,V,P然后写入到环形缓冲区,内存缓存区80%,然后溢写排序,(先按分区排序,然后再按Key的组排序),然后生成小文件,然后合并,用的归并算法,此时小文件已经是内部有序的,所以使用归并算法,一次io即可。

 

持续更新中。。。。,欢迎大家关注我的公众号LHWorld.

 

 

 

转载于:https://www.cnblogs.com/LHWorldBlog/p/8252953.html

你可能感兴趣的文章