Fork me on GitHub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Job extends JobContextImpl implements JobContext {
    /**
     * Submits the job to the cluster (if not already submitted) and blocks
     * until it finishes.
     *
     * @param verbose if {@code true}, monitor the job and print its progress
     *                to the client console via {@link #monitorAndPrintJob()};
     *                if {@code false}, poll quietly at a fixed interval.
     * @return {@code true} if the job completed successfully
     * @throws IOException            on communication failure with the cluster
     * @throws InterruptedException   if the waiting thread is interrupted
     * @throws ClassNotFoundException if a job class cannot be loaded at submit time
     */
    public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
        // Submit only once: a job still in DEFINE state has not yet been handed to the cluster.
        if (this.state == Job.JobState.DEFINE) {
            this.submit();
        }
        if (verbose) {
            // Live-monitor the job and print progress output on the client.
            this.monitorAndPrintJob();
        } else {
            // Quiet mode: poll the cluster at a configurable interval until the job is complete.
            int completionPollIntervalMillis = getCompletionPollInterval(this.cluster.getConf());
            while (!this.isComplete()) {
                try {
                    Thread.sleep((long) completionPollIntervalMillis);
                } catch (InterruptedException ie) {
                    // FIX: the original swallowed the interrupt with an empty catch,
                    // clearing the thread's interrupt status and making this wait
                    // uncancellable. Restore the status and propagate — the method
                    // already declares InterruptedException, so callers must handle it.
                    Thread.currentThread().interrupt();
                    throw ie;
                }
            }
        }
        return this.isSuccessful();
    }

    /** Client-side lifecycle states of a Job handle: not-yet-submitted vs. submitted. */
    @Evolving
    public static enum JobState {
        DEFINE,
        RUNNING;
    }

    /**
     * Returns the client-side completion polling interval in milliseconds.
     * Read from {@code mapreduce.client.completion.pollinterval} (default 5000 ms);
     * values below 1 are rejected and replaced with the default.
     *
     * @param conf job configuration to read the property from
     * @return a polling interval of at least 1 ms
     */
    public static int getCompletionPollInterval(Configuration conf) {
        int completionPollIntervalMillis = conf.getInt("mapreduce.client.completion.pollinterval", 5000);
        if (completionPollIntervalMillis < 1) {
            LOG.warn("mapreduce.client.completion.pollinterval has been set to an invalid value; replacing with 5000");
            completionPollIntervalMillis = 5000;
        }
        return completionPollIntervalMillis;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class JobStatus implements Writable, Cloneable {
    /**
     * Asks the cluster whether this job has finished. Requires the job to be
     * in RUNNING state, refreshes the cached status via an RPC round-trip,
     * then inspects the refreshed status.
     *
     * @return {@code true} once the job has reached a terminal state
     * @throws IOException on communication failure while refreshing the status
     */
    public boolean isComplete() throws IOException {
        this.ensureState(Job.JobState.RUNNING);
        this.updateStatus();
        return this.status.isJobComplete();
    }

    /**
     * A job counts as complete once its run state is terminal:
     * SUCCEEDED, FAILED, or KILLED. Any other state means it is still in flight.
     */
    public synchronized boolean isJobComplete() {
        switch (this.runState) {
            case SUCCEEDED:
            case FAILED:
            case KILLED:
                return true;
            default:
                return false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
public class Job extends JobContextImpl implements JobContext {
/**
 * Submits the job to the cluster and returns immediately (does not wait for
 * completion). May only be called while the job is in DEFINE state; on
 * success the job transitions to RUNNING.
 *
 * @throws IOException            on communication failure with the cluster
 * @throws InterruptedException   if interrupted while submitting
 * @throws ClassNotFoundException if a job class cannot be loaded
 */
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
this.ensureState(Job.JobState.DEFINE);
this.setUseNewAPI();
// Build the submission proxy: initialises the Cluster object, which determines
// whether the job runs on YARN or locally.
this.connect();
final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient());
// ugi is a UserGroupInformation instance carrying the Hadoop user/group identity;
// the submission runs under that identity via doAs.
this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
// Perform the actual submission (see submitJobInternal below).
return submitter.submitJobInternal(Job.this, Job.this.cluster);
}
});
// Mark the client-side handle as RUNNING now that submission succeeded.
this.state = Job.JobState.RUNNING;
LOG.info("The url to track the job: " + this.getTrackingURL());
}
/**
 * Lazily creates the Cluster connection under the submitting user's identity.
 * Idempotent: does nothing if a cluster connection already exists.
 */
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
if (this.cluster == null) {
this.cluster = (Cluster)this.ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
return new Cluster(Job.this.getConfiguration());
}
});
}
}
/**
 * Internal job-submission workflow: validates the job, stages its resources
 * (jar, config, splits) into a per-job staging directory, then asks the
 * cluster to launch it. On failure the staging directory is cleaned up.
 *
 * @param job     the job to submit
 * @param cluster the cluster connection to submit through
 * @return the initial JobStatus returned by the cluster
 * @throws IOException if validation, staging, or submission fails
 */
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {
// 1. Validate the job's output spec. If no output directory is set, or it
//    already exists, the job is not submitted and an error is thrown.
this.checkSpecs(job);
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
// Create/resolve the staging directory used to hand submission data to the cluster.
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// Record the submitting host's IP address and hostname in the Configuration.
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
this.submitHostAddress = ip.getHostAddress();
this.submitHostName = ip.getHostName();
conf.set("mapreduce.job.submithostname", this.submitHostName);
conf.set("mapreduce.job.submithostaddress", this.submitHostAddress);
}
// Obtain a new job ID from the cluster and derive the per-job submit directory.
JobID jobId = this.submitClient.getNewJobID();
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
// Decompiler artifact: holds the non-null status across the finally block.
JobStatus var23;
try {
conf.set("mapreduce.job.user.name", UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set("mapreduce.job.dir", submitJobDir.toString());
TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{submitJobDir}, conf);
this.populateTokenCache(conf, job.getCredentials());
// Generate a shuffle secret key if the credentials do not already hold one.
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance("HmacSHA1");
keyGen.init(64);
} catch (NoSuchAlgorithmException var19) {
throw new IOException("Error generating shuffle secret key", var19);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials());
}
// Copy the job jar and related resources into the cluster's submit directory.
this.copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Compute the input splits and write the split plan; the split count becomes
// the number of map tasks.
int maps = this.writeSplits(job, submitJobDir);
conf.setInt("mapreduce.job.maps", maps);
// Place the application in the configured queue (default: the "default"
// YARN capacity-scheduler queue) and record its admin ACL.
String queue = conf.get("mapreduce.job.queuename", "default");
AccessControlList acl = this.submitClient.getQueueAdmins(queue);
conf.set(QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
TokenCache.cleanUpTokenReferral(conf);
// Optionally record the tracking IDs of all delegation tokens in the config.
if (conf.getBoolean("mapreduce.job.token.tracking.ids.enabled", false)) {
ArrayList<String> trackingIds = new ArrayList();
Iterator i$ = job.getCredentials().getAllTokens().iterator();
while(i$.hasNext()) {
Token<? extends TokenIdentifier> t = (Token)i$.next();
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings("mapreduce.job.token.tracking.ids", (String[])trackingIds.toArray(new String[trackingIds.size()]));
}
// Write the job's xml configuration file into the staging directory.
this.writeConf(conf, submitJobFile);
this.printTokens(jobId, job.getCredentials());
// Submit the job to the cluster and capture the returned status.
status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
if (status == null) {
throw new IOException("Could not launch job");
}
var23 = status;
} finally {
// If submission did not produce a status (failure path), remove the staging
// directory so no orphaned submission data is left behind.
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (this.jtFs != null && submitJobDir != null) {
this.jtFs.delete(submitJobDir, true);
}
}
}
return var23;
}
}
1
2
3
4
5
6
/**
 * Uploads the job's resources into the submission directory by delegating
 * to a JobResourceUploader bound to the job-tracker filesystem.
 *
 * @param job          the job whose resources are being staged
 * @param jobSubmitDir destination directory for the staged resources
 * @throws IOException if the upload fails
 */
private void copyAndConfigureFiles(Job job, Path jobSubmitDir) throws IOException {
    JobResourceUploader resourceUploader = new JobResourceUploader(jtFs);
    resourceUploader.uploadFiles(job, jobSubmitDir);
    // NOTE(review): presumably called for its side effect of initialising the
    // job's working-directory setting when unset — confirm against JobContextImpl.
    job.getWorkingDirectory();
}
-----------------本文结束,感谢您的阅读-----------------