SkyWalking Series: Trace Profiling Source Code Analysis


Preface

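  The article "Online code-level performance profiling: filling the last gap in distributed tracing" points out that however complex the business logic is, it is always executed by threads. So how does SkyWalking use method-stack snapshots to profile performance at the code level? Out of curiosity, let's step through it in a debugger and see how it works.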

Demo
  1. Open the SkyWalking UI and click to create a new Trace Profiling task
  2. Configure the Trace Profiling task
  3. View the collected stack information
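Source code analysis
Creating a task from the UI
  1. The server receives the request from the page, and ProfileTaskMutationService#createTask stores the task in ES (Elasticsearch) in an index named profile_task-* (here profile_task-20220807)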

     public ProfileTaskCreationResult createTask(final String serviceId,
                                                 final String endpointName,
                                                 final long monitorStartTime,
                                                 final int monitorDuration,
                                                 final int minDurationThreshold,
                                                 final int dumpPeriod,
                                                 final int maxSamplingCount) throws IOException {
         final long createTime = System.currentTimeMillis();
         // calculate the task's execution window: start at monitorStartTime if one was given,
         // otherwise now, and run for monitorDuration minutes
         final long taskStartTime = monitorStartTime > 0 ? monitorStartTime : createTime;
         final long taskEndTime = taskStartTime + TimeUnit.MINUTES.toMillis(monitorDuration);

         // check data
         final String errorMessage = checkDataSuccess(
             serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod,
             maxSamplingCount
         );
         if (errorMessage != null) {
             return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();
         }

         // create the task record and hand it to NoneStreamProcessor for storage
         final ProfileTaskRecord task = new ProfileTaskRecord();
         task.setServiceId(serviceId);
         task.setEndpointName(endpointName.trim());
         task.setStartTime(taskStartTime);
         task.setDuration(monitorDuration);
         task.setMinDurationThreshold(minDurationThreshold);
         task.setDumpPeriod(dumpPeriod);
         task.setCreateTime(createTime);
         task.setMaxSamplingCount(maxSamplingCount);
         task.setTimeBucket(TimeBucket.getMinuteTimeBucket(taskStartTime));
         NoneStreamProcessor.getInstance().in(task);

         return ProfileTaskCreationResult.builder().id(task.id()).build();
     }
    
  2. CacheUpdateTimer#updateProfileTask periodically refreshes the profile task cache, ProfileTaskCache$profileTaskDownstreamCache
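The time arithmetic in createTask can be sketched in isolation. ProfileTaskWindow and its methods are hypothetical, not SkyWalking classes. The sketch assumes that the minute time bucket is the number yyyyMMddHHmm and that the daily index suffix (as in profile_task-20220807) is the yyyyMMdd date. SkyWalking's own TimeBucket may resolve time zones differently, so here the zone is an explicit parameter.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a SkyWalking class: models the time math in createTask.
final class ProfileTaskWindow {
    private static final DateTimeFormatter MINUTE = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    final long startTime; // epoch millis
    final long endTime;   // epoch millis

    private ProfileTaskWindow(long startTime, long endTime) {
        this.startTime = startTime;
        this.endTime = endTime;
    }

    // Start at monitorStartTime if one was given, otherwise at createTime;
    // the task then runs for monitorDurationMinutes.
    static ProfileTaskWindow of(long monitorStartTime, int monitorDurationMinutes, long createTime) {
        long start = monitorStartTime > 0 ? monitorStartTime : createTime;
        return new ProfileTaskWindow(start, start + TimeUnit.MINUTES.toMillis(monitorDurationMinutes));
    }

    // Assumed minute bucket format: yyyyMMddHHmm as a number, e.g. 202208071530.
    static long minuteBucket(long epochMillis, ZoneId zone) {
        return Long.parseLong(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone).format(MINUTE));
    }

    // Assumed daily index naming: prefix + "-" + yyyyMMdd, e.g. profile_task-20220807.
    static String indexName(String prefix, long epochMillis, ZoneId zone) {
        return prefix + "-" + LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone).format(DAY);
    }
}
```

For instance, ProfileTaskWindow.of(0, 5, createTime) starts at createTime and ends five minutes later, which is the case where the UI supplies no explicit monitor start time.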

The agent sends a ProfileTaskCommandQuery request
  1. The agent sends the ProfileTaskCommandQuery request through ProfileTaskChannelService
    public void run() {
        if (status == GRPCChannelStatus.CONNECTED) {
            try {
                ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();
    
                // sniffer info
                builder.setService(Config.Agent.SERVICE_NAME).setServiceInstance(Config.Agent.INSTANCE_NAME);
    
                // last command create time
                builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class)
                                                                  .getLastCommandCreateTime());

                // send the ProfileTaskCommandQuery request
                Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                                           .getProfileTaskCommands(builder.build());
                // handle the response
                ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
            } catch (Throwable t) {
                // error handling omitted in this excerpt; a failed query should at least be logged
            }
        }
    }
    
  2. The server receives the ProfileTaskCommandQuery request in ProfileTaskServiceHandler#getProfileTaskCommands
     public void getProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver) {
        // query profile task list by service id
        final String serviceId = IDManager.ServiceID.buildId(request.getService(), true);
        final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, request.getServiceInstance());
        // fetch this service's tasks from the cache
        final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(serviceId);
        if (CollectionUtils.isEmpty(profileTaskList)) {
            responseObserver.onNext(Commands.newBuilder().build());
            responseObserver.onCompleted();
            return;
        }
    
        // build command list
        final Commands.Builder commandsBuilder = Commands.newBuilder();
        final long lastCommandTime = request.getLastCommandTime();
    
        for (ProfileTask profileTask : profileTaskList) {
            // if command create time less than last command time, means sniffer already have task
            if (profileTask.getCreateTime() <= lastCommandTime) {
                continue;
            }
    
            // record profile task log (stored in an index such as sw_profile_task_log-20220808)
            recordProfileTaskLog(profileTask, serviceInstanceId, ProfileTaskLogOperationType.NOTIFIED);

            // add command: convert the ProfileTask into a ProfileTaskCommand for the response
            commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
        }
    
        responseObserver.onNext(commandsBuilder.build());
        responseObserver.onCompleted();
    }
    
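  3. The agent handles the returned ProfileTaskCommand in CommandService#receiveCommand, putting each command into the blocking queue commands
    public void receiveCommand(Commands commands) {
        for (Command command : commands.getCommandsList()) {
            try {
                BaseCommand baseCommand = CommandDeserializer.deserialize(command);
                // offer() never blocks; it returns false (and the command is dropped) when the queue is full
                boolean success = this.commands.offer(baseCommand);
            } catch (UnsupportedCommandException e) {
                // unsupported command types are skipped (logging omitted in this excerpt)
            }
        }
    }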
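The incremental pull between agent and server hinges on a single timestamp. The sketch below is a simplified model, not SkyWalking code: TaskInfo and ProfileTaskPull are hypothetical names, and the agent's starting value of -1 is this sketch's choice. The server returns only tasks whose createTime is greater than the agent's lastCommandTime, and the agent then moves lastCommandTime forward (as addProfileTask does), so polling again against the same cache yields nothing new.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a cached ProfileTask; only the fields this sketch needs.
final class TaskInfo {
    final String id;
    final long createTime;

    TaskInfo(String id, long createTime) {
        this.id = id;
        this.createTime = createTime;
    }
}

final class ProfileTaskPull {
    // Server side: keep only tasks created after the agent's lastCommandTime,
    // the same "createTime <= lastCommandTime -> skip" rule as getProfileTaskCommands.
    static List<TaskInfo> newTasksSince(List<TaskInfo> cachedTasks, long lastCommandTime) {
        List<TaskInfo> result = new ArrayList<>();
        for (TaskInfo task : cachedTasks) {
            if (task.createTime <= lastCommandTime) {
                continue; // the agent already has this task
            }
            result.add(task);
        }
        return result;
    }

    // Agent side: move lastCommandTime forward to the newest createTime received,
    // so the next poll skips these tasks.
    static long advance(long lastCommandTime, List<TaskInfo> received) {
        long latest = lastCommandTime;
        for (TaskInfo task : received) {
            latest = Math.max(latest, task.createTime);
        }
        return latest;
    }
}
```

With tasks created at 100 and 200, the first poll (lastCommandTime = -1) returns both, the agent advances to 200, and later polls return nothing until a task with a newer createTime appears in the cache.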
    
The agent processes the ProfileTaskCommand asynchronously
  1. The CommandService thread keeps taking commands from the commands queue and hands each one to the matching command executor
    public void run() {
        final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);

        while (isRunning) {
            try {
                // take the next command from the queue (blocks while the queue is empty)
                BaseCommand command = commands.take();

                // skip commands whose serial number has already been executed
                if (isCommandExecuted(command)) {
                    continue;
                }

                commandExecutorService.execute(command);
                serialNumberCache.add(command.getSerialNumber());
            } catch (CommandExecutionException e) {
                LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand());
            } catch (Throwable e) {
                LOGGER.error(e, "There is unexpected exception");
            }
        }
    }
    
  2. ProfileTaskCommandExecutor#execute converts the ProfileTaskCommand into a ProfileTask
  3. ProfileTaskExecutionService#addProfileTask schedules a delayed job that processes the ProfileTask at its start time
    public void addProfileTask(ProfileTask task) {
        // update last command create time
        if (task.getCreateTime() > lastCommandCreateTime) {
            lastCommandCreateTime = task.getCreateTime();
        }
    
        // check profile task limit
        final CheckResult dataError = checkProfileTaskSuccess(task);
        if (!dataError.isSuccess()) {
            LOGGER.warn(
                "check command error, cannot process this profile task. reason: {}", dataError.getErrorReason());
            return;
        }
    
        // add task to list
        profileTaskList.add(task);
    
        // schedule to start task
        long timeToProcessMills = task.getStartTime() - System.currentTimeMillis();
        PROFILE_TASK_SCHEDULE.schedule(() -> processProfileTask(task), timeToProcessMills, TimeUnit.MILLISECONDS);
    }
    
  4. ProfileTaskExecutionService#processProfileTask creates a ProfileThread, submits it to a thread pool, and keeps the returned profilingFuture (so the thread can be stopped later)
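The hand-off from the gRPC thread to the command thread, followed by the delayed task start, can be modelled with a bounded LinkedBlockingQueue and a ScheduledExecutorService. Everything below is hypothetical: SimpleCommand, SimpleCommandService, the queue size of 64 and the bounded serial-number set are this sketch's own choices. It only mirrors the shape of receiveCommand (non-blocking offer), run (blocking take, skip already-executed serial numbers) and addProfileTask (schedule with a delay of startTime minus now).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical command: a serial number plus the time at which its task should start.
final class SimpleCommand {
    final String serialNumber;
    final long startTime;

    SimpleCommand(String serialNumber, long startTime) {
        this.serialNumber = serialNumber;
        this.startTime = startTime;
    }
}

final class SimpleCommandService {
    private final BlockingQueue<SimpleCommand> commands = new LinkedBlockingQueue<>(64);
    // Remembers the last 64 executed serial numbers; only the consumer thread touches it.
    private final Set<String> executedSerialNumbers = Collections.newSetFromMap(
        new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > 64;
            }
        });
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "task-schedule");
        t.setDaemon(true);
        return t;
    });
    private final Thread consumer = new Thread(this::consumeLoop, "command-consumer");
    final List<String> startedTasks = new CopyOnWriteArrayList<>();

    SimpleCommandService() {
        consumer.setDaemon(true);
    }

    void start() {
        consumer.start();
    }

    // Producer side (gRPC thread): never blocks; returns false if the queue is full.
    boolean receive(SimpleCommand command) {
        return commands.offer(command);
    }

    void stop() {
        consumer.interrupt();
        scheduler.shutdownNow();
    }

    // Consumer side: block on take() until a command arrives or the thread is interrupted.
    private void consumeLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                handle(commands.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the loop exits
            }
        }
    }

    private void handle(SimpleCommand command) {
        if (!executedSerialNumbers.add(command.serialNumber)) {
            return; // already executed
        }
        // A zero or negative delay makes the scheduler run the task as soon as possible.
        long delayMillis = command.startTime - System.currentTimeMillis();
        scheduler.schedule(() -> { startedTasks.add(command.serialNumber); }, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

Because offer() never blocks, a slow consumer cannot stall the gRPC thread that polls the server; the cost is that commands are dropped when the queue is full.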
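ProfileThread starts profiling
  1. The ProfileThread loops over the profilingSegmentSlots of the ProfileTaskExecutionContext (when are profilingSegmentSlots populated? This is answered below)
  2. It obtains each profiled thread's stack via Thread#getStackTrace and converts it into a thread snapshot, TracingThreadSnapshot
  3. It puts the TracingThreadSnapshot into the snapshot queue snapshotQueue (through ProfileTaskChannelService#addProfilingSnapshot)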
    private void profiling(ProfileTaskExecutionContext executionContext) throws InterruptedException {

        // sampling interval (thread dump period), 10 ms in this demo
        int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();

        // run loop when current thread still running
        long currentLoopStartTime = -1;
        // loop until this profiling thread is interrupted
        while (!Thread.currentThread().isInterrupted()) {
            currentLoopStartTime = System.currentTimeMillis();

            // iterate over every profiling slot
            AtomicReferenceArray<ThreadProfiler> profilers = executionContext.threadProfilerSlots();
            int profilerCount = profilers.length();
            for (int slot = 0; slot < profilerCount; slot++) {
                ThreadProfiler currentProfiler = profilers.get(slot);
                if (currentProfiler == null) {
                    continue;
                }

                switch (currentProfiler.profilingStatus().get()) {

                    case PENDING:
                        // check tracing context running time
                        currentProfiler.startProfilingIfNeed();
                        break;

                    case PROFILING:
                        // dump stack
                        TracingThreadSnapshot snapshot = currentProfiler.buildSnapshot();
                        if (snapshot != null) {
                            profileTaskChannelService.addProfilingSnapshot(snapshot);
                        } else {
                            // tell execution context current tracing thread dump failed, stop it
                            executionContext.stopTracingProfile(currentProfiler.tracingContext());
                        }
                        break;

                }
            }

            // sleep to next period
            // if out of period, sleep one period
            long needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis();
            needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;
            Thread.sleep(needToSleep);
        }
    }
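The sampling side can be tried outside the agent with plain JDK calls. In the hypothetical StackSampler below, the calling thread samples a target thread's stack with Thread#getStackTrace at a fixed period and applies the same sleep rule as the loop above (sleep for the rest of the period, or a full period if the round overran). StackSnapshot is a stand-in for TracingThreadSnapshot; unlike the agent, this sketch keeps snapshots in a list instead of pushing them to a queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for TracingThreadSnapshot.
final class StackSnapshot {
    final int sequence;
    final long time;
    final List<String> frames;

    StackSnapshot(int sequence, long time, List<String> frames) {
        this.sequence = sequence;
        this.time = time;
        this.frames = Collections.unmodifiableList(frames);
    }
}

final class StackSampler {
    // Same rule as the profiling loop: sleep for what is left of the period,
    // or a full period if this round already used it up.
    static long sleepMillis(long loopStartTime, long period, long now) {
        long remaining = (loopStartTime + period) - now;
        return remaining > 0 ? remaining : period;
    }

    // Samples target's stack every periodMillis until maxSnapshots are collected,
    // the target ends, or the sampling thread is interrupted.
    static List<StackSnapshot> sample(Thread target, long periodMillis, int maxSnapshots) {
        List<StackSnapshot> snapshots = new ArrayList<>();
        int sequence = 0;
        while (target.isAlive() && snapshots.size() < maxSnapshots) {
            long loopStart = System.currentTimeMillis();
            // may be empty if the target has not been scheduled yet or has just terminated
            StackTraceElement[] stack = target.getStackTrace();
            if (stack.length > 0) {
                List<String> frames = new ArrayList<>(stack.length);
                for (StackTraceElement element : stack) {
                    frames.add(element.getClassName() + "." + element.getMethodName() + ":" + element.getLineNumber());
                }
                snapshots.add(new StackSnapshot(sequence++, loopStart, frames));
            }
            try {
                Thread.sleep(sleepMillis(loopStart, periodMillis, System.currentTimeMillis()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return snapshots;
    }
}
```

Sampling another thread's stack is not free, which is presumably why the dump period is a task parameter and the number of profiling slots is fixed.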
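When are profilingSegmentSlots populated?
  1. Before the agent's interceptor runs the entry method (e.g. in Tomcat), initializing the TracingContext places a ThreadProfiler into a free slot of profilingSegmentSlots (the current thread is captured via Thread.currentThread(), so its stack can be sampled later)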
     public ProfileStatusReference attemptProfiling(TracingContext tracingContext,
                                                   String traceSegmentId,
                                                   String firstSpanOPName) {
        // ........
        final ThreadProfiler threadProfiler = new ThreadProfiler(
            tracingContext, traceSegmentId, Thread.currentThread(), this);
        // claim the first empty slot atomically: compareAndSet only succeeds if the slot is still null
        int slotLength = profilingSegmentSlots.length();
        for (int slot = 0; slot < slotLength; slot++) {
            if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) {
                return threadProfiler.profilingStatus();
            }
        }
        // no free slot: this segment is not profiled
        return ProfileStatusReference.createWithNone();
    }
    
  2. After the agent's interceptor finishes the entry method (e.g. in Tomcat), the slot filled earlier is reset to null
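Claiming and releasing a slot map directly onto AtomicReferenceArray#compareAndSet. ProfilingSlots below is a hypothetical, generic stand-in for profilingSegmentSlots: an entry interceptor claims the first null slot without locking, the profiling thread can read the slots concurrently, and the exit side clears a slot only if it still holds the same profiler (compareAndSet compares references with ==). The excerpt does not show how SkyWalking locates the slot to clear; matching by identity is this sketch's assumption.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical fixed-size slot table, modelled on profilingSegmentSlots.
final class ProfilingSlots<T> {
    private final AtomicReferenceArray<T> slots;

    ProfilingSlots(int size) {
        slots = new AtomicReferenceArray<>(size);
    }

    // Entry side: claim the first empty slot; returns its index, or -1 if every slot is busy.
    int tryOccupy(T profiler) {
        for (int i = 0; i < slots.length(); i++) {
            if (slots.compareAndSet(i, null, profiler)) {
                return i;
            }
        }
        return -1;
    }

    // Exit side: clear the slot only if it still holds this exact profiler.
    boolean release(T profiler) {
        for (int i = 0; i < slots.length(); i++) {
            if (slots.compareAndSet(i, profiler, null)) {
                return true;
            }
        }
        return false;
    }

    // Reader side (profiling thread): may observe null for free slots.
    T get(int index) {
        return slots.get(index);
    }
}
```

A full table simply means the new request is not profiled, which bounds how many threads the profiling loop has to sample per round.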
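The agent sends thread snapshots to the server asynchronously
  1. When it boots, ProfileTaskChannelService starts a scheduled task with a 500 ms fixed delay that drains snapshots from snapshotQueue into a buffer and sends them to the server in one batch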
    public void boot() {
        // .......
        sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(
                new DefaultNamedThreadFactory("ProfileSendSnapshotService")
            ).scheduleWithFixedDelay(
                new RunnableWithExceptionProtection(
                    () -> {
                        List<TracingThreadSnapshot> buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
                        // drain every queued snapshot from snapshotQueue into the buffer
                        snapshotQueue.drainTo(buffer);
                        if (!buffer.isEmpty()) {
                            sender.send(buffer);
                        }
                    },
                    t -> LOGGER.error("Profile segment snapshot upload failure.", t)
                ), 0, 500, TimeUnit.MILLISECONDS
            );
        // ........
    }
    
  2. ProfileSnapshotSender#send converts each TracingThreadSnapshot into a ThreadSnapshot and sends it to the server
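The producer/consumer shape of the upload path (the profiling thread offers snapshots; a single scheduled thread drains and sends them) can be sketched generically. SnapshotBatcher is hypothetical and its sender is just a Consumer. Unlike the excerpt, which drains the whole queue in one go, flush() here caps each batch with drainTo(buffer, batchSize).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical batcher: producers offer items, one scheduled thread drains and sends them.
final class SnapshotBatcher<T> {
    private final BlockingQueue<T> queue;
    private final int batchSize;
    private final Consumer<List<T>> sender;
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "snapshot-sender");
        t.setDaemon(true);
        return t;
    });

    SnapshotBatcher(int capacity, int batchSize, Consumer<List<T>> sender) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.batchSize = batchSize;
        this.sender = sender;
    }

    // Producer side (profiling thread): never blocks; drops the item if the queue is full.
    boolean add(T item) {
        return queue.offer(item);
    }

    // One round: move at most batchSize items into a buffer and send them if there are any.
    int flush() {
        List<T> buffer = new ArrayList<>(batchSize);
        queue.drainTo(buffer, batchSize);
        if (!buffer.isEmpty()) {
            sender.accept(buffer);
        }
        return buffer.size();
    }

    void start(long periodMillis) {
        executor.scheduleWithFixedDelay(() -> {
            try {
                flush();
            } catch (RuntimeException e) {
                // report and swallow, so one failed send does not cancel the schedule
                System.err.println("snapshot upload failed: " + e);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdownNow();
    }
}
```

Catching exceptions inside the scheduled task matters: per the ScheduledExecutorService contract, if one execution of a scheduleWithFixedDelay task throws, subsequent executions are suppressed. That appears to be the problem RunnableWithExceptionProtection guards against in the excerpt.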
Conclusion

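  This article walked through the whole flow: the UI creates a task --> the agent fetches the task --> the agent reports thread snapshots. Along the way SkyWalking makes heavy use of asynchronous programming techniques (blocking queues, scheduled executors, dedicated background threads); I'll keep digging into them in later posts.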

Reposting notice: article reposted from www.wk8.com.cn
Article URL: https://www.wk8.com.cn/it/1039867.html