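The article "Online code-level performance profiling: filling in the last missing piece of distributed tracing" points out that no matter how complex the business logic is, it is ultimately executed by threads. So how does SkyWalking use method stack snapshots to do code-level performance profiling? Out of curiosity, let's debug it and see how it works.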
Demo
- Open the SkyWalking UI and click to create a new Trace Profiling task
- Configure the Trace Profiling task
- View the stack information
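The server receives the request from the UI and stores the task in Elasticsearch through ProfileTaskMutationService#createTask. The index name is profile_task-* (for example profile_task-20220807).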
```java
public ProfileTaskCreationResult createTask(final String serviceId,
                                            final String endpointName,
                                            final long monitorStartTime,
                                            final int monitorDuration,
                                            final int minDurationThreshold,
                                            final int dumpPeriod,
                                            final int maxSamplingCount) throws IOException {
    // execution window: start now if no start time was given; monitorDuration is in minutes
    long taskStartTime = monitorStartTime > 0 ? monitorStartTime : System.currentTimeMillis();
    long taskEndTime = taskStartTime + TimeUnit.MINUTES.toMillis(monitorDuration);

    // check data
    final String errorMessage = checkDataSuccess(
        serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration,
        minDurationThreshold, dumpPeriod, maxSamplingCount
    );
    if (errorMessage != null) {
        return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();
    }

    // create task
    final long createTime = System.currentTimeMillis();
    final ProfileTaskRecord task = new ProfileTaskRecord();
    task.setServiceId(serviceId);
    task.setEndpointName(endpointName.trim());
    task.setStartTime(taskStartTime);
    task.setDuration(monitorDuration);
    task.setMinDurationThreshold(minDurationThreshold);
    task.setDumpPeriod(dumpPeriod);
    task.setCreateTime(createTime);
    task.setMaxSamplingCount(maxSamplingCount);
    task.setTimeBucket(TimeBucket.getMinuteTimeBucket(taskStartTime));
    // hand the record to the storage pipeline, which persists it to the profile_task index
    NoneStreamProcessor.getInstance().in(task);

    return ProfileTaskCreationResult.builder().id(task.id()).build();
}
```
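The window arithmetic and the daily index suffix can be made concrete with a small sketch. This is a hypothetical helper, not SkyWalking code. It assumes three things:
- a non-positive monitorStartTime means "start now";
- monitorDuration is in minutes;
- the date suffix in names like profile_task-20220807 is computed in UTC.

The real storage layer may use a different time zone.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers illustrating the task window and a daily index name. */
public class TaskWindowSketch {

    /** Returns {start, end} in epoch millis; a non-positive start means "now" (assumption). */
    public static long[] executionWindow(long monitorStartTime, int monitorDurationMinutes, long now) {
        long start = monitorStartTime > 0 ? monitorStartTime : now;
        long end = start + TimeUnit.MINUTES.toMillis(monitorDurationMinutes);
        return new long[] {start, end};
    }

    /** Builds a name such as profile_task-20220807; UTC is an assumption of this sketch. */
    public static String dailyIndexName(String prefix, long epochMillis) {
        String day = DateTimeFormatter.ofPattern("yyyyMMdd")
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(epochMillis));
        return prefix + "-" + day;
    }

    public static void main(String[] args) {
        long[] window = executionWindow(0, 5, 1_659_873_600_000L); // 2022-08-07 12:00 UTC
        System.out.println(window[0] + " -> " + window[1]);
        System.out.println(dailyIndexName("profile_task", window[0]));
    }
}
```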
CacheUpdateTimer#updateProfileTask refreshes the profile task cache on a timer: ProfileTaskCache$profileTaskDownstreamCache.
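The point of this cache is that the gRPC handler below can read tasks from memory instead of querying Elasticsearch on every agent poll. Here is a minimal sketch of a timer-refreshed cache:
- The class and method names are hypothetical.
- The refresh period is arbitrary; the actual CacheUpdateTimer schedule is not shown in this post.
- Each refresh builds a new immutable map and swaps it in, so readers never see a half-updated cache.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical read-mostly cache: serviceId -> task ids, reloaded from storage on a timer. */
public class RefreshingTaskCache {
    private volatile Map<String, List<String>> byService = Map.of();
    private final Supplier<Map<String, List<String>>> loader; // stands in for the storage query

    public RefreshingTaskCache(Supplier<Map<String, List<String>>> loader) {
        this.loader = loader;
    }

    /** One refresh pass: build a fresh copy, then publish it with a single volatile write. */
    public void refresh() {
        Map<String, List<String>> next = new HashMap<>();
        loader.get().forEach((service, tasks) -> next.put(service, List.copyOf(tasks)));
        byService = Collections.unmodifiableMap(next); // readers see either the old or the new map
    }

    /** Readers (e.g. a gRPC handler) only touch the in-memory copy. */
    public List<String> tasksOf(String serviceId) {
        return byService.getOrDefault(serviceId, List.of());
    }

    /** Schedules refresh() at a fixed rate; an exception in refresh() would cancel later runs. */
    public ScheduledExecutorService startTimer(long periodSeconds) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::refresh, 0, periodSeconds, TimeUnit.SECONDS);
        return timer;
    }
}
```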
- The agent sends a ProfileTaskCommandQuery request through ProfileTaskChannelService
```java
public void run() {
    if (status == GRPCChannelStatus.CONNECTED) {
        try {
            ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();

            // sniffer info
            builder.setService(Config.Agent.SERVICE_NAME).setServiceInstance(Config.Agent.INSTANCE_NAME);

            // last command create time
            builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class)
                                                              .getLastCommandCreateTime());

            // send the ProfileTaskCommandQuery request
            Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                                       .getProfileTaskCommands(builder.build());
            // handle the response
            ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
        } catch (Throwable t) {
            // error handling omitted in this excerpt
        }
    }
}
```
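The lastCommandTime field keeps this polling incremental:
- the server-side handler in the next step skips tasks whose create time is not newer than it;
- the agent advances it when it accepts a task.

Here is a self-contained sketch of that handshake. Task, newerThan and Client are hypothetical stand-ins, not SkyWalking classes, and the initial value -1 is an assumption meaning "nothing received yet".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of an incremental pull keyed on the newest create time seen. */
public class IncrementalPull {

    /** Only the fields the handshake needs. */
    public static final class Task {
        private final String id;
        private final long createTime;

        public Task(String id, long createTime) {
            this.id = id;
            this.createTime = createTime;
        }

        public String id() { return id; }

        public long createTime() { return createTime; }
    }

    /** Server side: skip tasks the agent already has (createTime <= lastCommandTime). */
    public static List<Task> newerThan(List<Task> cached, long lastCommandTime) {
        List<Task> result = new ArrayList<>();
        for (Task task : cached) {
            if (task.createTime() <= lastCommandTime) {
                continue;
            }
            result.add(task);
        }
        return result;
    }

    /** Agent side: sends lastCommandTime and advances it to the newest accepted task. */
    public static final class Client {
        private long lastCommandTime = -1; // assumption: nothing received yet

        public List<Task> poll(List<Task> serverCache) {
            List<Task> fresh = newerThan(serverCache, lastCommandTime);
            for (Task task : fresh) {
                lastCommandTime = Math.max(lastCommandTime, task.createTime());
            }
            return fresh;
        }

        public long lastCommandTime() { return lastCommandTime; }
    }
}
```

Polling twice against the same server cache returns the tasks once and then an empty list. This is why the agent can poll on a fixed schedule without receiving duplicates.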
- The server receives the ProfileTaskCommandQuery request in ProfileTaskServiceHandler#getProfileTaskCommands
```java
public void getProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver) {
    // query profile task list by service id
    final String serviceId = IDManager.ServiceID.buildId(request.getService(), true);
    final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, request.getServiceInstance());
    // take this service's tasks from the cache
    final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(serviceId);
    if (CollectionUtils.isEmpty(profileTaskList)) {
        responseObserver.onNext(Commands.newBuilder().build());
        responseObserver.onCompleted();
        return;
    }

    // build command list
    final Commands.Builder commandsBuilder = Commands.newBuilder();
    final long lastCommandTime = request.getLastCommandTime();

    for (ProfileTask profileTask : profileTaskList) {
        // if command create time is less than or equal to last command time, the sniffer already has the task
        if (profileTask.getCreateTime() <= lastCommandTime) {
            continue;
        }

        // record profile task log --> index name: sw_profile_task_log-20220808
        recordProfileTaskLog(profileTask, serviceInstanceId, ProfileTaskLogOperationType.NOTIFIED);

        // add command --> convert the ProfileTask into a ProfileTaskCommand for the response
        commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
    }

    responseObserver.onNext(commandsBuilder.build());
    responseObserver.onCompleted();
}
```
- The agent handles the returned ProfileTaskCommand in CommandService#receiveCommand, which puts it into the blocking queue commands
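```java
public void receiveCommand(Commands commands) {
    for (Command command : commands.getCommandsList()) {
        try {
            BaseCommand baseCommand = CommandDeserializer.deserialize(command);
            // offer() does not block: it returns false if the queue is full
            boolean success = this.commands.offer(baseCommand);
            // (handling of a rejected command is omitted in this excerpt)
        } catch (UnsupportedCommandException e) {
            // unsupported command types are skipped (handling omitted in this excerpt)
        }
    }
}
```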
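Using offer() rather than put() means the thread handling the gRPC response never blocks on a full queue. A rejected command is simply not queued.

Here is a small sketch with a hypothetical CommandInbox and an arbitrary capacity. The excerpt does not show whether the real commands queue is bounded, or how large it is.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical inbox illustrating non-blocking offer() on a bounded queue. */
public class CommandInbox {
    private final BlockingQueue<String> commands;

    public CommandInbox(int capacity) {
        this.commands = new ArrayBlockingQueue<>(capacity);
    }

    /** Enqueues without blocking; returns how many commands were accepted. */
    public int receive(String... incoming) {
        int accepted = 0;
        for (String command : incoming) {
            // offer() returns false immediately when the queue is full, instead of waiting
            if (commands.offer(command)) {
                accepted++;
            }
        }
        return accepted;
    }

    public int pending() {
        return commands.size();
    }
}
```

With a capacity of 2, `new CommandInbox(2).receive("a", "b", "c")` accepts two commands and returns without waiting for the third.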
- The CommandService thread loops over the commands queue and hands each command to the matching command executor
```java
public void run() {
    final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);

    while (isRunning) {
        try {
            // take the next command from the commands queue (blocks while it is empty)
            BaseCommand command = commands.take();

            // skip commands that have already been executed
            if (isCommandExecuted(command)) {
                continue;
            }

            commandExecutorService.execute(command);
            serialNumberCache.add(command.getSerialNumber());
        } catch (CommandExecutionException e) {
            LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand());
        } catch (Throwable e) {
            LOGGER.error(e, "There is unexpected exception");
        }
    }
}
```
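The consumer side is a classic single-thread take() loop with de-duplication by serial number. Here is a sketch under stated assumptions:
- Command, the in-memory executed list and runDemo are hypothetical.
- The serial-number set grows without bound; a production cache would need some limit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical single-consumer command loop with de-duplication by serial number. */
public class CommandLoop implements Runnable {
    public static final class Command {
        final String serialNumber;

        public Command(String serialNumber) {
            this.serialNumber = serialNumber;
        }
    }

    private final BlockingQueue<Command> commands = new LinkedBlockingQueue<>();
    private final Set<String> executedSerials = new HashSet<>(); // only used by the loop thread
    private final List<String> executed = new ArrayList<>();     // stands in for the executor service
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                Command command = commands.take(); // blocks while the queue is empty
                if (!executedSerials.add(command.serialNumber)) {
                    continue; // serial number seen before: skip the duplicate
                }
                synchronized (executed) {
                    executed.add(command.serialNumber);
                }
            } catch (InterruptedException e) {
                running = false; // interrupt() is how this sketch stops the loop
            }
        }
    }

    public List<String> executedSnapshot() {
        synchronized (executed) {
            return new ArrayList<>(executed);
        }
    }

    /** Demo helper: feeds serial numbers through a loop thread and returns what was executed. */
    public static List<String> runDemo(String... serials) {
        CommandLoop loop = new CommandLoop();
        Thread worker = new Thread(loop, "command-loop");
        worker.start();
        for (String serial : serials) {
            loop.commands.add(new Command(serial));
        }
        long distinct = Arrays.stream(serials).distinct().count();
        long deadline = System.currentTimeMillis() + 5_000;
        try {
            while (loop.executedSnapshot().size() < distinct && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            worker.interrupt();
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return loop.executedSnapshot();
    }
}
```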
- ProfileTaskCommandExecutor#execute converts the ProfileTaskCommand into a ProfileTask
- ProfileTaskExecutionService#addProfileTask schedules a delayed task that will process the ProfileTask
```java
public void addProfileTask(ProfileTask task) {
    // update last command create time
    if (task.getCreateTime() > lastCommandCreateTime) {
        lastCommandCreateTime = task.getCreateTime();
    }

    // check profile task limit
    final CheckResult dataError = checkProfileTaskSuccess(task);
    if (!dataError.isSuccess()) {
        LOGGER.warn(
            "check command error, cannot process this profile task. reason: {}", dataError.getErrorReason());
        return;
    }

    // add task to list
    profileTaskList.add(task);

    // schedule to start task
    long timeToProcessMills = task.getStartTime() - System.currentTimeMillis();
    PROFILE_TASK_SCHEDULE.schedule(() -> processProfileTask(task), timeToProcessMills, TimeUnit.MILLISECONDS);
}
```
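timeToProcessMills is simply startTime minus now, so a task whose start time has already passed gets a negative delay. Assuming PROFILE_TASK_SCHEDULE is a ScheduledExecutorService, that is harmless: ScheduledExecutorService treats zero and negative delays as requests for immediate execution.

Here is a small sketch to check that behavior (DelayedStart is a hypothetical name):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical check of delayed starts on a ScheduledExecutorService. */
public class DelayedStart {

    /** Delay until startTime; a negative value means the start time has already passed. */
    public static long delayMillis(long startTime, long now) {
        return startTime - now;
    }

    /** Schedules a marker task for startTime and reports whether it ran within timeoutMs. */
    public static boolean runsWithin(long startTime, long timeoutMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            long delay = delayMillis(startTime, System.currentTimeMillis());
            // a negative delay is treated as "run now", so late tasks still start
            scheduler.schedule(started::countDown, delay, TimeUnit.MILLISECONDS);
            return started.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```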
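- ProfileTaskExecutionService#processProfileTask creates a ProfileThread, submits it to a thread pool and keeps the returned profilingFuture (so the thread can be stopped later)
- The ProfileThread loops over the profilingSegmentSlots of the ProfileTaskExecutionContext (when are entries inserted into profilingSegmentSlots? The answer comes below)
- For each slot being profiled, it gets the thread's stack via Thread#getStackTrace and converts it into a thread snapshot, TracingThreadSnapshot
- The TracingThreadSnapshot is put into the snapshot queue snapshotQueue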
```java
private void profiling(ProfileTaskExecutionContext executionContext) throws InterruptedException {

    // dump interval configured in the task, e.g. 10 ms
    int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();

    // run loop when current thread still running
    long currentLoopStartTime = -1;
    while (!Thread.currentThread().isInterrupted()) {
        currentLoopStartTime = System.currentTimeMillis();

        // each all slot: walk the sampling slots (profilingSegmentSlots)
        AtomicReferenceArray<ThreadProfiler> profilers = executionContext.threadProfilerSlots();
        int profilerCount = profilers.length();
        for (int slot = 0; slot < profilerCount; slot++) {
            ThreadProfiler currentProfiler = profilers.get(slot);
            if (currentProfiler == null) {
                continue;
            }

            switch (currentProfiler.profilingStatus().get()) {

                case PENDING:
                    // check tracing context running time
                    currentProfiler.startProfilingIfNeed();
                    break;

                case PROFILING:
                    // dump stack
                    TracingThreadSnapshot snapshot = currentProfiler.buildSnapshot();
                    if (snapshot != null) {
                        profileTaskChannelService.addProfilingSnapshot(snapshot);
                    } else {
                        // tell execution context current tracing thread dump failed, stop it
                        executionContext.stopTracingProfile(currentProfiler.tracingContext());
                    }
                    break;
            }
        }

        // sleep to next period
        // if out of period, sleep one period
        long needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis();
        needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;
        Thread.sleep(needToSleep);
    }
}
```
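This loop has two ingredients worth isolating:
- the sleep that absorbs the time spent dumping stacks, or falls back to a full period after an overrun;
- sampling another thread's stack with Thread#getStackTrace until the surrounding Future is cancelled. This is presumably why processProfileTask keeps profilingFuture.

Here is a minimal sketch. StackSampler, the 10 ms period and the plain StackTraceElement[] samples are assumptions standing in for ThreadProfiler and TracingThreadSnapshot.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sampler: periodically dumps another thread's stack until cancelled. */
public class StackSampler {

    /** Sleep the rest of the period; if the loop overran, sleep one full period. */
    public static long needToSleep(long loopStart, long period, long now) {
        long remaining = (loopStart + period) - now;
        return remaining > 0 ? remaining : period;
    }

    /** Submits the sampling loop; cancelling the returned Future stops it. */
    public static Future<?> start(ExecutorService pool, Thread target, long periodMs,
                                  List<StackTraceElement[]> samples) {
        return pool.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                long loopStart = System.currentTimeMillis();
                // stack of the *target* thread, taken from the sampler thread
                StackTraceElement[] frames = target.getStackTrace();
                if (frames.length > 0) {
                    samples.add(frames);
                }
                try {
                    Thread.sleep(needToSleep(loopStart, periodMs, System.currentTimeMillis()));
                } catch (InterruptedException e) {
                    return; // cancel(true) interrupts the sleep and ends the loop
                }
            }
        });
    }

    /** Demo: samples a sleeping "business" thread for about 200 ms and returns the sample count. */
    public static int demo() {
        CountDownLatch running = new CountDownLatch(1);
        Thread business = new Thread(() -> {
            running.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException ignored) {
                // interrupted at the end of the demo
            }
        }, "business-thread");
        business.start();

        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<StackTraceElement[]> samples = new CopyOnWriteArrayList<>();
        try {
            running.await();
            Future<?> profilingFuture = start(pool, business, 10, samples);
            Thread.sleep(200);
            profilingFuture.cancel(true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
            business.interrupt();
        }
        return samples.size();
    }
}
```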
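- When the agent intercepts an entry method (for example in Tomcat), the TracingContext is initialized before the method body runs. This inserts a slot into profilingSegmentSlots that records the current thread (Thread.currentThread()), so its stack can be dumped later.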
```java
public ProfileStatusReference attemptProfiling(TracingContext tracingContext,
                                               String traceSegmentId,
                                               String firstSpanOPName) {
    // ........
    final ThreadProfiler threadProfiler = new ThreadProfiler(
        tracingContext, traceSegmentId, Thread.currentThread(), this);
    int slotLength = profilingSegmentSlots.length();
    for (int slot = 0; slot < slotLength; slot++) {
        // claim the first free slot atomically
        if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) {
            return threadProfiler.profilingStatus();
        }
    }
    // every slot is taken: this segment is not profiled
    return ProfileStatusReference.createWithNone();
}
```
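The slot array works as a lock-free, bounded registry. A thread claims the first null slot with compareAndSet and, as the next step describes, gives it back by resetting it to null.

Here is a generic sketch of the pattern. ProfilingSlots and release are hypothetical names, and the slot count is arbitrary. compareAndSet compares by reference, so a thread can only release the slot it actually holds.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical fixed-size registry of in-flight profilers, claimed and released with CAS. */
public class ProfilingSlots<T> {
    private final AtomicReferenceArray<T> slots;

    public ProfilingSlots(int size) {
        this.slots = new AtomicReferenceArray<>(size);
    }

    /** Claims the first free slot; returns its index, or -1 if all slots are taken. */
    public int tryClaim(T profiler) {
        for (int i = 0; i < slots.length(); i++) {
            if (slots.compareAndSet(i, null, profiler)) {
                return i;
            }
        }
        return -1;
    }

    /** Resets the slot to null only if it still holds this exact profiler. */
    public boolean release(T profiler) {
        for (int i = 0; i < slots.length(); i++) {
            if (slots.compareAndSet(i, profiler, null)) {
                return true;
            }
        }
        return false;
    }
}
```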
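- After the intercepted entry method finishes (for example in Tomcat), the slot inserted earlier is reset to null
- At boot, ProfileTaskChannelService starts a scheduled task with a fixed delay of 500 ms. The task drains the snapshots in snapshotQueue into a buffer and sends them to the server in batches.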
```java
public void boot() {
    // .......
    sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(
        new DefaultNamedThreadFactory("ProfileSendSnapshotService")
    ).scheduleWithFixedDelay(
        new RunnableWithExceptionProtection(
            () -> {
                List<TracingThreadSnapshot> buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
                // drain the snapshots from snapshotQueue
                snapshotQueue.drainTo(buffer);
                if (!buffer.isEmpty()) {
                    sender.send(buffer);
                }
            },
            t -> LOGGER.error("Profile segment snapshot upload failure.", t)
        ), 0, 500, TimeUnit.MILLISECONDS
    );
    // .......
}
```
- ProfileSnapshotSender#send converts each TracingThreadSnapshot into a ThreadSnapshot and sends it to the server
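The upload path is a producer/consumer pair. Sampling threads add snapshots to the queue, and a single timer thread drains whatever has accumulated every 500 ms and sends it as one batch.

Here is a sketch with two stand-ins (both assumptions): strings replace TracingThreadSnapshot, and a list replaces the gRPC sender. Dropping snapshots when the queue is full is this sketch's choice, not something the excerpt shows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical batcher: producers enqueue, one timer thread drains and "sends" batches. */
public class SnapshotBatcher {
    private final BlockingQueue<String> snapshotQueue;
    private final List<List<String>> sentBatches = new ArrayList<>(); // stands in for the gRPC sender

    public SnapshotBatcher(int queueCapacity) {
        this.snapshotQueue = new LinkedBlockingQueue<>(queueCapacity);
    }

    /** Producer side (sampling thread): returns false and drops the snapshot if the queue is full. */
    public boolean offer(String snapshot) {
        return snapshotQueue.offer(snapshot);
    }

    /** One timer tick: move everything queued so far into a buffer and send it as one batch. */
    public synchronized void flush() {
        List<String> buffer = new ArrayList<>();
        snapshotQueue.drainTo(buffer);
        if (!buffer.isEmpty()) {
            sentBatches.add(buffer);
        }
    }

    /** Runs flush() with a 500 ms fixed delay; the caller shuts the executor down. */
    public ScheduledExecutorService startTimer() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleWithFixedDelay(this::flush, 0, 500, TimeUnit.MILLISECONDS);
        return timer;
    }

    public synchronized List<List<String>> sentBatches() {
        return new ArrayList<>(sentBatches);
    }
}
```

The initial capacity passed to `new ArrayList<>(...)` does not limit how much drainTo moves. If a batch must be capped, BlockingQueue#drainTo(Collection, int maxElements) is the overload that does that.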
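This post traced the whole flow: UI creates a task --> agent fetches the task --> agent reports thread snapshots. Along the way SkyWalking relies heavily on asynchronous programming techniques, which I will keep digging into in later posts.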