当前内容主要为解析Apache IoTDB 0.11.2版本的Session的源码解析
通过前面的Apache Thrift的Demo,可以发现iotdb中的server是使用了thrift的并生成了对应的iotdb-thrift.jar的
thrift在生成后必定有客户端Client和实现的服务端Processor的,thrift的通信就是使用thrift中的Client来访问的
iotdb中的session操作为:
Session session = new Session(xxx,xxx,xxx); session.open(); session.executeSql(); session.close();2. 查看Session中的代码
1. 查看open方法
private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs) transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs));
这个其实就是构建一个TTransport就是用来open用的
if (!transport.isOpen()) { try { transport.open(); } catch (TTransportException e) { throw new IoTDBConnectionException(e); } } if (enableRPCCompression) { client = new TSIService.Client(new TCompactProtocol(transport)); } else { client = new TSIService.Client(new TBinaryProtocol(transport)); }
校验是否已经是开的,否则就打开连接,然后判断是否开启压缩(对应配置文件中的),构建对应的Client,TSIService就是thrift文件中定义并生成的
有了Client就可以实现与服务端进行通信了
TSOpenSessionReq openReq = new TSOpenSessionReq(); openReq.setUsername(username); openReq.setPassword(password); openReq.setZoneId(getTimeZone()); try { TSOpenSessionResp openResp = client.openSession(openReq); RpcUtils.verifySuccess(openResp.getStatus()); if (protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) { logger.warn("Protocol differ, Client version is {}}, but Server version is {}", protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue()); if (openResp.getServerProtocolVersion().getValue() == 0) {// less than 0.10 throw new TException(String .format("Protocol not supported, Client version is %s, but Server version is %s", protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue())); } }
通过Client携带用户名密码并发送openSession的一个请求,随后开始校验当前的状态是否正常(其实就是判断是否有错误,状态是否为SUCCESS),判断客户端和服务器的协议版本是否一致,低于0.10的就会报错
sessionId = openResp.getSessionId(); statementId = client.requestStatementId(sessionId); client = RpcUtils.newSynchronizedClient(client);
得到请求响应中的id并完成Client的statementId的设置(thrift文件定义发送数据查询请求必须要有statementId),最后使用同步方式锁住该client保证多线程情况下的请求
2. 查看close方法
public synchronized void close() throws IoTDBConnectionException { if (isClosed) { return; } TSCloseSessionReq req = new TSCloseSessionReq(sessionId); try { client.closeSession(req); } catch (TException e) { throw new IoTDBConnectionException( "Error occurs when closing session at server. Maybe server is down.", e); } finally { isClosed = true; if (transport != null) { transport.close(); } } }
这里其实就是使用当前的sessionId并利用Client发送关闭Session的请求,最后关闭TTransport
结合综上,发现这个session实际上就是一个thrift版本的包装类,实现了同步?
3. 总结1. 通过查看源码发现Session实际上就是thrift生成的Client的包装类,说明iotdb本身就是使用thrift的Server和Client来定义通信的
2. 本人通过查看发现iotdb的lib中有netty和jetty,将其全部移除掉(迁移到lib中)发现任然可以正常的使用iotdb证明了1的设想(也是可以发送写入数据的指令,jetty是用来访问metricService的、netty是用来构建MQTT的)
3. 了解iotdb必须从其中的组件进行了解才会有所体会