博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
TaskTracker发送Heartbeat以及接受HeartbeatResponse
阅读量:4648 次
发布时间:2019-06-09

本文共 1821 字,大约阅读时间需要 6 分钟。

          我们通过在命令行输入start-all.sh启动hadoop服务,该脚本通过SSH运行各个节点上的TaskTracker类的main()来启动TaskTracker,它是作为一个单独的JVM来运行的。TaskTracker类实现了MapReduce模型中的TaskTracker的功能。TaskTracker的main函数如下: 

    main(){
 
    TaskTracker tt = new  TaskTracker(); 
        tt. run(); 
.......

 }

 TaskTracker的run()方法主要调用了offerService(),其主要代码如下: 

while (running && !shuttingDown) {//TaskTracker运行期间一直执行,每隔一段时间就想JobTracker发送一次心跳包,并接受JobTracker发送的HeartbeatResponse
      try {
      //判断是否该发送下一个心跳包
        while (remaining > 0) {
...
        }
 
        // TaskTracker 刚启动:
        // 1. 验证buildVersion
        // 2. 为TaskTracker创建一个System Directory
        if(justInited) {
          String jobTrackerBV = jobClient.getBuildVersion();
        ...
          systemDirectory = new Path(dir);
          systemFS = systemDirectory.getFileSystem(fConf);
        }
//每个一段时间检查一下TaskTrack的工作目录
        now = System.currentTimeMillis();
        if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
          localStorage.checkDirs();
          lastCheckDirsTime = now;
          int numFailures = localStorage.numFailures();
          // Re-init the task tracker if there were any new failures
          if (numFailures > lastNumFailures) {
            lastNumFailures = numFailures;
            return State.STALE;
          }
        }
 
        //发送心跳包,并向JobTracker请求Task
        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
 
...
 
//从心跳包的响应中获取它要执行的任务
        TaskTrackerAction[] actions = heartbeatResponse.getActions();
 
//将相应的Task放入到特定的Queue
 if (actions != null){ 
          for(TaskTrackerAction action: actions) {
//将要运行一个Task
            if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
            } else if (action instanceof CommitTaskAction) {//
              CommitTaskAction commitAction = (CommitTaskAction)action;
              if (!commitResponses.contains(commitAction.getTaskID())) {
                LOG.info("Received commit task action for " + 
                          commitAction.getTaskID());
                commitResponses.add(commitAction.getTaskID());
              }
            } else {//待清理的Task
              tasksToCleanup.put(action);
            }
          }
        }
} 

转载于:https://www.cnblogs.com/yueliming/archive/2013/04/09/3009869.html

你可能感兴趣的文章
取字符串以逗号隔开的两个值
查看>>
C#方法名前的方括号是干嘛用的呀?
查看>>
Ubuntu apt-get和pip国内源更换
查看>>
挖掘频繁模式、关联和相关性:基本概念和方法
查看>>
单向链表的简单Java实现-sunziren
查看>>
带有参数的装饰器
查看>>
C++的decltype
查看>>
167. Two Sum II - Input array is sorted
查看>>
Flex整合Spring
查看>>
深度学习入门篇--手把手教你用 TensorFlow 训练模型
查看>>
07-异常处理——动手动脑
查看>>
css3
查看>>
三和韓長庚 著 易學原理總論 對讀 021_040
查看>>
待字闺中之兄弟数字分析
查看>>
C与C++中const差别
查看>>
Spring学习--实现 FactoryBean 接口在 Spring IOC 容器中配置 Bean
查看>>
key-value 多线程server的Linux C++实现
查看>>
collections
查看>>
Amazon S3数据一致性模型
查看>>
三框架:使用数据源dbcp注意
查看>>