栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

Apache IoTDB源码解析(0.11.2版本):Session的源码解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Apache IoTDB源码解析(0.11.2版本):Session的源码解析

1. 声明

当前内容主要为解析Apache IoTDB 0.11.2版本的Session的源码解析
通过前面的Apache Thrift的Demo,可以发现iotdb中的server是使用了thrift的并生成了对应的iotdb-thrift.jar的

thrift在生成后必定有客户端Client和实现的服务端Processor的,thrift的通信就是使用thrift中的Client来访问的

iotdb中的session操作为:

Session session = new Session(xxx,xxx,xxx);
session.open();
session.executeSql();
session.close();
2. 查看Session中的代码

1. 查看open方法

  private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
   transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs));

这个其实就是构建一个TTransport就是用来open用的

 if (!transport.isOpen()) {
      try {
        transport.open();
      } catch (TTransportException e) {
        throw new IoTDBConnectionException(e);
      }
    }
    if (enableRPCCompression) {
      client = new TSIService.Client(new TCompactProtocol(transport));
    } else {
      client = new TSIService.Client(new TBinaryProtocol(transport));
    }

校验是否已经是开的,否则就打开连接,然后判断是否开启压缩(对应配置文件中的),构建对应的Client,TSIService就是thrift文件中定义并生成的

有了Client就可以实现与服务端进行通信了

 TSOpenSessionReq openReq = new TSOpenSessionReq();
    openReq.setUsername(username);
    openReq.setPassword(password);
    openReq.setZoneId(getTimeZone());
    try {
      TSOpenSessionResp openResp = client.openSession(openReq);
      RpcUtils.verifySuccess(openResp.getStatus());
      if (protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) {
        logger.warn("Protocol differ, Client version is {}}, but Server version is {}",
            protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue());
        if (openResp.getServerProtocolVersion().getValue() == 0) {// less than 0.10
          throw new TException(String
              .format("Protocol not supported, Client version is %s, but Server version is %s",
                  protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue()));
        }
      }

通过Client携带用户名密码并发送openSession的一个请求,随后开始校验当前的状态是否正常(其实就是判断是否有错误,状态是否为SUCCESS),判断客户端和服务器的协议版本是否一致,低于0.10的就会报错

sessionId = openResp.getSessionId();
statementId = client.requestStatementId(sessionId);
 client = RpcUtils.newSynchronizedClient(client);

得到请求响应中的id并完成Client的statementId的设置(thrift文件定义发送数据查询请求必须要有statementId),最后使用同步方式锁住该client保证多线程情况下的请求

2. 查看close方法

 public synchronized void close() throws IoTDBConnectionException {
    if (isClosed) {
      return;
    }
    TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
    try {
      client.closeSession(req);
    } catch (TException e) {
      throw new IoTDBConnectionException(
          "Error occurs when closing session at server. Maybe server is down.", e);
    } finally {
      isClosed = true;
      if (transport != null) {
        transport.close();
      }
    }
  }

这里其实就是使用当前的sessionId并利用Client发送关闭Session的请求,最后关闭TTransport

结合综上,发现这个session实际上就是一个thrift版本的包装类,实现了同步?

3. 总结

1. 通过查看源码发现Session实际上就是thrift生成的Client的包装类,说明iotdb本身就是使用thrift的Server和Client来定义通信的

2. 本人通过查看发现iotdb的lib中有netty和jetty,将其全部移除掉(迁移到lib中)发现任然可以正常的使用iotdb证明了1的设想(也是可以发送写入数据的指令,jetty是用来访问metricService的、netty是用来构建MQTT的)


3. 了解iotdb必须从其中的组件进行了解才会有所体会

转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/937787.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号