package com.alibaba.datax.plugin.reader.hivereader;
public class Constant {
public final static String TEMP_DATABASE_DEFAULT = "default"; // modeled on CDH's "default" database
public static final String TEMP_DATABSE_HDFS_LOCATION_DEFAULT = "/user/{username}/warehouse/";// modeled on the HDFS location of CDH's default database
public static final String TEMP_TABLE_NAME_PREFIX="tmp_datax_hivereader_";
// public final static String HIVE_CMD_DEFAULT = "hive"; //
public final static String HIVE_SQL_SET_DEFAULT = ""; //
public final static String FIELDDELIMITER_DEFAULT = "\u0001"; //
public final static String NULL_FORMAT_DEFAULT = "\\N";
public static final String TEXT = "TEXT";
public static final String ORC = "ORC";
public static final String CSV = "CSV";
public static final String SEQ = "SEQ";
public static final String RC = "RC";
}
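For orientation, the sketch below shows how these defaults are typically combined into a temporary table name and HDFS location. It is illustrative only: the helper class and method names are assumptions, not part of the plugin; the name pattern (prefix + yyyyMMdd + epoch millis) is inferred from the CTAS example in HiveServer2ConnectUtil.main further down, and the location simply substitutes the connecting user into the {username} placeholder.
package com.alibaba.datax.plugin.reader.hivereader;
import java.text.SimpleDateFormat;
import java.util.Date;
// Illustrative sketch only (not part of the plugin source): combines the defaults above
// into a temp-table name and its HDFS location.
public class TempTableNamingSketch {
    static String buildTempTableName() {
        // e.g. tmp_datax_hivereader_20220808_1659953092709
        String day = new SimpleDateFormat("yyyyMMdd").format(new Date());
        return Constant.TEMP_TABLE_NAME_PREFIX + day + "_" + System.currentTimeMillis();
    }
    static String buildTempTableLocation(String username, String tableName) {
        // Substitute the connecting user into "/user/{username}/warehouse/"
        return Constant.TEMP_DATABSE_HDFS_LOCATION_DEFAULT.replace("{username}", username) + tableName;
    }
}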
DFSUtil.class
package com.alibaba.datax.plugin.reader.hivereader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.unstructuredstorage.reader.ColumnEntry;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.*;
public class DFSUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveReader.Job.class);
private org.apache.hadoop.conf.Configuration hadoopConf = null;
private String username = null;
private String specifiedFileType = null;
private Boolean haveKerberos = false;
private String kerberosKeytabFilePath;
private String kerberosPrincipal;
private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
public static final String HDFS_DEFAULTFS_KEY = "fs.defaultFS";
public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
public DFSUtil(Configuration taskConfig) {
hadoopConf = new org.apache.hadoop.conf.Configuration();
// io.file.buffer.size is a performance-related parameter, see:
// http://blog.csdn.net/yangjl38/article/details/7583374
Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
if (null != hadoopSiteParams) {
Set<String> paramKeys = hadoopSiteParams.getKeys();
for (String each : paramKeys) {
hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
}
}
hadoopConf.set(HDFS_DEFAULTFS_KEY, taskConfig.getString(Key.DEFAULT_FS));
this.username = taskConfig.getString(Key.USERNAME);
System.setProperty("HADOOP_USER_NAME", this.username);
// Whether Kerberos authentication is enabled
this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
if (haveKerberos) {
this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
this.hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
}
this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);
LOG.info(String.format("hadoopConfig details:%s", JSON.toJSONString(this.hadoopConf)));
}
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) {
if (haveKerberos && StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(this.hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
} catch (Exception e) {
String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
kerberosKeytabFilePath, kerberosPrincipal);
throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
}
}
}
public HashSet<String> getAllFiles(List<String> srcPaths, String specifiedFileType) {
this.specifiedFileType = specifiedFileType;
if (!srcPaths.isEmpty()) {
for (String eachPath : srcPaths) {
LOG.info(String.format("get HDFS all files in path = [%s]", eachPath));
getHDFSAllFiles(eachPath);
}
}
return sourceHDFSAllFilesList;
}
private HashSet<String> sourceHDFSAllFilesList = new HashSet<String>();
public HashSet<String> getHDFSAllFiles(String hdfsPath) {
try {
FileSystem hdfs = FileSystem.get(new URI(hadoopConf.get(HDFS_DEFAULTFS_KEY)),hadoopConf,username);
// Check whether hdfsPath contains glob characters
if (hdfsPath.contains("*") || hdfsPath.contains("?")) {
Path path = new Path(hdfsPath);
FileStatus stats[] = hdfs.globStatus(path);
for (FileStatus f : stats) {
if (f.isFile()) {
if (f.getLen() == 0) {
String message = String.format("文件[%s]长度为0,将会跳过不作处理!", hdfsPath);
LOG.warn(message);
} else {
addSourceFileByType(f.getPath().toString());
}
} else if (f.isDirectory()) {
getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
}
}
} else {
getHDFSAllFilesNORegex(hdfsPath, hdfs);
}
return sourceHDFSAllFilesList;
} catch (IOException | InterruptedException | URISyntaxException e) {
String message = String.format("无法读取路径[%s]下的所有文件,请确认您的配置项fs.defaultFS, path的值是否正确," +
"是否有读写权限,网络是否已断开!", hdfsPath);
LOG.error(message);
throw DataXException.asDataXException(HiveReaderErrorCode.PATH_CONFIG_ERROR, e);
}
}
private HashSet<String> getHDFSAllFilesNORegex(String path, FileSystem hdfs) throws IOException {
// Root directory of the files to be read
Path listFiles = new Path(path);
// If the network disconnected, this method will retry 45 times
// each time the retry interval for 20 seconds
// List all immediate children of the root directory
FileStatus stats[] = hdfs.listStatus(listFiles);
for (FileStatus f : stats) {
// If the entry is a directory, recurse into it
if (f.isDirectory()) {
LOG.info(String.format("[%s] 是目录, 递归获取该目录下的文件", f.getPath().toString()));
getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
} else if (f.isFile()) {
addSourceFileByType(f.getPath().toString());
} else {
String message = String.format("该路径[%s]文件类型既不是目录也不是文件,插件自动忽略。",
f.getPath().toString());
LOG.info(message);
}
}
return sourceHDFSAllFilesList;
}
// Add the path to sourceHDFSAllFilesList if the file matches the user-specified file type
private void addSourceFileByType(String filePath) {
// Check whether the file's actual type matches the configured fileType
boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);
if (isMatchedFileType) {
LOG.info(String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType));
sourceHDFSAllFilesList.add(filePath);
} else {
String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," +
"请确认您配置的目录下面所有文件的类型均为[%s]"
, filePath, this.specifiedFileType);
LOG.error(message);
throw DataXException.asDataXException(
HiveReaderErrorCode.FILE_TYPE_UNSUPPORT, message);
}
}
public InputStream getInputStream(String filepath) {
InputStream inputStream;
Path path = new Path(filepath);
try {
FileSystem fs = FileSystem.get(new URI(hadoopConf.get(HDFS_DEFAULTFS_KEY)),hadoopConf,username);
//If the network disconnected, this method will retry 45 times
//each time the retry interval for 20 seconds
inputStream = fs.open(path);
return inputStream;
} catch (IOException | URISyntaxException | InterruptedException e) {
String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filepath, filepath);
throw DataXException.asDataXException(HiveReaderErrorCode.READ_FILE_ERROR, message, e);
}
}
public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read sequence file [%s].", sourceSequenceFilePath));
Path seqFilePath = new Path(sourceSequenceFilePath);
SequenceFile.Reader reader = null;
try {
// Create a SequenceFile.Reader instance
reader = new SequenceFile.Reader(this.hadoopConf,
SequenceFile.Reader.file(seqFilePath));
// Read key and value
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);
Text value = new Text();
while (reader.next(key, value)) {
if (StringUtils.isNotBlank(value.toString())) {
UnstructuredStorageReaderUtil.transportOneRecord(recordSender,
readerSliceConfig, taskPluginCollector, value.toString());
}
}
} catch (Exception e) {
String message = String.format("SequenceFile.Reader读取文件[%s]时出错", sourceSequenceFilePath);
LOG.error(message);
throw DataXException.asDataXException(HiveReaderErrorCode.READ_SEQUENCEFILE_ERROR, message, e);
} finally {
IOUtils.closeStream(reader);
LOG.info("Finally, Close stream SequenceFile.Reader.");
}
}
public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read rcfile [%s].", sourceRcFilePath));
List<ColumnEntry> column = UnstructuredStorageReaderUtil
.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
// warn: no default value 'N'
String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
Path rcFilePath = new Path(sourceRcFilePath);
FileSystem fs = null;
RCFileRecordReader recordReader = null;
try {
fs = FileSystem.get(rcFilePath.toUri(), hadoopConf,username);
long fileLen = fs.getFileStatus(rcFilePath).getLen();
FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null);
recordReader = new RCFileRecordReader(hadoopConf, split);
LongWritable key = new LongWritable();
BytesRefArrayWritable value = new BytesRefArrayWritable();
Text txt = new Text();
while (recordReader.next(key, value)) {
String[] sourceLine = new String[value.size()];
txt.clear();
for (int i = 0; i < value.size(); i++) {
BytesRefWritable v = value.get(i);
txt.set(v.getData(), v.getStart(), v.getLength());
sourceLine[i] = txt.toString();
}
UnstructuredStorageReaderUtil.transportOneRecord(recordSender,
column, sourceLine, nullFormat, taskPluginCollector);
}
} catch (IOException | InterruptedException e) {
String message = String.format("读取文件[%s]时出错", sourceRcFilePath);
LOG.error(message);
throw DataXException.asDataXException(HiveReaderErrorCode.READ_RCFILE_ERROR, message, e);
} finally {
try {
if (recordReader != null) {
recordReader.close();
LOG.info("Finally, Close RCFileRecordReader.");
}
} catch (IOException e) {
LOG.warn(String.format("finally: 关闭RCFileRecordReader失败, %s", e.getMessage()));
}
}
}
public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
List<ColumnEntry> column = UnstructuredStorageReaderUtil
.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
StringBuilder allColumns = new StringBuilder();
StringBuilder allColumnTypes = new StringBuilder();
boolean isReadAllColumns = false;
int columnIndexMax = -1;
// Determine whether all columns should be read
if (null == column || column.size() == 0) {
int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
columnIndexMax = allColumnsCount - 1;
isReadAllColumns = true;
} else {
columnIndexMax = getMaxIndex(column);
}
for (int i = 0; i <= columnIndexMax; i++) {
allColumns.append("col");
allColumnTypes.append("string");
if (i != columnIndexMax) {
allColumns.append(",");
allColumnTypes.append(":");
}
}
if (columnIndexMax >= 0) {
JobConf conf = new JobConf(hadoopConf);
Path orcFilePath = new Path(sourceOrcFilePath);
Properties p = new Properties();
p.setProperty("columns", allColumns.toString());
p.setProperty("columns.types", allColumnTypes.toString());
try {
OrcSerde serde = new OrcSerde();
serde.initialize(conf, p);
StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
InputFormat<?, ?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, orcFilePath.toString());
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
//Each file as a split
//TODO: multi-threading
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL); // obtain the reader
Object key = reader.createKey();
Object value = reader.createValue();// OrcStruct
// Retrieve the column (field) information
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
List
HiveServer2ConnectUtil.class
package com.alibaba.datax.plugin.reader.hivereader;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class HiveServer2ConnectUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2ConnectUtil.class);
public static void main(String[] args) {
execHiveSql("hive", null,
"; use default; create table tmp_datax_hivereader_20220808_1659953092709 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001' STORED AS TEXTFILE as select id,username,password from default.t_user;",
"jdbc:hive2://10.252.92.4:10000");
}
public static boolean execHiveSql(String username, String password, String hiveSql, String hiveJdbcUrl) {
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.exit(1);
}
try {
LOG.info("hiveJdbcUrl:{}", hiveJdbcUrl);
LOG.info("username:{}", username);
LOG.info("password:{}", password);
Connection conn = DriverManager.getConnection(hiveJdbcUrl, username, password);
Statement stmt = conn.createStatement();
String[] hiveSqls = hiveSql.split(";");
for (int i = 0; i < hiveSqls.length; i++) {
if (StringUtils.isNotEmpty(hiveSqls[i])) {
stmt.execute(hiveSqls[i]);
}
}
return true;
} catch (SQLException sqlException) {
LOG.error(sqlException.getMessage(), sqlException);
return false;
}
}
}
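One caveat worth noting: execHiveSql above never closes the Connection or Statement, and it logs the password at INFO level. A minimal sketch of the same method using try-with-resources (and without logging credentials) could look like the following; it is a suggested variant that would live in the same class, not the plugin's actual code.
// Sketch only: same behaviour as execHiveSql, but resources are closed automatically
// and the password is not logged.
public static boolean execHiveSqlClosingResources(String username, String password, String hiveSql, String hiveJdbcUrl) {
    try (Connection conn = DriverManager.getConnection(hiveJdbcUrl, username, password);
         Statement stmt = conn.createStatement()) {
        for (String sql : hiveSql.split(";")) {
            if (StringUtils.isNotEmpty(sql)) {
                stmt.execute(sql);
            }
        }
        return true;
    } catch (SQLException e) {
        LOG.error(e.getMessage(), e);
        return false;
    }
}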
Key.class
package com.alibaba.datax.plugin.reader.hivereader;
public class Key {
public final static String DEFAULT_FS = "defaultFS";
// hiveSql statement(s) executed by the reader
public final static String HIVE_SQL = "hiveSql";
// Hive JDBC URL
public final static String HIVE_JDBC_URL = "hiveJdbcUrl";
// Hive username
public final static String USERNAME = "username";
// Hive password
public final static String PASSWORD = "password";
// Database that holds the temporary table
public final static String TEMP_DATABASE = "tempDatabase";
// HDFS directory where the temporary table is stored
public final static String TEMP_DATABASE_HDFS_LOCATION = "tempDatabasePath";
// hive -e command
public final static String HIVE_CMD = "hive_cmd";
public final static String HIVE_SQL_SET = "hive_sql_set";
// Default field delimiter of the files stored on HDFS
public final static String FIELDDELIMITER = "fieldDelimiter";
public static final String NULL_FORMAT = "nullFormat";
public static final String HADOOP_CONFIG = "hadoopConfig";
public static final String HAVE_KERBEROS = "haveKerberos";
public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
}
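To tie Key and Constant to the CTAS statement seen in HiveServer2ConnectUtil.main, the sketch below assembles a temp-table DDL from the configured hiveSql, fieldDelimiter, and tempDatabase. The plugin's actual assembly code is not part of this listing, so treat the method as an assumption about how these keys are used.
// Assumption: the reader wraps the configured hiveSql into a CREATE TABLE ... AS SELECT,
// mirroring the statement shown in HiveServer2ConnectUtil.main.
String buildTempTableCtas(com.alibaba.datax.common.util.Configuration conf, String tmpTableName) {
    String hiveSql = conf.getString(Key.HIVE_SQL); // e.g. "select id,username,password from default.t_user"
    String delimiter = conf.getString(Key.FIELDDELIMITER, Constant.FIELDDELIMITER_DEFAULT);
    String tmpDatabase = conf.getString(Key.TEMP_DATABASE, Constant.TEMP_DATABASE_DEFAULT);
    return "use " + tmpDatabase + "; "
            + "create table " + tmpTableName
            + " ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + delimiter + "'"
            + " STORED AS TEXTFILE as " + hiveSql + ";";
}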
Constants.class
package com.alibaba.datax.plugin.writer.hivewriter;
public class Constants {
public static final String TEMP_TABLE_NAME_PREFIX_DEFAULT="tmp_datax_hivewriter_";
public final static String HIVE_CMD_DEFAULT = "hive";
public final static String HIVE_SQL_SET_DEFAULT = "";
public final static String HIVE_TARGET_TABLE_COMPRESS_SQL= "";
public static final String WRITE_MODE_DEFAULT="insert";
public final static String HIVE_PRESQL_DEFAULT = "";
public final static String HIVE_POSTSQL_DEFAULT = "";
public static final String INSERT_PRE_SQL="SET hive.exec.dynamic.partition=true;"
+"SET hive.exec.dynamic.partition.mode=nonstrict;"
+"SET hive.exec.max.dynamic.partitions.pernode=100000;"
+"SET hive.exec.max.dynamic.partitions=100000;";
public final static String FIELDDELIMITER_DEFAULT = "\u0001";
public final static String COMPRESS_DEFAULT="gzip";
// This default value is currently unused
public static final String DEFAULT_NULL_FORMAT = "\\N";
}
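INSERT_PRE_SQL above is a semicolon-separated batch of SET statements for dynamic partitioning. Because execHiveSql (shown later in this package) splits its input on ';' and executes every non-empty statement, the pre-SQL can simply be prepended to the insert statement. A minimal sketch, where targetInsertSql is hypothetical and the JDBC URL is reused from the example main method:
// Sketch: prepend the dynamic-partition SET statements to a (hypothetical) insert statement
// and let execHiveSql split the batch on ';' and run each statement in order.
String targetInsertSql = "insert overwrite table default.t_user_target select * from default.tmp_datax_hivewriter_xxx"; // hypothetical
String batch = Constants.INSERT_PRE_SQL + targetInsertSql + ";";
HiveServer2ConnectUtil.execHiveSql("hive", null, batch, "jdbc:hive2://10.252.92.4:10000");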
HdfsHelper.class
package com.alibaba.datax.plugin.writer.hivewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.*;
public class HdfsHelper {
public static final Logger LOG = LoggerFactory.getLogger(HiveWriter.Job.class);
public FileSystem fileSystem = null;
public JobConf conf = null;
public org.apache.hadoop.conf.Configuration hadoopConf = null;
public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
public static final String HDFS_DEFAULTFS_KEY = "fs.defaultFS";
private String username = null;
// Kerberos
private Boolean haveKerberos = false;
private String kerberosKeytabFilePath;
private String kerberosPrincipal;
public void getFileSystem(String defaultFS, Configuration taskConfig) {
hadoopConf = new org.apache.hadoop.conf.Configuration();
Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
if (null != hadoopSiteParams) {
Set<String> paramKeys = hadoopSiteParams.getKeys();
for (String each : paramKeys) {
hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
}
}
hadoopConf.set(HDFS_DEFAULTFS_KEY, defaultFS);
this.username = taskConfig.getString(Key.USERNAME);
System.setProperty("HADOOP_USER_NAME", this.username);
// Whether Kerberos authentication is enabled
this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
if (haveKerberos) {
this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
}
this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);
conf = new JobConf(hadoopConf);
conf.setUser(this.username);
try {
LOG.info("defaultFS:{},user:{}", defaultFS, this.username);
fileSystem = FileSystem.get(new URI(hadoopConf.get(HDFS_DEFAULTFS_KEY)), conf, this.username);
} catch (IOException e) {
String message = String.format("获取FileSystem时发生网络IO异常,请检查您的网络是否正常!HDFS地址:[%s]",
"message:defaultFS =" + defaultFS);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
} catch (Exception e) {
String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]",
"message:defaultFS =" + defaultFS);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
if (null == fileSystem || null == conf) {
String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]",
"message:defaultFS =" + defaultFS);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, message);
}
}
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) {
if (haveKerberos && StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(this.hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
} catch (Exception e) {
String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
kerberosKeytabFilePath, kerberosPrincipal);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.KERBEROS_LOGIN_ERROR, e);
}
}
}
public String[] hdfsDirList(String dir) {
Path path = new Path(dir);
String[] files = null;
try {
FileStatus[] status = fileSystem.listStatus(path);
files = new String[status.length];
for (int i = 0; i < status.length; i++) {
files[i] = status[i].getPath().toString();
}
} catch (IOException e) {
String message = String.format("Network IO error while listing files under directory [%s], please check your network!", dir);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
return files;
}
// ... (other helper methods, such as deleteDir used below, omitted) ...
public void renameFile(HashSet<String> tmpFiles, HashSet<String> endFiles) {
Path tmpFilesParent = null;
if (tmpFiles.size() != endFiles.size()) {
String message = String.format("临时目录下文件名个数与目标文件名个数不一致!");
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.HDFS_RENAME_FILE_ERROR, message);
} else {
try {
for (Iterator it1 = tmpFiles.iterator(), it2 = endFiles.iterator(); it1.hasNext() && it2.hasNext(); ) {
String srcFile = it1.next().toString();
String dstFile = it2.next().toString();
Path srcFilePah = new Path(srcFile);
Path dstFilePah = new Path(dstFile);
if (tmpFilesParent == null) {
tmpFilesParent = srcFilePah.getParent();
}
LOG.info(String.format("start rename file [%s] to file [%s].", srcFile, dstFile));
boolean renameTag = false;
long fileLen = fileSystem.getFileStatus(srcFilePah).getLen();
if (fileLen > 0) {
renameTag = fileSystem.rename(srcFilePah, dstFilePah);
if (!renameTag) {
String message = String.format("重命名文件[%s]失败,请检查您的网络是否正常!", srcFile);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.HDFS_RENAME_FILE_ERROR, message);
}
LOG.info(String.format("finish rename file [%s] to file [%s].", srcFile, dstFile));
} else {
LOG.info(String.format("文件[%s]内容为空,请检查写入是否正常!", srcFile));
}
}
} catch (Exception e) {
String message = String.format("重命名文件时发生异常,请检查您的网络是否正常!");
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
} finally {
deleteDir(tmpFilesParent);
}
}
}
// Close the FileSystem
public void closeFileSystem(){
try {
fileSystem.close();
} catch (IOException e) {
String message = String.format("关闭FileSystem时发生IO异常,请检查您的网络是否正常!");
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
}
// TEXTFILE-format files
public FSDataOutputStream getOutputStream(String path){
Path storePath = new Path(path);
FSDataOutputStream fSDataOutputStream = null;
try {
fSDataOutputStream = fileSystem.create(storePath);
} catch (IOException e) {
String message = String.format("Create an FSDataOutputStream at the indicated Path[%s] failed: [%s]",
"message:path =" + path);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.Write_FILE_IO_ERROR, e);
}
return fSDataOutputStream;
}
public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
TaskPluginCollector taskPluginCollector) {
char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER);
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS, null);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0";
Path outputPath = new Path(fileName);
//TODO: the TASK_ATTEMPT_ID value needs further verification
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
FileOutputFormat outFormat = new TextOutputFormat();
outFormat.setOutputPath(conf, outputPath);
outFormat.setWorkOutputPath(conf, outputPath);
if (null != compress) {
Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
if (null != codecClass) {
outFormat.setOutputCompressorClass(conf, codecClass);
}
}
try {
RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL);
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
MutablePair<Text, Boolean> transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector, config);
if (!transportResult.getRight()) {
writer.write(NullWritable.get(),transportResult.getLeft());
}
}
writer.close(Reporter.NULL);
} catch (Exception e) {
String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
LOG.error(message);
Path path = new Path(fileName);
deleteDir(path.getParent());
throw DataXException.asDataXException(HiveWriterErrorCode.Write_FILE_IO_ERROR, e);
}
}
public static MutablePair<Text, Boolean> transportOneRecord(
Record record, char fieldDelimiter, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector, Configuration config) {
MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector, config);
// Holds <the converted record, whether it is dirty data>
MutablePair<Text, Boolean> transportResult = new MutablePair<Text, Boolean>();
transportResult.setRight(false);
if (null != transportResultList) {
Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));
transportResult.setRight(transportResultList.getRight());
transportResult.setLeft(recordResult);
}
return transportResult;
}
public Class<? extends CompressionCodec> getCompressCodec(String compress) {
Class<? extends CompressionCodec> codecClass = null;
if (null == compress) {
codecClass = null;
} else if ("GZIP".equalsIgnoreCase(compress)) {
codecClass = org.apache.hadoop.io.compress.GzipCodec.class;
} else if ("BZIP2".equalsIgnoreCase(compress)) {
codecClass = org.apache.hadoop.io.compress.BZip2Codec.class;
} else if ("SNAPPY".equalsIgnoreCase(compress)) {
//TODO: support this once requirements are clear; the user must have SnappyCodec installed
codecClass = org.apache.hadoop.io.compress.SnappyCodec.class;
// org.apache.hadoop.hive.ql.io.orc.ZlibCodec.class not public
//codecClass = org.apache.hadoop.hive.ql.io.orc.ZlibCodec.class;
} else {
throw DataXException.asDataXException(HiveWriterErrorCode.ILLEGAL_VALUE,
String.format("目前不支持您配置的 compress 模式 : [%s]", compress));
}
return codecClass;
}
public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
TaskPluginCollector taskPluginCollector) {
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS, null);
List<String> columnNames = getColumnNames(columns);
List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(columns);
StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory
.getStandardStructObjectInspector(columnNames, columnTypeInspectors);
OrcSerde orcSerde = new OrcSerde();
FileOutputFormat outFormat = new OrcOutputFormat();
if (!"NONE".equalsIgnoreCase(compress) && null != compress) {
Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
if (null != codecClass) {
outFormat.setOutputCompressorClass(conf, codecClass);
}
}
try {
RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
MutablePair<List<Object>, Boolean> transportResult = transportOneRecord(record, columns, taskPluginCollector, config);
if (!transportResult.getRight()) {
writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector));
}
}
writer.close(Reporter.NULL);
} catch (Exception e) {
String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
LOG.error(message);
Path path = new Path(fileName);
deleteDir(path.getParent());
throw DataXException.asDataXException(HiveWriterErrorCode.Write_FILE_IO_ERROR, e);
}
}
public List<String> getColumnNames(List<Configuration> columns) {
List<String> columnNames = Lists.newArrayList();
for (Configuration eachColumnConf : columns) {
columnNames.add(eachColumnConf.getString(Key.NAME));
}
return columnNames;
}
public String getColumnInfo(List<Configuration> columns) {
StringBuilder str = new StringBuilder();
List<String> columnNames = Lists.newArrayList();
for (int i = 0; i < columns.size(); i++) {
Configuration eachColumnConf = columns.get(i);
String name = eachColumnConf.getString(Key.NAME); // column name
String type = eachColumnConf.getString(Key.TYPE); // column type
str.append(name).append(" ").append(type);
if (i != (columns.size() - 1)) {
str.append(",");
}
}
return str.toString();
}
public String getColumnName(List<Configuration> columns) {
StringBuilder str = new StringBuilder();
List<String> list = Lists.newArrayList();
for (int i = 0; i < columns.size(); i++) {
Configuration eachColumnConf = columns.get(i);
String name = eachColumnConf.getString(Key.NAME).toLowerCase();
list.add(name);
}
return String.join(",", list);
}
public List<ObjectInspector> getColumnTypeInspectors(List<Configuration> columns) {
List<ObjectInspector> columnTypeInspectors = Lists.newArrayList();
for (Configuration eachColumnConf : columns) {
SupportHiveDataType columnType = SupportHiveDataType.valueOf(eachColumnConf.getString(Key.TYPE).toUpperCase());
ObjectInspector objectInspector = null;
switch (columnType) {
case TINYINT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Byte.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case SMALLINT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Short.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case INT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case BIGINT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case FLOAT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Float.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case DOUBLE:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Double.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case TIMESTAMP:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(java.sql.Timestamp.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case DATE:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(java.sql.Date.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case STRING:
case VARCHAR:
case CHAR:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(String.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case BOOLEAN:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Boolean.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
default:
throw DataXException
.asDataXException(
HiveWriterErrorCode.ILLEGAL_VALUE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d]. 请修改表中该字段的类型或者不同步该字段.",
eachColumnConf.getString(Key.NAME),
eachColumnConf.getString(Key.TYPE)));
}
columnTypeInspectors.add(objectInspector);
}
return columnTypeInspectors;
}
public OrcSerde getOrcSerde(Configuration config) {
String fieldDelimiter = config.getString(Key.FIELD_DELIMITER);
String compress = config.getString(Key.COMPRESS);
String encoding = config.getString(Key.ENCODING);
OrcSerde orcSerde = new OrcSerde();
Properties properties = new Properties();
properties.setProperty("orc.bloom.filter.columns", fieldDelimiter);
properties.setProperty("orc.compress", compress);
properties.setProperty("orc.encoding.strategy", encoding);
orcSerde.initialize(conf, properties);
return orcSerde;
}
public static MutablePair<List<Object>, Boolean> transportOneRecord(
Record record, List<Configuration> columnsConfiguration,
TaskPluginCollector taskPluginCollector, Configuration config) {
MutablePair<List<Object>, Boolean> transportResult = new MutablePair<List<Object>, Boolean>();
transportResult.setRight(false);
List<Object> recordList = Lists.newArrayList();
int recordLength = record.getColumnNumber();
if (0 != recordLength) {
Column column;
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
//todo as method
if (null != column.getRawData()) {
String rowData = column.getRawData().toString();
SupportHiveDataType columnType = SupportHiveDataType.valueOf(
columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());
// Convert the value according to the writer-side column type configuration
try {
switch (columnType) {
case TINYINT:
recordList.add(Byte.valueOf(rowData));
break;
case SMALLINT:
recordList.add(Short.valueOf(rowData));
break;
case INT:
recordList.add(Integer.valueOf(rowData));
break;
case BIGINT:
recordList.add(column.asLong());
break;
case FLOAT:
recordList.add(Float.valueOf(rowData));
break;
case DOUBLE:
recordList.add(column.asDouble());
break;
case STRING:
case VARCHAR:
case CHAR:
recordList.add(column.asString());
break;
case BOOLEAN:
recordList.add(column.asBoolean());
break;
case DATE:
recordList.add(new java.sql.Date(column.asDate().getTime()));
break;
case TIMESTAMP:
recordList.add(new java.sql.Timestamp(column.asDate().getTime()));
break;
default:
throw DataXException
.asDataXException(
HiveWriterErrorCode.ILLEGAL_VALUE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d]. 请修改表中该字段的类型或者不同步该字段.",
columnsConfiguration.get(i).getString(Key.NAME),
columnsConfiguration.get(i).getString(Key.TYPE)));
}
} catch (Exception e) {
// warn: treat this record as dirty data
String message = String.format(
"字段类型转换错误:你目标字段为[%s]类型,实际字段值为[%s].",
columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData().toString());
taskPluginCollector.collectDirtyRecord(record, message);
transportResult.setRight(true);
break;
}
} else {
// warn: it's all ok if nullFormat is null
//recordList.add(null);
// fix: when writing TEXT format to HDFS, NULL must be written as the configured nullFormat (e.g. \N)
String nullFormat = config.getString(Key.NULL_FORMAT);
if (nullFormat == null) {
recordList.add(null);
} else {
recordList.add(nullFormat);
}
}
}
}
transportResult.setLeft(recordList);
return transportResult;
}
}
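For context, here is a hedged sketch of how a writer task might drive HdfsHelper to produce a TEXT temp file, based only on the methods shown above; the surrounding HiveWriter task code is not included in this listing, so writerSliceConfig, lineReceiver, collector and the two paths are assumptions.
// Illustrative driver, assuming writerSliceConfig (Configuration), lineReceiver (RecordReceiver)
// and collector (TaskPluginCollector) are supplied by the DataX framework.
HdfsHelper hdfsHelper = new HdfsHelper();
hdfsHelper.getFileSystem(writerSliceConfig.getString(Key.DEFAULT_FS), writerSliceConfig);
String tmpFile = "/user/hive/warehouse/tmp_db.db/tmp_datax_hivewriter_xxx__tmp/part-0000"; // assumed temp path
String endFile = "/user/hive/warehouse/tmp_db.db/tmp_datax_hivewriter_xxx/part-0000";      // assumed final path
hdfsHelper.textFileStartWrite(lineReceiver, writerSliceConfig, tmpFile, collector);
// Rename the temp file to its final name (renameFile also removes the temp parent directory) and close the FS.
HashSet<String> tmpFiles = new HashSet<String>();
tmpFiles.add(tmpFile);
HashSet<String> endFiles = new HashSet<String>();
endFiles.add(endFile);
hdfsHelper.renameFile(tmpFiles, endFiles);
hdfsHelper.closeFileSystem();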
HiveServer2ConnectUtil.class
package com.alibaba.datax.plugin.writer.hivewriter;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class HiveServer2ConnectUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2ConnectUtil.class);
public static void main(String[] args) {
execHiveSql("hive", null,
"; use default; create table tmp_datax_hivereader_20220808_1659953092709 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001' STORED AS TEXTFILE as select id,username,password from default.t_user;",
"jdbc:hive2://10.252.92.4:10000");
}
public static boolean execHiveSql(String username, String password, String hiveSql, String hiveJdbcUrl) {
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.exit(1);
}
try {
LOG.info("hiveJdbcUrl:{}", hiveJdbcUrl);
LOG.info("username:{}", username);
LOG.info("password:{}", password);
Connection conn = DriverManager.getConnection(hiveJdbcUrl, username, password);
Statement stmt = conn.createStatement();
String[] hiveSqls = hiveSql.split(";");
for (int i = 0; i < hiveSqls.length; i++) {
if (StringUtils.isNotEmpty(hiveSqls[i])) {
stmt.execute(hiveSqls[i]);
}
}
return true;
} catch (SQLException sqlException) {
LOG.error(sqlException.getMessage(), sqlException);
return false;
}
}
}
Key.class
package com.alibaba.datax.plugin.writer.hivewriter;
public class Key {
public final static String DEFAULT_FS = "defaultFS";
public final static String USERNAME = "username";
public final static String PASSWORD = "password";
public final static String HIVE_JDBC_URL = "hiveJdbcUrl";
public final static String DATABASE_NAME = "databaseName";//目标数据库名
public final static String TABLE_NAME = "tableName";//目标表名
public static final String WRITE_MODE = "writeMode";//表的写入方式insert、overwrite
public static final String COLUMN = "column";//目标表的列
public static final String NAME = "name";//目标表的字段名
public static final String TYPE = "type";//目标表的字段类型
public static final String PARTITION="partition";//分区字段
public static final String HIVE_DATABASE_TMP_LOCATION="tmpDatabasePath";//临时hive表所在数据库的location路径
public static final String HIVE_TMP_DATABASE="tmpDatabase";//临时HIVE表所在的数据库
public static final String TEMP_TABLE_NAME_PREFIX="tmpTableName";//临时HIVE表名前缀
public static final String TMP_FULL_NAME="fullFileName";//临时hive表 HDFS文件名称
public static final String HIVE_CMD = "hive_cmd"; //hive
public final static String HIVE_SQL_SET = "hive_sql_set";
public static final String HIVE_PRESQL = "hive_preSql";
public final static String HIVE_POSTSQL = "hive_postSql";
public final static String HIVE_TARGET_TABLE_COMPRESS_SQL="hive_target_table_compress_sql";
public static final String COMPRESS = "compress";//临时表压缩格式
public static final String ENCODING="encoding";
public static final String FIELD_DELIMITER="fieldDelimiter";
public static final String NULL_FORMAT = "nullFormat";
public static final String HAVE_KERBEROS = "haveKerberos";
public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
public static final String HADOOP_CONFIG = "hadoopConfig";
}
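As a final illustration of how the column and partition keys could feed into DDL, the sketch below uses HdfsHelper.getColumnInfo to build a column list; the target table name, partition column, and storage format are assumptions, since the statements HiveWriter actually issues are not part of this listing.
// Assumption: columns is the List<Configuration> parsed from the "column" key,
// and "pt" is a partition field configured under "partition".
HdfsHelper helper = new HdfsHelper();
String columnInfo = helper.getColumnInfo(columns); // e.g. "id bigint,username string,password string"
String createTargetTable = "CREATE TABLE IF NOT EXISTS default.t_user_target ("
        + columnInfo + ") PARTITIONED BY (pt string) STORED AS ORC"; // hypothetical DDL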
1、ERROR tool.ImportTool: Encountered IOException running import job: java.io.IOException: Cannot run program "hive": error=2, No such file or directory