当前位置:首页>开发>正文

怎么优化hadoop任务调度算法 hadoop怎么使用算法

2023-06-27 08:50:19 互联网 未知 开发

 怎么优化hadoop任务调度算法 hadoop怎么使用算法

怎么优化hadoop任务调度算法

首先介绍了Hadoop平台下作业的分布式运行机制,然后对Hadoop平台自带的4种任务调度器做分析和比较,最后在分析JobTracker类文件的基础上指出了创建自定义任务调度器所需完成的工作。
首先Hadoop集群式基于单服务器的,只有一个服务器节点负责调度整个集群的作业运行,主要的具体工作是切分大数据量的作业,指定哪些Worker节点做Map工作、哪些Worker节点做Reduce工作、与Worker节点通信并接受其心跳信号、作为用户的访问入口等等。其次,集群中的每个Worker节点相当于一个器官,运行着主节点所指派的具体作业。这些节点会被分为两种类型,一种是接收分块之后的作业并做映射工作。另一种是负责把前面所做的映射工作按照约定的规则做一个统计。
Task-Tracker通过运行一个简单循环来定期地发送心跳信号(heartbeat)给JobTracker.这个心跳信号会把TaskTracker是否还在存活告知JobTracker,TaskTracker通过信号指明自己是否已经准备
好运行新的任务.一旦TaskTracker已经准备好接受任务,JobTracker就会从作业优先级表中选定一个作业并分配下去.至于到底是执行Map任务还是Reduce任务,是由TaskTracker的任务槽所决定的.默认的任务调度器在处理Reduce任务之前,会优先填满空闲的Map任务槽.因此,如果TaskTracker满足存在至少一个空闲任务槽时,JobTracker会为它分配Map任务,否则为它选择一个Reduce任务.TaskTracker在运行任务的时候,第一步是从共享文件系统中把作业的JAR文件复制过来,从而实现任务文件的本地化.第二步是TaskTracker为任务新建一个本地文件夹并把作业文件解压在此目录中.第三步是由Task-Tracker新建一个TaskRunner实例来运行该任务.
Hadoop平台默认的调度方案就是JobQueueTaskScheduler,这是一种按照任务到来的时间先后顺序而执行的调度策略.这种方式比较简单,JobTracker作为主控节点,仅仅是依照作业到来的先后顺序而选择将要执行的作业.当然,这有一定的缺陷,由于Hadoop平台是默认将作业运行在整个集群上的,那么如果一个耗时非常大的作业进入执行期,将会导致其余大量作业长时间得不到运行.这种长时间运行的优先级别并不高的作业带来了严重的作业阻塞,使得整个平台的运行效率处在较低的水平.Hadoop平台对这种FIFO(First INAnd First Out)机制所给出的解决办法是调用SetJobPriority()方法,通过设置作业的权重级别来做平衡调度.
Fair Scheduler是一种“公平”调度器,它的目标是让每个用户能够公平地共享Hadoop集群计算能力.当只有一个作业运行的时候,它会得到整个集群的资源.随着提交到作业表中作业的增多,Hadoop平台会把集群中空闲出来的时间槽公平分配给每个需要执行的作业.这样即便其中某些作业需要较长时间运行,平台仍然有能力让那些短作业在合理时间内完成[3].Fair Scheduler支持资源抢占,当一个资源池在一定时段内没有得到公平共享时,它会终止该资源池所获得的过多的资源,同时把这些释放的资源让给那些资源不足的资源池.
Hadoop平台中的Capacity Scheduler是由Yahoo贡献的,在调度器上,设置了三种粒度的对象:queue,job,task.在该策略下,平台可以有多个作业队列,每个作业队列经提交后,都会获得一定数量的TaskTracker资源.具体调度流程如下.
(1)选择queue,根据资源库的使用情况从小到大排序,直到找到一个合适的job.
(2)选择job,在当前所选定的queue中,按照作业提交的时间先后以及作业的权重优先级别进行排序,选择合适的job.当然,在job选择时还需要考虑所选作业是否超出目前现有的资源上限,以及资源池中的内存是否够该job的task用等因素.
(3)选择task,根据本地节点的资源使用情况来选择合适的task.
虽然Hadoop平台自带了几种调度器,但是上述3种调度方案很难满足公司复杂的应用需求.因此作为平台的个性化使用者,往往需要开发自己的调度器.Hadoop的调度器是在JobTracker中加载和调用的,因此开发一个自定义的调度器就必须搞清楚JobTracker类文件的内部机制.作为Hadoop平台的核心组件,JobTracker监控着整个集群的作业运行情况并对资源进行管理调度.每个Task-Tracker每隔3s通过heartbeat向JobTracker汇报自己管理的机器的一些基本信息,包括内存使用量、内存的剩余量以及空闲的slot数目等等[5].一
旦JobTracker发现了空闲slot,便会调用调度器中的AssignTask方法为该TaskTracker分配task。

hadoop怎么使用算法

实例一、对以下数据进行排序,根据收入减去支出得到最后结余从大到小排序,数据如下:


SumStep运行之后结果如下:


SortStep运行之后结果为上图根据结余从大到小排序。

代码如下:
[java] view plain copy
public class InfoBean implements WritableComparable{  

private String account  

private double income  

private double expenses  

private double surplus  

public void set(String account, double income, double expenses){  
this.account = account  
this.income = income  
this.expenses = expenses  
this.surplus = income - expenses  
}  

@Override  
public String toString() {  
return this.income   " "   this.expenses   " "   this.surplus  
}  

/** 
* serialize 
*/  
public void write(DataOutput out) throws IOException {  
out.writeUTF(account)  
out.writeDouble(income)  
out.writeDouble(expenses)  
out.writeDouble(surplus)  
}  

/** 
* deserialize 
*/  
public void readFields(DataInput in) throws IOException {  
this.account = in.readUTF()  
this.income = in.readDouble()  
this.expenses = in.readDouble()  
this.surplus = in.readDouble()  
}  
public int compareTo(InfoBean o) {  
if(this.income == o.getIncome()){  
return this.expenses > o.getExpenses() ? 1 : -1   
} else {  
return this.income > o.getIncome() ? -1 : 1  
}  
}  

public String getAccount() {  
return account  
}  

public void setAccount(String account) {  
this.account = account  
}  

public double getIncome() {  
return income  
}  

public void setIncome(double income) {  
this.income = income  
}  

public double getExpenses() {  
return expenses  
}  

public void setExpenses(double expenses) {  
this.expenses = expenses  
}  

public double getSurplus() {  
return surplus  
}  

public void setSurplus(double surplus) {  
this.surplus = surplus  
}  
}  
[java] view plain copy
public class SumStep {  

public static void main(String[] args) throws Exception {  
Configuration conf = new Configuration()  
Job job = Job.getInstance(conf)  

job.setJarByClass(SumStep.class)  

job.setMapperClass(SumMapper.class)  
job.setMapOutputKeyClass(Text.class)  
job.setMapOutputValueClass(InfoBean.class)  
FileInputFormat.setInputPaths(job, new Path(args[0]))  

job.setReducerClass(SumReducer.class)  
job.setOutputKeyClass(Text.class)  
job.setOutputValueClass(InfoBean.class)  
FileOutputFormat.setOutputPath(job, new Path(args[1]))  

job.waitForCompletion(true)  
}  

public static class SumMapper extends Mapper{  

private InfoBean bean = new InfoBean()  
private Text k = new Text()  
@Override  
protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {  
// split   
String line = value.toString()  
String[] fields = line.split(" ")  
// get useful field  
String account = fields[0]  
double income = Double.parseDouble(fields[1])  
double expenses = Double.parseDouble(fields[2])  
k.set(account)  
bean.set(account, income, expenses)  
context.write(k, bean)  
}  
}  

public static class SumReducer extends Reducer{  

private InfoBean bean = new InfoBean()  
@Override  
protected void reduce(Text key, Iterable v2s, Context context)  
throws IOException, InterruptedException {  

double in_sum = 0  
double out_sum = 0  
for(InfoBean bean : v2s){  
in_sum  = bean.getIncome()  
out_sum  = bean.getExpenses()  
}  
bean.set("", in_sum, out_sum)  
context.write(key, bean)  
}  

}  
}  

此处的输入为SumStep的输出而不是源文件作为输入,当然也可以将两个job合并到一起执行,此处不再讨论。

[java] view plain copy
public class SortStep {  

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
Configuration conf = new Configuration()  
Job job = Job.getInstance(conf)  

job.setJarByClass(SortStep.class)  

job.setMapperClass(SortMapper.class)  
job.setMapOutputKeyClass(InfoBean.class)  
job.setMapOutputValueClass(NullWritable.class)  
FileInputFormat.setInputPaths(job, new Path(args[0]))  

job.setReducerClass(SortReducer.class)  
job.setOutputKeyClass(Text.class)  
job.setOutputValueClass(InfoBean.class)  
FileOutputFormat.setOutputPath(job, new Path(args[1]))  

job.waitForCompletion(true)  

}  

public static class SortMapper extends Mapper{  

private InfoBean bean = new InfoBean()  

@Override  
protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {  
String line = value.toString()  
String[] fields = line.split(" ")  
String account = fields[0]  
double income = Double.parseDouble(fields[1])  
double expenses = Double.parseDouble(fields[2])  
bean.set(account, income, expenses)  
context.write(bean, NullWritable.get())  
}  

}  
public static class SortReducer extends Reducer{  

private Text k = new Text()  
@Override  
protected void reduce(InfoBean bean, Iterable v2s, Context context)  
throws IOException, InterruptedException {  
String account = bean.getAccount()  
k.set(account)  
context.write(k, bean)  
}  

}  
}  

实例二、倒排索引,过程如下:
[plain] view plain copy
Map阶段  
<0,"hello tom">  
....  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  

context.write("hello->b.txt",1)  
context.write("hello->b.txt",1)  
context.write("hello->b.txt",1)  
--------------------------------------------------------  
combiner阶段  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  

<"hello->b.txt",1>  
<"hello->b.txt",1>  
<"hello->b.txt",1>  

context.write("hello","a.txt->5")  
context.write("hello","b.txt->3")  
--------------------------------------------------------  
Reducer阶段  
<"hello",{"a.txt->5","b.txt->3"}>  
context.write("hello","a.txt->5 b.txt->3")  
-------------------------------------------------------  
hello   "a.txt->5 b.txt->3"  
tom     "a.txt->2 b.txt->1"  
kitty   "a.txt->1"  
.......  
代码如下:
[java] view plain copy
public class InverseIndex {  

public static void main(String[] args) throws Exception {  
Configuration conf = new Configuration()  

Job job = Job.getInstance(conf)  
//设置jar  
job.setJarByClass(InverseIndex.class)  

//设置Mapper相关的属性  
job.setMapperClass(IndexMapper.class)  
job.setMapOutputKeyClass(Text.class)  
job.setMapOutputValueClass(Text.class)  
FileInputFormat.setInputPaths(job, new Path(args[0]))//words.txt  

//设置Reducer相关属性  
job.setReducerClass(IndexReducer.class)  
job.setOutputKeyClass(Text.class)  
job.setOutputValueClass(Text.class)  
FileOutputFormat.setOutputPath(job, new Path(args[1]))  

job.setCombinerClass(IndexCombiner.class)  

//提交任务  
job.waitForCompletion(true)  
}  
public static class IndexMapper extends Mapper{  

private Text k = new Text()  
private Text v = new Text()  
@Override  
protected void map(LongWritable key, Text value,  
Mapper.Context context)  
throws IOException, InterruptedException {  
String line = value.toString()  
String[] fields = line.split(" ")  
FileSplit inputSplit = (FileSplit) context.getInputSplit()  
Path path = inputSplit.getPath()  
String name = path.getName()  
for(String f : fields){  
k.set(f   "->"   name)  
v.set("1")  
context.write(k, v)  
}  
}  

}  
public static class IndexCombiner extends Reducer{  

private Text k = new Text()  
private Text v = new Text()  
@Override  
protected void reduce(Text key, Iterable values,  
Reducer.Context context)  
throws IOException, InterruptedException {  
String[] fields = key.toString().split("->")  
long sum = 0  
for(Text t : values){  
sum  = Long.parseLong(t.toString())  
}  
k.set(fields[0])  
v.set(fields[1]   "->"   sum)  
context.write(k, v)  
}  

}  
public static class IndexReducer extends Reducer{  

private Text v = new Text()  
@Override  
protected void reduce(Text key, Iterable values,  

最新文章