Source Code Analysis of xxl-job, a Distributed Scheduled-Task Framework

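xxl-job architecture

xxl-job can be divided into three parts: executor management, task management, and the scheduling center.

  • Executor management: manages the executors that run tasks. An executor is the node (machine) that a given task is handed to for execution.
  • Task management: the business logic itself, usually a method annotated with @XxlJob.
  • Scheduling center: routes (assigns) each task to one or more executors.

As this architecture shows, every task is managed centrally by the scheduling center, which decides when a task runs and which node runs it. The executors and the scheduling center therefore have to communicate remotely. xxl-job does this with HTTP requests carrying JSON bodies, which each executor receives on an embedded Netty server.

For a trigger request the scheduling center acts as the client and the executor as the server (the result later flows back the other way through a callback), so a good way into the source is to follow one trigger from end to end.

Source code walkthrough

Start with the configuration class XxlJobAdminConfig, which is the entry point on the scheduling-center (client) side. The source is as follows: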

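@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    // Static self-reference so that other classes can call XxlJobAdminConfig.getAdminConfig()
    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

    // destroy() and the injected configuration fields are omitted here
}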

When the application starts, Spring loads this class and calls back afterPropertiesSet(), which in turn calls the init() method of XxlJobScheduler:

public class XxlJobScheduler  {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);


    public void init() throws Exception {
        // Initialize i18n
        initI18n();

        // Creates two trigger thread pools, fastTriggerPool and slowTriggerPool, for later use.
        // fastTriggerPool is the default pool for dispatching triggers to the business handlers.
        // slowTriggerPool takes over a job once its triggers in fastTriggerPool have timed out
        // more than 10 times, which relieves the pressure on the fast pool.
        JobTriggerPoolHelper.toStart();

        // Registry monitor: executors keep update_time in xxl_job_registry fresh through heartbeats,
        // and the addressList field of xxl_job_group is rebuilt from the live entries.
        // An entry whose update_time has not been refreshed for more than 90s is removed,
        // because the executor behind it is considered down.
        JobRegistryHelper.getInstance().start();

        // Fetches up to 1000 failed job logs per round and sends alarms, e.g. by email
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // The core of xxl-job:
        // a dedicated thread keeps fetching the jobs that are due and routes them to executors
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
}
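
The fast/slow pool idea described in the comments above can be sketched in a few lines. This is only an illustration, not the JobTriggerPoolHelper code: the class name TriggerPoolRouter, the 500 ms "slow" threshold and the one-minute counting window are assumptions made for the example; only the "more than 10 times" limit comes from the text.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: route a job to a "fast" or "slow" pool based on recent slow triggers. */
final class TriggerPoolRouter {
    static final long SLOW_COST_MS = 500; // assumption: a trigger slower than this counts as a timeout
    static final int SLOW_LIMIT = 10;     // from the article: more than 10 timeouts moves the job

    private final Map<Integer, Integer> slowCounts = new HashMap<>();
    private long windowMinute = -1L;      // assumption: counters are reset every minute

    /** Returns "slow" once the job has exceeded SLOW_LIMIT slow triggers in the current window. */
    synchronized String choosePool(int jobId) {
        Integer count = slowCounts.get(jobId);
        return (count != null && count > SLOW_LIMIT) ? "slow" : "fast";
    }

    /** Records how long one trigger took; all counters are cleared when the minute changes. */
    synchronized void recordCost(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != windowMinute) {
            windowMinute = minute;
            slowCounts.clear();
        }
        if (costMs > SLOW_COST_MS) {
            slowCounts.merge(jobId, 1, Integer::sum);
        }
    }
}
```

A caller would ask choosePool(jobId) before submitting the trigger and call recordCost(...) once the trigger returns, so one misbehaving job ends up isolated in the slow pool without starving the others.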

Next, step into JobScheduleHelper.getInstance().start():

public class JobScheduleHelper {

    private static JobScheduleHelper instance = new JobScheduleHelper();
    public static JobScheduleHelper getInstance(){
        return instance;
    }

    public static final long PRE_READ_MS = 5000;    // pre read

    private Thread scheduleThread;
    private Thread ringThread;
    private volatile boolean scheduleThreadToStop = false;
    private volatile boolean ringThreadToStop = false;
    // Time ring: second of the minute (0-59) -> ids of the jobs to trigger in that second
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    public void start(){
        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {
            
                try {
                    // Wait about 5s, aligned to a whole second, before the first scan
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {

                    // Start scanning for jobs
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);

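                        // Taking this row lock shows that the scheduling center itself can be deployed for high availability:
                        // the pessimistic lock keeps concurrent scheduler nodes from corrupting the data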
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

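                        // The transaction is now open (auto-commit has been switched off)
                        // 1. Pre-read xxl_job_info: fetch the jobs whose next trigger time falls within the next 5s, to prepare them in advance
                        // 2. The rows returned fall into three ranges: |---5---10---15---|
                        long nowTime = System.currentTimeMillis();
                        // Suppose nowTime = 10s: this query then returns the jobs whose next trigger time is <= 15s
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {

                            // Process each job returned by the query
                            for (XxlJobInfo jobInfo: scheduleList) {

                                // The trigger time passed more than 5s ago, so this run has been missed (misfire):
                                // it lies outside the 5s window that the scheduler tolerates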
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1、misfire match
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW > trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2. Refresh the next trigger time
                                    refreshNextValidTime(jobInfo, new Date());

                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
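                                    // The trigger time has passed, but by no more than 5s, so the job must run right away

                                    // 1. Trigger: apply the route strategy and dispatch to a concrete executor node
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);

                                    // 2. After triggering, refresh the next trigger time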
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                } else {
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            }

                            // 3、update trigger info
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
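                    } finally {
                        // Commit (which releases the row lock), restore auto-commit and close the resources.
                        // conn stays null if getConnection() failed, so guard against that.
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                logger.error(e.getMessage(), e);
                            }
                            try {
                                if (connAutoCommit != null) {
                                    conn.setAutoCommit(connAutoCommit);
                                }
                            } catch (SQLException e) {
                                logger.error(e.getMessage(), e);
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        if (preparedStatement != null) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }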
                    long cost = System.currentTimeMillis()-start;

                    // Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();


        // ring thread: triggers the jobs that the schedule thread above has put into the ringData map
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // also check the previous tick, in case slow processing made us skip one
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }
}
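
The scheduling loop above boils down to two small ideas: a three-way decision per job (misfire, trigger now, or park in the time ring) and a 60-slot ring keyed by the second of the minute. The sketch below isolates both so they can be tried without a database. ScheduleSketch, Action, classify and drain are names invented for this example; PRE_READ_MS, ringSecond and pushTimeRing follow the article. It is single-threaded on purpose.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, single-threaded sketch of the schedule decision and the time ring. */
final class ScheduleSketch {
    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    /** Mirrors the three branches of the schedule loop for one job. */
    static Action classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Action.MISFIRE;       // due more than 5s ago
        } else if (nowMs > triggerNextTimeMs) {
            return Action.TRIGGER_NOW;   // due within the last 5s
        } else {
            return Action.PUSH_TO_RING;  // due within the next 5s (pre-read)
        }
    }

    /** Slot of the ring (0..59) for a trigger time given in milliseconds. */
    static int ringSecond(long triggerNextTimeMs) {
        return (int) ((triggerNextTimeMs / 1000) % 60);
    }

    private final Map<Integer, List<Integer>> ringData = new HashMap<>();

    void pushTimeRing(int ringSecond, int jobId) {
        ringData.computeIfAbsent(ringSecond, s -> new ArrayList<>()).add(jobId);
    }

    /** Removes and returns the jobs in the current slot and in the previous one. */
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

Draining the previous slot as well is what makes the ring tolerant of a tick that took slightly too long: a job parked at second 59 is still picked up when the ring thread wakes at second 0.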

Step into trigger():

    public static void trigger(int jobId,
                              TriggerTypeEnum triggerType,
                              int failRetryCount,
                              String executorShardingParam,
                              String executorParam,
                              String addressList) {

       // Load the job from xxl_job_info
       XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
       if (jobInfo == null) {
           logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
           return;
       }
       // If an explicit parameter was passed in, it overrides the job's ExecutorParam field
       if (executorParam != null) {
           jobInfo.setExecutorParam(executorParam);
       }
       // Retry count: use the explicit value if it is >= 0, otherwise the job's own setting
       int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
       XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

       // If an executor address list was passed in manually, use it directly
       if (addressList!=null && addressList.trim().length()>0) {
           // addressType=1 means manually entered, addressType=0 means auto-registered; see the XxlJobGroup class
           group.setAddressType(1);
           group.setAddressList(addressList.trim());
       }

       // Sharding parameter
       int[] shardingParam = null;
       if (executorShardingParam!=null){
           // Tracing where executorShardingParam comes from shows a single caller: the retry issued by the fail-monitor thread.
           // Only in that case is the sharding parameter parsed here. In normal operation this branch is never entered,
           // because every other caller passes null for executorShardingParam.
           String[] shardingArr = executorShardingParam.split("/");
           if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
               shardingParam = new int[2];
               shardingParam[0] = Integer.valueOf(shardingArr[0]);
               shardingParam[1] = Integer.valueOf(shardingArr[1]);
           }
       }
       // Route strategy is sharding broadcast:
       //  1. the executor address list must not be empty, because each shard is routed to a specific executor
       if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
               && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
               && shardingParam==null) {
           // Walk the full list of executor addresses and trigger each one in turn (shard i of n)
           for (int i = 0; i < group.getRegistryList().size(); i++) {
               processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
           }
       } else {
           if (shardingParam == null) {
               shardingParam = new int[]{0, 1};
           }
           processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
       }

   }
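
To make the sharding logic concrete, here is a small stand-alone sketch of the two pieces involved: parsing an "index/total" string and deciding which (index, total) pairs get dispatched. ShardingSketch, parse and plan are hypothetical names, and plan only returns the pairs instead of calling processTrigger().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the sharding decision made in trigger(). */
final class ShardingSketch {

    /** Parses "index/total"; returns null unless both parts are short non-negative integers. */
    static int[] parse(String executorShardingParam) {
        if (executorShardingParam == null) {
            return null;
        }
        String[] parts = executorShardingParam.split("/");
        if (parts.length == 2 && parts[0].matches("\\d{1,9}") && parts[1].matches("\\d{1,9}")) {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        }
        return null;
    }

    /** Returns the (index, total) pairs that would each be handed to one processTrigger() call. */
    static List<int[]> plan(boolean broadcast, List<String> registryList, int[] shardingParam) {
        List<int[]> calls = new ArrayList<>();
        if (broadcast && registryList != null && !registryList.isEmpty() && shardingParam == null) {
            // broadcast: one call per registered executor
            for (int i = 0; i < registryList.size(); i++) {
                calls.add(new int[]{i, registryList.size()});
            }
        } else {
            // single call: explicit shard if given, otherwise shard 0 of 1
            calls.add(shardingParam != null ? shardingParam : new int[]{0, 1});
        }
        return calls;
    }
}
```

With three registered executors and a broadcast job, plan(...) yields (0,3), (1,3) and (2,3). A retry that carries "1/3" yields just (1,3), which is why the broadcast branch requires shardingParam == null.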

Next, step into processTrigger():

        // 4. trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

Then step into runExecutor():

    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
        try {
            // Obtain a client object that can call the remote executor
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        return runResult;
    }

Step into run():

public class ExecutorBizClient implements ExecutorBiz {

    public ExecutorBizClient() {
    }
    public ExecutorBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }

    private String addressUrl ;
    private String accessToken;
    private int timeout = 3;


    @Override
    public ReturnT<String> beat() {
        return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class);
    }

    @Override
    public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){
        return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class);
    }

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        System.out.println("ExecutorBizClient: sending a run request to the executor");
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }

    @Override
    public ReturnT<String> kill(KillParam killParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class);
    }

    @Override
    public ReturnT<LogResult> log(LogParam logParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);
    }
}

Step into postBody():

   public static ReturnT postBody(String url, String accessToken, int timeout, Object requestObj, Class returnTargClassOfT) {
        System.out.println("---------->url="+url+",requestObj="+requestObj+",returnTargClassOfT="+returnTargClassOfT);
        LocalDateTime now = LocalDateTime.now();
        System.out.println(Thread.currentThread().getName()+",now="+now+"url="+url+"requestObj = " + requestObj+",returnTargClassOfT="+returnTargClassOfT);
        HttpURLConnection connection = null;
        BufferedReader bufferedReader = null;
        try {
            // connection
            URL realUrl = new URL(url);
            connection = (HttpURLConnection) realUrl.openConnection();

            // trust-https
            boolean useHttps = url.startsWith("https");
            if (useHttps) {
                HttpsURLConnection https = (HttpsURLConnection) connection;
                trustAllHosts(https);
            }

            // connection setting
            connection.setRequestMethod("POST");
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setReadTimeout(timeout * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

            if(accessToken!=null && accessToken.trim().length()>0){
                connection.setRequestProperty(XXL_JOB_ACCESS_TOKEN, accessToken);
            }

            // do connection
            connection.connect();

            // write requestBody
            if (requestObj != null) {
                // Serialize the payload; for a run request, requestObj is the triggerParam object
                String requestBody = GsonTool.toJson(requestObj);

                DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                dataOutputStream.write(requestBody.getBytes("UTF-8"));
                dataOutputStream.flush();
                dataOutputStream.close();
            }

            

            // valid StatusCode
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) {
                return new ReturnT(ReturnT.FAIL_CODE, "xxl-job remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
            }

            // result
            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                result.append(line);
            }
            String resultJson = result.toString();
            System.out.println("result returned by the server: result="+resultJson);

            // parse returnT
            try {
                ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, returnTargClassOfT);
                return returnT;
            } catch (Exception e) {
                logger.error("xxl-job remoting (url="+url+") response content invalid("+ resultJson +").", e);
                return new ReturnT(ReturnT.FAIL_CODE, "xxl-job remoting (url="+url+") response content invalid("+ resultJson +").");
            }

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT(ReturnT.FAIL_CODE, "xxl-job remoting error("+ e.getMessage() +"), for url : " + url);
        } finally {
            try {
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                if (connection != null) {
                    connection.disconnect();
                }
            } catch (Exception e2) {
                logger.error(e2.getMessage(), e2);
            }
        }
    }

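The request body assembled here carries the all-important triggerParam.

[Figure: the triggerParam payload]

This is the point where the client request is actually sent, that is, where communication with the server (the executor) begins. On the server side the request is received by the following code: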

public class EmbedServer {
    private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);

    // Netty handler registered by EmbedServer; the server bootstrap code and the
    // executorBiz, accessToken and bizThreadPool fields are omitted here
    public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
            System.out.println("entering channelRead0(), msg="+msg);
            // request parse
            //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
            String requestData = msg.content().toString(CharsetUtil.UTF_8);
            String uri = msg.uri();
            HttpMethod httpMethod = msg.method();
            boolean keepAlive = HttpUtil.isKeepAlive(msg);
            String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

            // invoke
            bizThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // A request from the scheduling center has arrived: dispatch it to the matching handler
                    Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

                    // to json
                    String responseJson = GsonTool.toJson(responseObj);

                    // Send the result, responseJson, back to the client (the scheduling center)
                    writeResponse(ctx, keepAlive, responseJson);
                }
            });
        }
    }
}

This in turn calls process():

        private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
            // valid
            if (HttpMethod.POST != httpMethod) {
                return new ReturnT(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
            }
            if (uri == null || uri.trim().length() == 0) {
                return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
            }
            if (accessToken != null
                    && accessToken.trim().length() > 0
                    && !accessToken.equals(accessTokenReq)) {
                return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong.");
            }

            // services mapping
            try {
                switch (uri) {
                    case "/beat":
                        return executorBiz.beat();
                    case "/idleBeat":
                        IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                        return executorBiz.idleBeat(idleBeatParam);
                    case "/run":
                        TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                        System.out.println("process() received a run request from the client, triggerParam="+triggerParam);
                        return executorBiz.run(triggerParam);
                    case "/kill":
                        KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                        return executorBiz.kill(killParam);
                    case "/log":
                        LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                        return executorBiz.log(logParam);
                    default:
                        return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
            }
        }
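
The shape of process() is a plain guard-then-dispatch routine: reject anything that is not a POST, reject an empty URI, check the access token if one is configured, then route by URI. The following sketch reproduces that shape with the JDK only. DispatchSketch and its string results ("200:..." and "500:...") are stand-ins invented for the example and do not use the xxl-job ReturnT type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of a guard-then-dispatch request processor. */
final class DispatchSketch {
    private final String accessToken;
    private final Map<String, Function<String, String>> routes = new HashMap<>();

    DispatchSketch(String accessToken) {
        this.accessToken = accessToken;
        // two illustrative routes; a real executor also serves /idleBeat, /kill and /log
        routes.put("/beat", body -> "200:beat");
        routes.put("/run", body -> "200:run " + body);
    }

    String process(String method, String uri, String body, String tokenFromRequest) {
        if (!"POST".equals(method)) {
            return "500:invalid request, HttpMethod not support.";
        }
        if (uri == null || uri.trim().isEmpty()) {
            return "500:invalid request, uri-mapping empty.";
        }
        // the token is only enforced when one has been configured on this side
        if (accessToken != null && !accessToken.trim().isEmpty() && !accessToken.equals(tokenFromRequest)) {
            return "500:The access token is wrong.";
        }
        Function<String, String> handler = routes.get(uri);
        if (handler == null) {
            return "500:invalid request, uri-mapping(" + uri + ") not found.";
        }
        try {
            return handler.apply(body);
        } catch (RuntimeException e) {
            return "500:request error:" + e;
        }
    }
}
```

Note that an unset token disables the check entirely, which matches the condition in the original process(): a null or blank accessToken lets every request through.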

Next comes the run() method on the server (executor) side:

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler + jobThread
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // All the validation branches are omitted here; execution then reaches registJobThread(),
        // which starts a thread that handles the client's request asynchronously
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // Put the parameters sent by the client into the blocking queue: LinkedBlockingQueue<TriggerParam> triggerQueue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

Then step into registJobThread():

    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
        JobThread newJobThread = new JobThread(jobId, handler);
        // A thread is started here, so the run() method of JobThread is what executes next
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }

        return newJobThread;
    }
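
registJobThread() relies on the fact that Map.put returns the previous value, which gives it a handle on the thread being replaced. A minimal stand-alone version of that pattern might look as follows. WorkerRegistrySketch, Worker and stoppedWithin are hypothetical names, and the worker simply idles instead of running a job handler.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of a per-job worker registry; not the xxl-job classes. */
final class WorkerRegistrySketch {

    /** A worker that idles until it is asked to stop. */
    static final class Worker extends Thread {
        private volatile boolean toStop = false;
        private volatile String stopReason;

        @Override
        public void run() {
            while (!toStop) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // interrupted by the registry: loop around and re-check toStop
                }
            }
        }

        void toStop(String reason) {
            this.stopReason = reason;
            this.toStop = true;
        }

        String getStopReason() {
            return stopReason;
        }
    }

    private final ConcurrentMap<Integer, Worker> repository = new ConcurrentHashMap<>();

    Worker register(int jobId, String removeOldReason) {
        Worker fresh = new Worker();
        fresh.setDaemon(true);
        fresh.start();
        // put() returns the previous mapping: the worker being replaced, or null
        Worker old = repository.put(jobId, fresh);
        if (old != null) {
            old.toStop(removeOldReason);
            old.interrupt();
        }
        return fresh;
    }

    /** Waits up to millis for the thread to finish; returns true if it has stopped. */
    static boolean stoppedWithin(Thread t, long millis) {
        try {
            t.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }
}
```

Setting the flag and then interrupting covers both cases: a worker blocked in a wait wakes up immediately, while a busy worker notices the flag on its next loop iteration.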

Step into the run() method of JobThread:

    @Override
    public void run() {

        // init
        try {
            handler.init();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        // The thread keeps looping here, so that it can be reused for later triggers of the same job
        while(!toStop){
            running = false;
            // Count how many consecutive polls found nothing to do;
            // after more than 30 idle polls (3s each) the thread is removed
            idleTimes++;

            TriggerParam triggerParam = null;
            try {
                // Take the next triggerParam from the blocking queue, then run the matching business handler
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    running = true;
                    idleTimes = 0;
                    triggerLogIdSet.remove(triggerParam.getLogId());

                    // log filename, like "logPath/yyyy-MM-dd/9999.log"
                    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                    XxlJobContext xxlJobContext = new XxlJobContext(
                            triggerParam.getJobId(),
                            triggerParam.getExecutorParams(),
                            logFileName,
                            triggerParam.getBroadcastIndex(),
                            triggerParam.getBroadcastTotal());

                    // init job context
                    XxlJobContext.setXxlJobContext(xxlJobContext);

                    // execute
                    XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                    // If an executor timeout was configured on the admin page, take this branch
                    if (triggerParam.getExecutorTimeout() > 0) {
                        // limit timeout
                        Thread futureThread = null;
                        try {
                            // Use a FutureTask to implement the timeout
                            FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                @Override
                                public Boolean call() throws Exception {

                                    // init job context
                                    XxlJobContext.setXxlJobContext(xxlJobContext);

                                    handler.execute();
                                    return true;
                                }
                            });
                            futureThread = new Thread(futureTask);
                            futureThread.start();

                            // get(timeout) throws TimeoutException if no result arrives within the limit
                            Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {
                            // get() timed out
                            XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                            XxlJobHelper.log(e);

                            // handle result
                            XxlJobHelper.handleTimeout("job execute timeout ");
                        } finally {
                            // Interrupt the futureThread that was started above
                            futureThread.interrupt();
                        }
                    } else {
                        // Finally, invoke the business handler we wrote, i.e. the scheduled task itself
                        handler.execute();
                    }

                    // valid execute handle data
                    if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                        XxlJobHelper.handleFail("job handle result lost.");
                    } else {
                        String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                        tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                                ?tempHandleMsg.substring(0, 50000).concat("...")
                                :tempHandleMsg;
                        XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                    }
                    XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                            + XxlJobContext.getXxlJobContext().getHandleCode()
                            + ", handleMsg = "
                            + XxlJobContext.getXxlJobContext().getHandleMsg()
                    );

                } else {
                    // More than 30 idle polls: remove this thread
                    if (idleTimes > 30) {
                        if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
                            XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                        }
                    }
                }
            } catch (Throwable e) {
                if (toStop) {
                    XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                }

                // handle result
                StringWriter stringWriter = new StringWriter();
                e.printStackTrace(new PrintWriter(stringWriter));
                String errorMsg = stringWriter.toString();

                XxlJobHelper.handleFail(errorMsg);

                XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
            } finally {
                if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // Report the result of the jobHandler back to the client (the scheduling center)
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                triggerParam.getLogId(),
                                triggerParam.getLogDateTime(),
                                XxlJobContext.getXxlJobContext().getHandleCode(),
                                XxlJobContext.getXxlJobContext().getHandleMsg() )
                        );
                    } else {
                        // is killed
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                triggerParam.getLogId(),
                                triggerParam.getLogDateTime(),
                                XxlJobContext.HANDLE_CODE_FAIL,
                                stopReason + " [job running, killed]" )
                        );
                    }
                }
            }
        }

        // callback trigger request in queue
        while(triggerQueue !=null && triggerQueue.size()>0){
            TriggerParam triggerParam = triggerQueue.poll();
            if (triggerParam!=null) {
                // is killed
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.HANDLE_CODE_FAIL,
                        stopReason + " [job not executed, in the job queue, killed.]")
                );
            }
        }

        // destroy
        try {
            handler.destroy();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
    }
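
The timeout branch in run() is a standard JDK pattern: wrap the work in a FutureTask, run it on a helper thread, and bound the wait with get(timeout). Isolated from the rest of JobThread it might look like this. TimeoutSketch and runWithTimeout are names made up for the example, and the string results stand in for the handle codes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of a handler call bounded by a timeout. */
final class TimeoutSketch {

    /** Runs the handler on a helper thread; returns "ok", "timeout" or "error". */
    static String runWithTimeout(Callable<Boolean> handler, long timeoutMs) {
        FutureTask<Boolean> task = new FutureTask<>(handler);
        Thread worker = new Thread(task, "timeout-sketch-worker");
        worker.setDaemon(true);
        worker.start();
        try {
            // blocks for at most timeoutMs, then throws TimeoutException
            task.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            // the handler itself threw
            return "error";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "error";
        } finally {
            // ask the helper thread to stop; harmless if it has already finished
            worker.interrupt();
        }
    }
}
```

Keep in mind that interrupt() is only a request. A handler that never blocks and never checks the interrupt flag keeps running after the timeout has been reported, and the same caveat applies to the original code.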

Step into the TriggerCallbackThread callback thread:

  public void start() {

        // valid
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
            return;
        }

        // callback
        triggerCallbackThread = new Thread(new Runnable() {

            @Override
            public void run() {

                // normal callback
                while(!toStop){
                    try {
                        // Take one item from the blocking queue, waiting until one is available
                        HandleCallbackParam callback = getInstance().callBackQueue.take();
                        if (callback != null) {

                            // callback list param
                            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                            callbackParamList.add(callback);

                            // callback, will retry if error
                            if (callbackParamList!=null && callbackParamList.size()>0) {
                                doCallback(callbackParamList);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }

                // last callback
                try {
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    if (callbackParamList!=null && callbackParamList.size()>0) {
                        // This is where the actual callback request is made
                        doCallback(callbackParamList);
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");

            }
        });
        triggerCallbackThread.setDaemon(true);
        triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
        triggerCallbackThread.start();


        // retry
        triggerRetryCallbackThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while(!toStop){
                    try {
                        retryFailCallbackFile();
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
            }
        });
        triggerRetryCallbackThread.setDaemon(true);
        triggerRetryCallbackThread.start();

    }
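
The callback thread batches results with a take-then-drain pattern: block until at least one item exists, then grab whatever else has queued up in the meantime, so that one HTTP call can carry many results. A stand-alone sketch using plain strings instead of HandleCallbackParam (CallbackBatchSketch and nextBatch are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of take-then-drain batching on a blocking queue. */
final class CallbackBatchSketch {
    private final LinkedBlockingQueue<String> callBackQueue = new LinkedBlockingQueue<>();

    void pushCallBack(String callback) {
        callBackQueue.add(callback);
    }

    /** Blocks until one item is available, then returns it with everything else queued so far. */
    List<String> nextBatch() {
        List<String> batch = new ArrayList<>();
        try {
            String first = callBackQueue.take();   // blocks while the queue is empty
            batch.add(first);
            callBackQueue.drainTo(batch);          // non-blocking: moves the remaining items
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // return whatever was collected
        }
        return batch;
    }
}
```

Unlike the original, this sketch adds the first item before draining, so the batch keeps queue order. The original appends it afterwards, which is harmless there because each callback identifies its own log id.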

Step into doCallback():

    private void doCallback(List<HandleCallbackParam> callbackParamList){
        // Report the handler results back through the callback; becomes true once one admin node accepts them
        boolean callbackRet = false;
        // callback, will retry if error
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
                if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                    callbackRet = true;
                    break;
                } else {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
                }
            } catch (Exception e) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
            }
        }
        if (!callbackRet) {
            appendFailCallbackFile(callbackParamList);
        }
    }
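
doCallback() is a simple failover loop: try each scheduling-center address in order, stop at the first success, and persist the batch for a later retry if none of them accepted it. The sketch below shows that control flow with JDK types only. CallbackFailoverSketch is a hypothetical name, each "admin" is modelled as a function returning a status code, and an in-memory list stands in for the fail-callback file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of callback failover across several admin nodes. */
final class CallbackFailoverSketch {
    static final int SUCCESS_CODE = 200;

    /** Stands in for the fail-callback file that the retry thread reads later. */
    final List<List<String>> failedBatches = new ArrayList<>();

    /** Tries each admin in order; returns the index that accepted the batch, or -1 if none did. */
    int doCallback(List<Function<List<String>, Integer>> admins, List<String> batch) {
        for (int i = 0; i < admins.size(); i++) {
            try {
                Integer code = admins.get(i).apply(batch);
                if (code != null && code == SUCCESS_CODE) {
                    return i;   // first success wins, remaining admins are not called
                }
            } catch (RuntimeException e) {
                // an unreachable admin counts as a failed attempt: move on to the next one
            }
        }
        failedBatches.add(new ArrayList<>(batch));
        return -1;
    }
}
```

Because the loop stops at the first success, each batch is delivered to exactly one admin node, and the retry path is only reached when every address has failed.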