xxl-job 总体可以分成三部分:执行器管理,任务管理,调度中心等。
- 执行器管理:这一部分主要是用来管理任务执行器,比如一个任务是交给哪个节点(机器)进行执行,这个执行任务的节点可以称作是执行器。
- 任务管理:这一部分主要是业务处理逻辑,一般会由 @XxlJob 注解修饰。
- 调度中心:将某个任务路由(分配)到某个或某些执行器上执行。
从上述架构可以看出,所有的任务都被调度中心统一管理,什么时候执行、交给哪个节点执行,都是由调度中心来决定的。所以这里就需要执行器和调度中心进行一定的远程通信,xxl-job 通过 Netty 实现了一套自己的远程通信协议。
在下文分析的任务触发链路中,调度中心相当于是客户端(负责发起请求),执行器相当于是服务端(通过 EmbedServer 接收请求),所以看源码时抓住这一点,一点点切入即可。
源码解析:先看配置类 XxlJobAdminConfig,它相当于是调度中心(也就是发起请求的客户端一侧)的启动入口,源码如下:
/**
 * Admin-side configuration bean; its initialization callback boots the scheduler.
 *
 * <p>The {@code adminConfig} holder assigned below is declared outside this excerpt.
 */
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private XxlJobScheduler xxlJobScheduler;

    /**
     * Publishes this bean through {@code adminConfig}, then creates the scheduler and
     * runs its {@code init()} sequence.
     *
     * @throws Exception whatever {@code XxlJobScheduler#init()} throws
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        XxlJobScheduler scheduler = new XxlJobScheduler();
        this.xxlJobScheduler = scheduler;
        scheduler.init();
    }
}
在代码启动时,这个类会被加载,并且会回调 afterPropertiesSet() 方法,然后进入 XxlJobScheduler 类的 init() 方法,代码如下:
/**
 * Boots the admin-side background helpers in a fixed order.
 *
 * <p>The descriptions of what each helper does are taken from the original author's notes;
 * the helper implementations themselves are not shown in this excerpt.
 */
public class XxlJobScheduler {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);

    /**
     * Starts every helper, ending with the schedule helper that drives job triggering.
     *
     * @throws Exception if any helper fails to start
     */
    public void init() throws Exception {
        // i18n initialization
        initI18n();

        // Author's note: only creates two stand-by thread pools (fastTriggerPool, slowTriggerPool) used later.
        // fastTriggerPool mainly runs the trigger logic; a job that timed out more than 10 times on the
        // fast pool is handed to slowTriggerPool to relieve the fast pool. (Not verifiable from this excerpt.)
        JobTriggerPoolHelper.toStart();

        // Author's note: heartbeat monitor. Refreshes update_time in xxl_job_registry and then the
        // addressList column of xxl_job_group; an entry not refreshed for more than 90s is evicted,
        // i.e. the executor is considered down. (Not verifiable from this excerpt.)
        JobRegistryHelper.getInstance().start();

        // Author's note: fetches up to 1000 failed jobs at a time and sends alarms such as e-mail.
        // (Not verifiable from this excerpt.)
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // Core of xxl-job: a dedicated thread repeatedly fetches the jobs that are due and
        // routes them to the matching executor.
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
}
然后进入 JobScheduleHelper.getInstance().start() 代码如下:
public class JobScheduleHelper { private static JobScheduleHelper instance = new JobScheduleHelper(); public static JobScheduleHelper getInstance(){ return instance; } public static final long PRE_READ_MS = 5000; // pre read private Thread scheduleThread; private Thread ringThread; private volatile boolean scheduleThreadToStop = false; private volatile boolean ringThreadToStop = false; // 用来存放多少秒后需要执行的任务 private volatile static Map> ringData = new ConcurrentHashMap<>(); public void start(){ // schedule thread scheduleThread = new Thread(new Runnable() { @Override public void run() { try { // 启动线程延迟 5s 后开始执行 TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; while (!scheduleThreadToStop) { // 开始去扫描 job long start = System.currentTimeMillis(); Connection conn = null; Boolean connAutoCommit = null; PreparedStatement preparedStatement = null; boolean preReadSuc = true; try { conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); // 这里加锁,从侧面说明了这个调度中心也可以实现高可用,并且已经加了悲观锁保证数据安全性 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); // 事物开启,并且设置成手动提交 // 1、预读 xxl_job_info 数据,在 nextTriggerTime 的基础上后推迟 5s,提前做好准别 // 2、这里讲返回的数据分成三部分 |---5---10---15---| long nowTime = System.currentTimeMillis(); // 假设现在 nowTime=10,那么现在这条语句会查询出 <= 15s 的数据 List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size( )>0) { // 循环处理查询出的 job—info 数据 for 
(XxlJobInfo jobInfo: scheduleList) { // 当前时间已经超过了下次执行时间+5s,说明这次是不能执行的 // 因为已经超过了 5s,我们捞取的数据只是在当前时间+5s之内的数据 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); // 1、misfire match MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING); if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) { // FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); } // 2、刷新下次执行的时间 refreshNextValidTime(jobInfo, new Date()); } else if (nowTime > jobInfo.getTriggerNextTime()) { // 当前时间大于了下次执行时间,并且小于下次执行时间+5s,说明肯定要执行这个业务逻辑了 // 1、开始发执行路由规则定位到具体的节点执行 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); // 2、执行完之后刷新下次执行的时间 refreshNextValidTime(jobInfo, new Date()); // next-trigger-time in 5s, pre-read again if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } else { // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time // 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } } else { preReadSuc = false; } // tx stop } catch (Exception e) { if 
(!scheduleThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); } } finally { conn.commit(); } long cost = System.currentTimeMillis()-start; // Wait seconds, align second if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } } }); scheduleThread.setDaemon(true); scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread"); scheduleThread.start(); // 这个循环线程,执行由上述线程添加到 ringItemData map 中的数据 ringThread = new Thread(new Runnable() { @Override public void run() { while (!ringThreadToStop) { // align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } try { // second data List ringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for (int i = 0; i < 2; i++) { List tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { ringItemData.addAll(tmpData); } } // ring trigger if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear ringItemData.clear(); } } catch (Exception e) { if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } } } } }); ringThread.setDaemon(true); ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread"); ringThread.start(); } }
进入 trigger 内部,代码如下:
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { // 查询 xxl_job_info 中的数据 XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return; } // 判断是否带指定的参数,如果有参数,就赋值到 ExecutorParam 字段 if (executorParam != null) { jobInfo.setExecutorParam(executorParam); } // 重试次数 int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount(); XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup()); // 如果是手动传入了执行器地址列表,就直接使用传入的 addressList if (addressList!=null && addressList.trim().length()>0) { // 如果是手动注入 addressType=1,如果是自动注入 addressType=0,具体可以查看类 XxlJobGroup group.setAddressType(1); group.setAddressList(addressList.trim()); } // 分片参数 int[] shardingParam = null; if (executorShardingParam!=null){ // 可以查看 executorShardingParam 从哪里传进来的,发现只有一个地方,那就是失败监控线程会重试 // 然后这里就要开始进行解析这些分片参数,正常运行这里都不会进来的,外部调用这里 executorShardingParam 参数都是 null String[] shardingArr = executorShardingParam.split("/"); if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) { shardingParam = new int[2]; shardingParam[0] = Integer.valueOf(shardingArr[0]); shardingParam[1] = Integer.valueOf(shardingArr[1]); } } // 分配类型为广播模式: // 1.执行器 addressList 地址列表不能为空,因为分配要路由到指定的执行器上 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { // 获取到了所有的执行器地址列表,然后开始轮询调用 for (int i = 0; i < group.getRegistryList().size(); i++) { processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); } } else { if (shardingParam == null) { shardingParam = new int[]{0, 1}; } processTrigger(group, jobInfo, 
finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } }
再进入 processTrigger() 内部,其中触发远程执行器的关键片段如下:
// 4、trigger remote executor ReturnTtriggerResult = null; if (address != null) { triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT (ReturnT.FAIL_CODE, null); }
再进入 runExecutor() 方法,代码如下:
public static ReturnTrunExecutor(TriggerParam triggerParam, String address){ ReturnT runResult = null; try { // 创建可以调用远程服务的客户端对象 ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); runResult = executorBiz.run(triggerParam); } catch (Exception e) { logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e); runResult = new ReturnT (ReturnT.FAIL_CODE, ThrowableUtil.toString(e)); } StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":"); runResultSB.append("
address:").append(address); runResultSB.append("
code:").append(runResult.getCode()); runResultSB.append("
msg:").append(runResult.getMsg()); runResult.setMsg(runResultSB.toString()); return runResult; }
进入 run() 方法,如下:
public class ExecutorBizClient implements ExecutorBiz { public ExecutorBizClient() { } public ExecutorBizClient(String addressUrl, String accessToken) { this.addressUrl = addressUrl; this.accessToken = accessToken; // valid if (!this.addressUrl.endsWith("/")) { this.addressUrl = this.addressUrl + "/"; } } private String addressUrl ; private String accessToken; private int timeout = 3; @Override public ReturnTbeat() { return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class); } @Override public ReturnT idleBeat(IdleBeatParam idleBeatParam){ return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class); } @Override public ReturnT run(TriggerParam triggerParam) { System.out.println("ExecutorBizClient.....走了 run 类型的 client 请求....."); return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class); } @Override public ReturnT kill(KillParam killParam) { return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class); } @Override public ReturnT log(LogParam logParam) { return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class); } }
进入 postBody() 方法,代码如下:
public static ReturnT postBody(String url, String accessToken, int timeout, Object requestObj, Class returnTargClassOfT) { System.out.println("---------->url="+url+",requestObj="+requestObj+",returnTargClassOfT="+returnTargClassOfT); LocalDateTime now = LocalDateTime.now(); System.out.println(Thread.currentThread().getName()+",now="+now+"url="+url+"requestObj = " + requestObj+",returnTargClassOfT="+returnTargClassOfT); HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { // connection URL realUrl = new URL(url); connection = (HttpURLConnection) realUrl.openConnection(); // trust-https boolean useHttps = url.startsWith("https"); if (useHttps) { HttpsURLConnection https = (HttpsURLConnection) connection; trustAllHosts(https); } // connection setting connection.setRequestMethod("POST"); connection.setDoOutput(true); connection.setDoInput(true); connection.setUseCaches(false); connection.setReadTimeout(timeout * 1000); connection.setConnectTimeout(3 * 1000); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); if(accessToken!=null && accessToken.trim().length()>0){ connection.setRequestProperty(XXL_JOB_ACCESS_TOKEN, accessToken); } // do connection connection.connect(); // write requestBody if (requestObj != null) { // 封装数据,requestObj 就是下面的图示 triggerParam 参数 String requestBody = GsonTool.toJson(requestObj); DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); dataOutputStream.write(requestBody.getBytes("UTF-8")); dataOutputStream.flush(); dataOutputStream.close(); } // valid StatusCode int statusCode = connection.getResponseCode(); if (statusCode != 200) { return new ReturnT(ReturnT.FAIL_CODE, "xxl-job remoting fail, StatusCode("+ statusCode +") invalid. 
for url : " + url); } // result bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line; while ((line = bufferedReader.readLine()) != null) { result.append(line); } String resultJson = result.toString(); System.out.println("读取到服务端返回的结果是 result="+resultJson); // parse returnT try { ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, returnTargClassOfT); return returnT; } catch (Exception e) { logger.error("xxl-job remoting (url="+url+") response content invalid("+ resultJson +").", e); return new ReturnT (ReturnT.FAIL_CODE, "xxl-job remoting (url="+url+") response content invalid("+ resultJson +")."); } } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT (ReturnT.FAIL_CODE, "xxl-job remoting error("+ e.getMessage() +"), for url : " + url); } finally { try { if (bufferedReader != null) { bufferedReader.close(); } if (connection != null) { connection.disconnect(); } } catch (Exception e2) { logger.error(e2.getMessage(), e2); } } }
这里封装的请求参数包括了非常重要的 triggerParam,如图示:
这里开始发送客户端请求,也就是从这里开始和服务端建立通信,然后服务端接收请求的代码如下:
public class EmbedServer { private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class); @Override protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { System.out.println("开始执行 channelRead0() 方法.............msg="+msg); // request parse //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); String requestData = msg.content().toString(CharsetUtil.UTF_8); String uri = msg.uri(); HttpMethod httpMethod = msg.method(); boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); // invoke bizThreadPool.execute(new Runnable() { @Override public void run() { // 开始收到调度中心发送过来的请求,执行对象的 handler Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); // to json String responseJson = GsonTool.toJson(responseObj); // 把执行结果 responseJson 发送给客户端(调度中心) writeResponse(ctx, keepAlive, responseJson); } }); } }
然后调用 process() 方法,代码如下:
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { // valid if (HttpMethod.POST != httpMethod) { return new ReturnT(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support."); } if (uri == null || uri.trim().length() == 0) { return new ReturnT (ReturnT.FAIL_CODE, "invalid request, uri-mapping empty."); } if (accessToken != null && accessToken.trim().length() > 0 && !accessToken.equals(accessTokenReq)) { return new ReturnT (ReturnT.FAIL_CODE, "The access token is wrong."); } // services mapping try { switch (uri) { case "/beat": return executorBiz.beat(); case "/idleBeat": IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); case "/run": TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); System.out.println("channelRead0() 方法收到客户端请求,接收到的请求参数是 triggerParam="+triggerParam); return executorBiz.run(triggerParam); case "/kill": KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); case "/log": LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); default: return new ReturnT (ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found."); } } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT (ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e)); } }
进入到服务端 run() 方法,代码如下:
@Override public ReturnTrun(TriggerParam triggerParam) { // load old:jobHandler + jobThread JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; // 省略了所有的判断条件,然后进入到 registJobThread() 方法 // 开启线程开始异步执行客户端请求 if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } // 把客户端发送过来的参数存放到阻塞队列中 LinkedBlockingQueue triggerQueue; ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; }
然后进入到 registJobThread() 方法,代码如下:
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler); // 开启了一个线程,那么肯定会执行 JobThread 类里面的 run 方法 newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; }
进入到 JobThread 的 run() 方法,代码如下:
@Override public void run() { // init try { handler.init(); } catch (Throwable e) { logger.error(e.getMessage(), e); } // 这里可以看出来一个线程在这里死循环,这样写主要是复用这个线程 while(!toStop){ running = false; // 记录这个线程在这里面空转了几次 // 如果空转了 30 次,那么就会把这个线程直接终结掉 idleTimes++; TriggerParam triggerParam = null; try { // 直接从阻塞队列中取出 triggerParam 任务参数,然后开始执行对应业务处理器 triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { running = true; idleTimes = 0; triggerLogIdSet.remove(triggerParam.getLogId()); // log filename, like "logPath/yyyy-MM-dd/9999.log" String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId()); XxlJobContext xxlJobContext = new XxlJobContext( triggerParam.getJobId(), triggerParam.getExecutorParams(), logFileName, triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()); // init job context XxlJobContext.setXxlJobContext(xxlJobContext); // execute XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam()); // 如果设置在 admin 管理页面中设置了执行器超时时间处理,就走下面这个 if 逻辑 if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; try { // 使用 Future 框架实现超时机制 FutureTaskfutureTask = new FutureTask (new Callable () { @Override public Boolean call() throws Exception { // init job context XxlJobContext.setXxlJobContext(xxlJobContext); handler.execute(); return true; } }); futureThread = new Thread(futureTask); futureThread.start(); // 调用 get(Timeout) 方法,在指定超时间没有返回结果直接抛出异常 Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS); } catch (TimeoutException e) { // 捕获 get() 超时抛出异常 XxlJobHelper.log("
----------- xxl-job job execute timeout"); XxlJobHelper.log(e); // handle result XxlJobHelper.handleTimeout("job execute timeout "); } finally { // 并且中断在 Callback 中开启的 futureThread 该线程 futureThread.interrupt(); } } else { // 最终最终调用我们编写的业务执行器,其实就是调用我们之前的定时器而已 handler.execute(); } // valid execute handle data if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) { XxlJobHelper.handleFail("job handle result lost."); } else { String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg(); tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000) ?tempHandleMsg.substring(0, 50000).concat("...") :tempHandleMsg; XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg); } XxlJobHelper.log("
----------- xxl-job job execute end(finish) -----------
----------- Result: handleCode=" + XxlJobContext.getXxlJobContext().getHandleCode() + ", handleMsg = " + XxlJobContext.getXxlJobContext().getHandleMsg() ); } else { // 超过30次空转,就把该线程中断 if (idleTimes > 30) { if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); } } } } catch (Throwable e) { if (toStop) { XxlJobHelper.log("
----------- JobThread toStop, stopReason:" + stopReason); } // handle result StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String errorMsg = stringWriter.toString(); XxlJobHelper.handleFail(errorMsg); XxlJobHelper.log("
----------- JobThread Exception:" + errorMsg + "
----------- xxl-job job execute end(error) -----------"); } finally { if(triggerParam != null) { // callback handler info if (!toStop) { // 把 jobHandler 执行完之后的结果返回客户端(调度中心) TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), XxlJobContext.getXxlJobContext().getHandleCode(), XxlJobContext.getXxlJobContext().getHandleMsg() ) ); } else { // is killed TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), XxlJobContext.HANDLE_CODE_FAIL, stopReason + " [job running, killed]" ) ); } } } } // callback trigger request in queue while(triggerQueue !=null && triggerQueue.size()>0){ TriggerParam triggerParam = triggerQueue.poll(); if (triggerParam!=null) { // is killed TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), XxlJobContext.HANDLE_CODE_FAIL, stopReason + " [job not executed, in the job queue, killed.]") ); } } // destroy try { handler.destroy(); } catch (Throwable e) { logger.error(e.getMessage(), e); } logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); }
进入 TriggerCallbackThread 回调线程,代码如下:
public void start() { // valid if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); return; } // callback triggerCallbackThread = new Thread(new Runnable() { @Override public void run() { // normal callback while(!toStop){ try { // 取出阻塞队列中的值 HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // callback list param ListcallbackParamList = new ArrayList (); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } // last callback try { List callbackParamList = new ArrayList (); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); if (callbackParamList!=null && callbackParamList.size()>0) { // 实际底层调用 doCallback(callbackParamList); } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy."); } }); triggerCallbackThread.setDaemon(true); triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread"); triggerCallbackThread.start(); // retry triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { while(!toStop){ try { retryFailCallbackFile(); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy."); } }); triggerRetryCallbackThread.setDaemon(true); triggerRetryCallbackThread.start(); }
进入 doCallBack() 方法内部
private void doCallback(ListcallbackParamList){ // 回调线程执行完 handler 的结果 boolean callbackRet = false; // callback, will retry if error for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { callbackLog(callbackParamList, "
----------- xxl-job job callback finish."); callbackRet = true; break; } else { callbackLog(callbackParamList, "
----------- xxl-job job callback fail, callbackResult:" + callbackResult); } } catch (Exception e) { callbackLog(callbackParamList, "
----------- xxl-job job callback error, errorMsg:" + e.getMessage()); } } if (!callbackRet) { appendFailCallbackFile(callbackParamList); } }