cachejava task 定时任务是什么任务

2428人阅读
hadoop(18)
reduce执行流程经历三个阶段:copy、sort、reduce,在第一阶段reduce任务会把map的输出拷贝至本地,通过线程MapOutputCopier,该线程通过http协议将map输出拷贝至本地,该copy操作可以并行进行,默认情况下有5个线程执行此操作,如果map数量较大时可以适当调大此值,拷贝时使用http协议,此时reducetask为client,map端以jetty作为web服务器。reduce任务的执行与map一样在Child类启动,但在TaskFinal.run(job,umbilical)进入ReduceTask类执行。reduce的过程比较复杂,本节只分析copy部分,最后会分析整个reduce流程,需要注意的是每个reduce只拷贝自己需要处理那个partition数据。
拷贝map输出结果代码:ReduceTask.java 1309行
/** Loop forever and fetch map outputs as they become available.
* The thread exits when it is interrupted by {@link ReduceTaskRunner}
public void run() {
while (true) {
MapOutputLocation loc =
long size = -1;
//不停查询调度拷贝的集合,如果有数据到来,则说明需要做copy操作
synchronized (scheduledCopies) {
while (scheduledCopies.isEmpty()) {
scheduledCopies.wait();
//取出第一个输出位置
loc = scheduledCopies.remove(0);
CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
readError =
shuffleClientMetrics.threadBusy();
start(loc);
//开始读取数据并返回长度,这也是下面代码中要分析的函数
size = copyOutput(loc);
//更新统计信息
shuffleClientMetrics.successFetch();
error = CopyOutputErrorType.NO_ERROR;
} catch (IOException e) {
LOG.warn(reduceTask.getTaskID() + & copy failed: & +
loc.getTaskAttemptId() + & from & + loc.getHost());
LOG.warn(StringUtils.stringifyException(e));
shuffleClientMetrics.failedFetch();
if (readError) {
error = CopyOutputErrorType.READ_ERROR;
size = -1;
} finally {
shuffleClientMetrics.threadFree();
//当前连接加入copyResults集合
finish(size, error);
} catch (InterruptedException e) {
// 执行到此步则证明拷贝完成
} catch (FSError e) {//文件系统错误
LOG.error(&Task: & + reduceTask.getTaskID() + & - FSError: & +
StringUtils.stringifyException(e));
umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), jvmContext);
} catch (IOException io) {
LOG.error(&Could not notify TT of FSError: & +
StringUtils.stringifyException(io));
} catch (Throwable th) {
String msg = getTaskID() + & : Map output copy failure : &
+ StringUtils.stringifyException(th);
reportFatalError(getTaskID(), th, msg);
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
下面分析copyOutput函数:ReduceTask.java 1373行
/** Copies a a map output from a remote host, via HTTP.
* @param currentLocation the map output location to be copied
* @return the path (fully qualified) of the copied file
* @throws IOException if there is an error copying the file
* @throws InterruptedException if the copier should give up
private long copyOutput(MapOutputLocation loc
) throws IOException, InterruptedException {
// 检查该位置是否还需拷贝
if (copiedMapOutputs.contains(loc.getTaskId()) ||
obsoleteMapIds.contains(loc.getTaskAttemptId())) {
return CopyResult.OBSOLETE;
// 内存写满时需要用到的临时文件
TaskAttemptID reduceId = reduceTask.getTaskID();
Path filename =
new Path(String.format(
MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
TaskTracker.OUTPUT, loc.getTaskId().getId()));
// Copy the map output to a temp file whose name is unique to this attempt
Path tmpMapOutput = new Path(filename+&-&+id);
// 开始拷贝map的输出
MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
reduceId.getTaskID().getId());
if (mapOutput == null) {
throw new IOException(&Failed to fetch map-output for & +
loc.getTaskAttemptId() + & from & +
loc.getHost());
// 获得输出尺寸
long bytes = pressedS
// lock the ReduceTask while we do the rename
synchronized (ReduceTask.this) {
if (copiedMapOutputs.contains(loc.getTaskId())) {
mapOutput.discard();
return CopyResult.OBSOLETE;
// Special case: discard empty map-outputs
if (bytes == 0) {
mapOutput.discard();
} catch (IOException ioe) {
(&Couldn't discard output of & + loc.getTaskId());
// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());
// 判断是否完全在内存中,根据具体情况执行不同分支
if (mapOutput.inMemory) {
// 如果完全在内存中则放入内存文件的集合中
mapOutputsFilesInMemory.add(mapOutput);
// Rename the temporary fi
// ensure it is on the same partition
tmpMapOutput = mapOutput.
filename = new Path(tmpMapOutput.getParent(), filename.getName());
if (!localFileSys.rename(tmpMapOutput, filename)) {
localFileSys.delete(tmpMapOutput, true);
bytes = -1;
throw new IOException(&Failed to rename map output & +
tmpMapOutput + & to & + filename);
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());
getMapOutput函数负责拷贝输出的工作,利用URLConnection建立连接,url格式类似:http://PC-RGUY:50060/mapOutput?job=job__0003&map=attempt__0003_m_&reduce=1
,包含协议类型:http,主机及端口:PC-RGUY:50060,路径名称:mapOutput,查询参数包含作业名、map任务名、reduce编号:ob=job__0003&map=attempt__0003_m_&reduce=1
url会根据这个地址建立连接,并打开一个输入流读取数据。在开始读取前会判断本次的读取是否能全部放入缓存中,这部分缓存使用是有限制的:jvm_heap_size × mapred.job.shuffle.input.buffer.percent × MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION,其中jvm_heap_size可以通过mapred.job.reduce.total.mem.bytes来设置,如果没设置则通过Runtime.getRuntime().maxMemory()来获取,可以通过mapred.child.opts来影响jvm堆的大小,mapred.job.shuffle.input.buffer.percent可以在参数文件中设置,默认为0.7,MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION在当前版本中为一常量&#,也就是说加入我们指定jvm堆大小为1024M,那么一个ReduceTask在拷贝时用到的缓存为×0.25=179M,当我们的map输出大于179M时,则直接写入文件.
ReduceTask.java 1373行
private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
Path filename, int reduce)
throws IOException, InterruptedException {
// 建立url连接
URL url = mapOutputLoc.getOutputLocation();
URLConnection connection = url.openConnection();
InputStream input = setupSecureConnection(mapOutputLoc, connection);
// 校验任务ID
TaskAttemptID mapId =
TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
} catch (IllegalArgumentException ia) {
LOG.warn(&Invalid map id &, ia);
TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
if (!mapId.equals(expectedMapId)) {
LOG.warn(&data from wrong map:& + mapId +
& arrived to reduce task & + reduce +
&, where as expected map output should be from & + expectedMapId);
//判断返回数据长度是否异常
//取得压缩和未压缩长度,后面判断在内存还是硬盘做shuffle
long decompressedLength =
Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
long compressedLength =
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
if (compressedLength & 0 || decompressedLength & 0) {
LOG.warn(getName() + & invalid lengths in map output header: id: & +
mapId + & compressed len: & + compressedLength +
&, decompressed len: & + decompressedLength);
//判断reduce编号是否相同
int forReduce =
(int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
if (forReduce != reduce) {
LOG.warn(&data for the wrong reduce: & + forReduce +
& with compressed len: & + compressedLength +
&, decompressed len: & + decompressedLength +
& arrived to reduce task & + reduce);
if (LOG.isDebugEnabled()) {
LOG.debug(&header: & + mapId + &, compressed len: & + compressedLength +
&, decompressed len: & + decompressedLength);
//We will put a file in memory if it meets certain criteria:
//1. The size of the (decompressed) file should be less than 25% of
the total inmem fs
//2. There is space available in the inmem fs
// 判断拷贝的数据能否完全放入内存中,内存计算公式为: JVM堆尺寸×mapred.job.shuffle.input.buffer.percent(0.7)× 1/4
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
// Shuffle
MapOutput mapOutput =
if (shuffleInMemory) {
if (LOG.isDebugEnabled()) {
LOG.debug(&Shuffling & + decompressedLength + & bytes (& +
compressedLength + & raw bytes) & +
&into RAM from & + mapOutputLoc.getTaskAttemptId());
//如果可以放入内存,则放入新建立的byte buffer中
mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
(int)decompressedLength,
(int)compressedLength);
if (LOG.isDebugEnabled()) {
LOG.debug(&Shuffling & + decompressedLength + & bytes (& +
compressedLength + & raw bytes) & +
&into Local-FS from & + mapOutputLoc.getTaskAttemptId());
//内存中放不下则放入磁盘中
mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
compressedLength);
return mapO
如果内存足够大,则copy过来的数据直接放入内存中,首先会分配一个byte数组,然后从上面建立的输入流冲取得所需数据。
ReduceTask.java 1646行
private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
URLConnection connection,
InputStream input,
int mapOutputLength,
int compressedLength)
throws IOException, InterruptedException {
// 判断是否有足够内存存放数据,如果没有则等待内存刷新,
//刷新内存前会讲输入流置空,所以在这个函数返回为false时需要重新连接,刷新数据时会唤醒内存合并线程
boolean createdNow = ramManager.reserve(mapOutputLength, input);
//是否需要重新连接
if (!createdNow) {
// Reconnect
connection = mapOutputLoc.getOutputLocation().openConnection();
input = setupSecureConnection(mapOutputLoc, connection);
} catch (IOException ioe) {
(&Failed reopen connection to fetch map-output from & +
mapOutputLoc.getHost());
// Inform the ram-manager
ramManager.closeInMemoryFile(mapOutputLength);
ramManager.unreserve(mapOutputLength);
//包装输入流
IFileInputStream checksumIn =
new IFileInputStream(input,compressedLength);
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
decompressor.reset();
input = codec.createInputStream(input, decompressor);
// 创建buffer,从输入流读取并填充
byte[] shuffleData = new byte[mapOutputLength];
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(),
mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);
int bytesRead = 0;
//循环读取流冲数据至缓存中
int n = input.read(shuffleData, 0, shuffleData.length);
while (n & 0) {
bytesRead +=
shuffleClientMetrics.inputBytes(n);
// indicate we're making progress
reporter.progress();
n = input.read(shuffleData, bytesRead,
(shuffleData.length-bytesRead));
if (LOG.isDebugEnabled()) {
LOG.debug(&Read & + bytesRead + & bytes from map-output for & +
mapOutputLoc.getTaskAttemptId());
//数据读取完毕则关闭输入流
input.close();
} catch (IOException ioe) {
(&Failed to shuffle from & + mapOutputLoc.getTaskAttemptId(),
// 关闭内存文件,并唤醒内存合并线程
ramManager.closeInMemoryFile(mapOutputLength);
// 校验数据读取长度
if (bytesRead != mapOutputLength) {
// Inform the ram-manager
ramManager.unreserve(mapOutputLength);
// Discard the map-output
mapOutput.discard();
} catch (IOException ignored) {
// IGNORED because we are cleaning up
(&Failed to discard map-output from & +
mapOutputLoc.getTaskAttemptId(), ignored);
mapOutput =
throw new IOException(&Incomplete map output received for & +
mapOutputLoc.getTaskAttemptId() + & from & +
mapOutputLoc.getOutputLocation() + & (& +
bytesRead + & instead of & +
mapOutputLength + &)&
return mapO
如果内存过小不能存放本次读取的数据则直接写入磁盘文件中,我们会在相关目录中看到这个文件如:C:/hadoop/tasklog/taskTracker/hadoop/jobcache/job__0001/attempt__0001_r_/output/map_0.out-0
private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
InputStream input,
Path filename,
long mapOutputLength)
throws IOException {
// 构建本地文件系统文件名
Path localFilename =
lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
mapOutputLength, conf);
//创建基于磁盘文件的MapOutput
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
conf, localFileSys.makeQualified(localFilename),
mapOutputLength);
// 开始数据拷贝
OutputStream output =
long bytesRead = 0;
output = rfs.create(localFilename);
//讲map输出直接写入磁盘时分配的缓存,固定64K
byte[] buf = new byte[64 * 1024];
int n = -1;
n = input.read(buf, 0, buf.length);
} catch (IOException ioe) {
readError =
while (n & 0) {
bytesRead +=
shuffleClientMetrics.inputBytes(n);
output.write(buf, 0, n);
// indicate we're making progress
reporter.progress();
n = input.read(buf, 0, buf.length);
} catch (IOException ioe) {
readError =
(&Read & + bytesRead + & bytes from map-output for & +
mapOutputLoc.getTaskAttemptId());
output.close();
input.close();
} catch (IOException ioe) {
(&Failed to shuffle from & + mapOutputLoc.getTaskAttemptId(),
// Discard the map-output
mapOutput.discard();
} catch (IOException ignored) {
(&Failed to discard map-output from & +
mapOutputLoc.getTaskAttemptId(), ignored);
mapOutput =
// Close the streams
IOUtils.cleanup(LOG, input, output);
// Re-throw
// 读取数据后的检测
if (bytesRead != mapOutputLength) {
mapOutput.discard();
} catch (Exception ioe) {
// IGNORED because we are cleaning up
(&Failed to discard map-output from & +
mapOutputLoc.getTaskAttemptId(), ioe);
} catch (Throwable t) {
String msg = getTaskID() + & : Failed in shuffle to disk :&
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
mapOutput =
throw new IOException(&Incomplete map output received for & +
mapOutputLoc.getTaskAttemptId() + & from & +
mapOutputLoc.getOutputLocation() + & (& +
bytesRead + & instead of & +
mapOutputLength + &)&
return mapO
阅读笔记:
1. MapOutput类 本质上是一个指向一个数据块的指针,该数据块可以在硬盘上,也可以在内存上。(1)final boolean inMemory表示该数据块是否在内存中 (2)final Path file表示数据在硬盘上的路径 (3)byte[] data表示数据在内存中的数据块
2. ReduceTask.run() 是ReduceTask的起始点。其中分为三部分:(1).Copy阶段(由reduceCopier.fetchOutputs()完成)
(2).Sort阶段(由Merger.merge()完成) (3).Reduce阶段(由runOldReducer()或者runNewReducer()完成)
3. fetchOutputs()函数中启动多个(由mapred.reduce.parallel.copies属性设置,默认为5个)MapOutputCopier线程进行远程数据拷贝到本地。远程拷贝运行过程中,存在&InMemFSMergeThread线程 和&LocalFSMerger线程 进行文件合并。
4. MapOutput类中的 discard()函数即抛弃拷贝的map输出结果。若该MapOutput数据块在内存上,则将数据指针data置null,若数据块在硬盘上,则调用 fs.delete(file,true) 删除该文件。
5. 远程拷贝过程中,每次拷贝一个数据块时,若该数据块可以放入内存则放入内存,否则放入硬盘。有两个标准决定该数据块是否应该放入硬盘:(1) 数据块小于 java_heaps _size * mapred.job.shuffle.input.buffer.percent * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION(0.25) (2) 内存中有足够空间放入该数据块。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:73667次
积分:1000
积分:1000
排名:千里之外
原创:14篇
转载:28篇
评论:28条
(1)(4)(3)(3)(2)(1)(5)(9)(1)(2)(2)(1)(1)(1)(4)(4)(2)Bigdatda-MapReduce(24)
1.Combiner的作用是什么?
2.作业级别参数如何调优?
3.任务及管理员级别有哪些可以调优?
Hadoop为用户作业提供了多种可配置的参数,以允许用户根据作业特点调整这些参数值使作业运行效率达到最优。
一 应用程序编写规范
1.设置Combiner
& && &&&对于一大批MapReduce程序,如果可以设置一个Combiner,那么对于提高作业性能是十分有帮助的。Combiner可减少Map
Task中间输出的结果,从而减少各个Reduce Task的远程拷贝数据量,最终表现为Map Task和Reduce Task执行时间缩短。
2. 选择合理的Writable类型
& && & 在MapReduce模型中,Map Task和Reduce Task的输入和输出类型均为Writable。Hadoop本身已经提供了很多Writable实现,包括IntWritable、FloatWritable。为应用程序处理的数据选择合适的Writable类型可大大提升性能。比如处理整数类型数据时,直接采用IntWritable比先以Text类型读入在转换为整数类型要高效。如果输出整数的大部分可用一个或两个字节保存,那么直接采用VIntWritable或者VLongWritable,它们采用了变长整型的编码方式,可以大大减少输出数据量。
二 作业级别参数调优
1.规划合理的任务数目
& &在Hadoop中,每个Map Task处理一个Input Split。Input Split的划分方式是由用户自定义的InputFormat决定的,默认情况下,有以下三个参数决定。
& &mapred.min.split.size :Input Split的最小值 默认值1
& &mapred.max.split.szie:& &Input Split的最大值
& &dfs.block.size:HDFS 中一个block大小& &默认值64MB
& &golsize:它是用户期望的Input Split数目=totalSize/numSplits ,其中totalSize为文件的总大小;numSplits为用户设定的Map Task个数,默认情况下是1.
& &&splitSize
= max{minSize,min{goalSize,blockSize}} 如果想让InputSize尺寸大于block尺寸,直接增大配置参数mpared.min.split.size即可。
2.增加输入文件的副本数
& && &如果一个作业并行执行的任务数目非常多,那么这些任务共同的输入文件可能成为瓶颈。为防止多个任务并行读取一个文件内容造成瓶颈,用户可根据需要增加输入文件的副本数目。
3.启动推测执行机制
& &&&推测执行是Hadoop对“拖后腿”的任务的一种优化机制,当一个作业的某些任务运行速度明显慢于同作业的其他任务时,Hadoop会在另一个节点上为“慢任务”启动一个备份任务,这样两个任务同时处理一份数据,而Hadoop最终会将优先完成的那个任务的结果作为最终结果,并将另一个任务杀掉。
4.设置失败容忍度
& &&&Hadoop运行设置任务级别和作业级别的失败容忍度。作业级别的失败容忍度是指Hadoop允许每个作业有一定比例的任务运行失败,这部分任务对应的输入数据将被忽略;
& &&&任务级别的失败容忍度是指Hadoop允许任务失败后再在另外节点上尝试运行,如果一个任务经过若干次尝试运行后仍然运行失败,那么Hadoop才会最终认为该任务运行失败。
& && &用户应该根据应用程序的特点设置合理的失败容忍度,以尽快让作业运行完成和避免没必要的资源浪费。
5.适当打开JVM重用功能
& && & 为了实现任务隔离,Hadoop将每个任务放到一个单独的JVM中执行,而对于执行时间较短的任务,JVM启动和关闭的时间将占用很大比例时间,为此,用户可以启用JVM重用功能,这样一个JVM可连续启动多个同类型的任务。
6.设置任务超时时间
& && & 如果一个任务在一定的时间内未汇报进度,则TaskTracker会主动将其杀死,从而在另一个节点上重新启动执行。用户可根据实际需要配置任务超时时间。
7.合理使用DistributedCache
& && & 一般情况下,得到外部文件有两种方法:一种是外部文件与应用程序jar包一起放到客户端,当提交作业时由客户端上传到HDFS的一个目录下,然后通过Distributed Cache分发到各个节点上;另一种方法是事先将外部文件直接放到HDFS上,从效率上讲,第二种方法更高效。第二种方法不仅节省了客户端上传文件的时间,还隐含着告诉DistributedCache:&请将文件下载到各个节点的pubic级别共享目录中”,这样,后续所有的作业可重用已经下载好的文件,不必重复下载。
8.跳过坏记录
& && & Hadoop为用户提供了跳过坏记录的功能,当一条或几条坏数据记录导致任务运行失败时,Hadoop可自动识别并跳过这些坏记录。
9.提高作业优先级
& && & 所有Hadoop作业调度器进行任务调度时均会考虑作业优先级这一因素。作业的优先级越高,它能够获取的资源(slot数目)也越多。Hadoop提供了5种作业优先级,分别为VERY_HIGH、 HIGH、 NORMAL、 LOW、 VERY_LOW。
& &&&注:在生产环境中,管理员已经按照作业重要程度对作业进行了分级,不同重要程度的作业允许配置的优先级不同,用户可以擅自进行调整。
10.合理控制Reduce Task的启动时机
& && && &如果Reduce Task启动过早,则可能由于Reduce Task长时间占用Reduce slot资源造成&slot Hoarding&现象,从而降低资源利用率;反之,如果Reduce Task启动过晚,则会导致Reduce Task获取资源延迟,增加了作业的运行时间。
三 任务级别参数调优
& &&&hadoop任务级别参数调优分两个方面: Map Task和Reduce Task。
1.Map Task调优
& && & map运行阶段分为:Read、Map、Collect、Spill、Merge五个阶段。
& && & map 任务执行会产生中间数据,但这些中间结果并没有直接IO到磁盘上,而是先存储在缓存(buffer)中,并在缓存中进行一些预排序来优化整个map的性能,存储map中间数据的缓存默认大小为100M,由io.sort.mb
参数指定。这个大小可以根据需要调整。当map任务产生了非常大的中间数据时可以适当调大该参数,使缓存能容纳更多的map中间数据,而不至于大频率的IO磁盘,当系统性能的瓶颈在磁盘IO的速度上,可以适当的调大此参数来减少频繁的IO带来的性能障碍。
& && &&&由于map任务运行时中间结果首先存储在缓存中,默认当缓存的使用量达到80%(或0.8)的时候就开始写入磁盘,这个过程叫做spill(也叫溢出),进行spill的缓存大小可以通过io.sort.spill.percent
参数调整,这个参数可以影响spill的频率。进而可以影响IO的频率。
& && &&&当map任务计算成功完成之后,如果map任务有输出,则会产生多个spill。接下来map必须将些spill进行合并,这个过程叫做merge, merge过程是并行处理spill的,每次并行多少个spill是由参数io.sort.factor指定的默认为10个。但是当spill的数量非常大的时候,merge一次并行运行的spill仍然为10个,这样仍然会频繁的IO处理,因此适当的调大每次并行处理的spill数有利于减少merge数因此可以影响map的性能。
& && &&&当map输出中间结果的时候也可以配置压缩。
2. Reduce Task调优
& && &&&reduce 运行阶段分为shuflle(copy) merge sort& &reduce write五个阶段。
& && &&&shuffle 阶段为reduce 全面拷贝map任务成功结束之后产生的中间结果,如果上面map任务采用了压缩的方式,那么reduce 将map任务中间结果拷贝过来后首先进行解压缩,这一切是在reduce的缓存中做的,当然也会占用一部分cpu。为了优化reduce的执行时间,reduce也不是等到所有的map数据都拷贝过来的时候才开始运行reduce任务,而是当job执行完第一个map任务时开始运行的。reduce
在shuffle阶段 实际上是从不同的并且已经完成的map上去下载属于自己的数据,由于map任务数很多,所有这个copy过程是并行的,既同时有许多个reduce取拷贝map,这个并行的线程是通过mapred.reduce.parallel.copies&参数指定,默认为5个,也就是说无论map的任务数是多少个,默认情况下一次只能有5个reduce的线程去拷贝map任务的执行结果。所以当map任务数很多的情况下可以适当的调整该参数,这样可以让reduce快速的获得运行数据来完成任务。
& && & reduce线程在下载map数据的时候也可能因为各种各样的原因(网络原因、系统原因等),存储该map数据所在的datannode 发生了故障,这种情况下reduce任务将得不到该datanode上的数据了,同时该 download thread 会尝试从别的datanode下载,可以通过mapred.reduce.copy.backoff
(默认为30秒)来调整下载线程的下载时间,如果网络不好的集群可以通过增加该参数的值来增加下载时间,以免因为下载时间过长reduce将该线程判断为下载失败。
reduce 下载线程在map结果下载到本地时,由于是多线程并行下载,所以也需要对下载回来的数据进行merge,所以map阶段设置的io.sort.factor 也同样会影响这个reduce的。
& && &同map一样 该缓冲区大小也不是等到完全被占满的时候才写入磁盘而是默认当完成0.66的时候就开始写磁盘操作,该参数是通过mapred.job.shuffle.merge.percent 指定的。
& && &当reduce 开始进行计算的时候通过mapred.job.reduce.input.buffer.percent 来指定需要多少的内存百分比来作为reduce读已经sort好的数据的buffer百分比,该值默认为0。Hadoop假设用户的reduce()函数需要所有的JVM内存,因此执行reduce()函数前要释放所有内存。如果设置了该值,可将部分文件保存在内存中(不必写到磁盘上)。
& &&&总之,Map Task和Reduce Task调优的一个原则就是减少数据的传输量、尽量使用内存、减少磁盘IO的次数、增大任务并行数,除此之外还有根据自己集群及网络的实际情况来调优。
四、管理员角度调优
& && & 管理员负责为用户作业提供一个高效的运行环境。管理员需要从全局出发,通过调整一些关键参数提高系统的吞吐率和性能。总体上来看,管理员需从硬件选择、操作系统参数调优、JVM参数调优和Hadoop参数调优等四个角度入手,为Hadoop用户提供一个高效的作业运行环境。
& &&&Hadoop自身架构的基本特点决定了其硬件配置的选项。Hadoop采用了Master/Slave架构,其中,master维护了全局元数据信息,重要性远远大于slave。在较低Hadoop版本中,master存在单点故障问题,因此,master的配置应远远好于各个slave。
操作系统参数调优
& &&&1.增大同时打开的文件描述符和网络连接上限
& &&&使用ulimit命令将允许同时打开的文件描述符数目上限增大至一个合适的值。同时调整内核参数net.core.somaxconn网络连接数目至一个足够大的值。
& & 补充:net.core.somaxconn的作用&
& &&&net.core.somaxconn是Linux中的一个kernel参数,表示socket监听(listen)的backlog上限。什么是backlog呢?backlog就是socket的监听队列,当一个请求(request)尚未被处理或建立时,它会进入backlog。而socket
server可以一次性处理backlog中的所有请求,处理后的请求不再位于监听队列中。当server处理请求较慢,以至于监听队列被填满后,新来的请求会被拒绝。在Hadoop 1.0中,参数ipc.server.listen.queue.size控制了服务端socket的监听队列长度,即backlog长度,默认值是128。而Linux的参数net.core.somaxconn默认值同样为128。当服务端繁忙时,如NameNode或JobTracker,128是远远不够的。这样就需要增大backlog,例如我们的3000台集群就将ipc.server.listen.queue.size设成了32768,为了使得整个参数达到预期效果,同样需要将kernel参数net.core.somaxconn设成一个大于等于32768的值。
2.关闭swap分区
& &&&避免使用swap分区,提供程序的执行效率。
& &&&除此之外,设置合理的预读取缓冲区的大小、文件系统选择与配置及I/O调度器选择等
JVM参数调优
& &&&由于Hadoop中的每个服务和任务均会运行在一个单独的JVM中,因此,JVM的一些重要参数也会影响Hadoop性能。管理员可通过调整JVM FLAGS和JVM垃圾回收机制提高Hadoop性能。
Hadoop参数调优
&&1.合理规划资源
& && &设置合理的槽位数目
& && &在Hadoop中,计算资源是用槽位表示的。slot分为两种:Map&&Slot和Reduce Slot。每种slot代表一定量的资源,且同种slot是同质的,也就是说,同种slot代表的资源量是相同的。管理员需要根据实际需要为TaskTracker配置一定数目的Map Slot和Reduce
Slot数目,从而限制每个TaskTracker上并发执行的Map Task和Reduce Task的数目。
& && &编写健康监测脚本
& && & Hadoop允许管理员为每个TaskTracker配置一个节点健康状况监测脚本。TaskTracker中包含一个专门的线程周期性执行该脚本,并将脚本执行结果通过心跳机制汇报给JobTracker。一旦JobTracker发现某个TaskTracker的当前状况为“不健康”,则会将其加入黑名单,从此不再为它分配任务。
&&2. 调整心跳配置
& && & 调整心跳的间隔 因根据自己集群的规模适度的调整心跳间隔
& && & 启用带外心跳& &为了减少任务分配延迟,Hadoop引入了带外心跳。带外心跳不同于常规心跳,它是任务运行结束或者任务运行失败时触发的,能够在出现空闲资源时第一时间通知JobTracker,以便它能够迅速为空闲资源分配新的任务。
& && &除此之外,还包括磁盘块配置、设置合理的RPC Handler和HTTP线程数目、慎用黑名单机制、启用批量任务调度、选择合适的压缩算法、启用预读取机制等。
& && &注:当一个集群的规模较小时,如果一定数量的节点被频繁的加入系统黑名单中,则会大大降低集群的吞吐率和计算能力。
& && & Hadoop 性能调优是一项工程浩大的工作,它不仅涉及Hadoop本身的性能调优,还涉及更底层的硬件、操作系统和Java虚拟机等系统的调优。
& && & 总体来说,提高作业运行效率需要Hadoop管理员和作业拥有者共同的努力,其中,管理员负责为用户提供一个高效的作业运行环境,而用户则根据自己作业的特点让它尽可能快地运行完成。
原文地址/thread-.html
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:139391次
积分:4302
积分:4302
排名:第4657名
原创:269篇
转载:158篇
评论:19条
说明:小青年在奋斗中
&&&&&&一个爱学习的小菜鸟。。。
专业:养猪
年级:小学未毕业
所在地:村头干活那小伙就是我~
姓名:鲍礼彬
hadoop/Spark QQ群:
279-807-394
点击图片可以与我QQ交谈
阅读:3254
文章:27篇
阅读:18791
(2)(2)(3)(13)(3)(2)(7)(6)(4)(2)(6)(21)(12)(53)(37)(35)(84)(61)(9)(9)(15)(15)(12)(9)(6)}

我要回帖

更多关于 cachetask计划任务 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信