
Reading ORC Files on HDFS with Java

To avoid dragging in Hive's pile of dependencies, this post uses the nohive variant of the orc-core jar under org.apache.orc.
Without further ado, here is the sample code; a few spots, such as package imports, will need to be adjusted for your own use case.

1. Gradle dependency configuration


// build.gradle
dependencies {
    ...
    ...
    implementation 'org.apache.orc:orc-core:1.5.5'
    // for the nohive variant mentioned above (assumed "nohive" classifier, verify against your repository):
    // implementation 'org.apache.orc:orc-core:1.5.5:nohive'
}
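As a quick sanity check that the dependency resolves, the following minimal sketch just opens an ORC file and prints its schema. The class name and file path are made up for illustration, and it assumes the Hadoop client libraries are also on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

public class OrcSchemaCheck {
    public static void main(String[] args) throws Exception {
        // open an ORC file with a default Hadoop configuration and dump its schema
        Reader reader = OrcFile.createReader(new Path("/tmp/sample.orc"),
                OrcFile.readerOptions(new Configuration()));
        System.out.println(reader.getSchema());
    }
}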

2. Java code

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
// with the nohive jar, these vector classes live under org.apache.orc.storage.ql.exec.vector instead
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

/**
 * Read an HDFS file in ORC format.
 * This method loads all of the data into memory, so it is not suitable for large files.
 *
 * @param hdfsPath HDFS path (down to the file name)
 * @return list of records
 * @throws IOException
 */

public synchronized List<String> readOrcOfflineLabelDataFromHdfs(String hdfsPath) throws IOException {
    List<String> retList = new ArrayList<String>();
    RecordReader rowIterator = null;
    Reader reader = null;
    try {
        reader = OrcFile.createReader(new Path(hdfsPath), OrcFile.readerOptions(HdfsFileSystem.get().getConfiguration()));
        log.debug("---file schema: " + reader.getSchema());
        log.debug("---Row count: " + reader.getNumberOfRows());

        // Pick the schema we want to read using schema evolution
        // TypeDescription readSchema =
        //         TypeDescription.fromString("struct<z:int,y:string,x:bigint>");
        TypeDescription readSchema = TypeDescription.createStruct()
                .addField("openId", TypeDescription.createString())
                .addField("value", TypeDescription.createString())
                .addField("ruleId", TypeDescription.createString())
                .addField("dataType", TypeDescription.createString())
                .addField("hour", TypeDescription.createString())
                .addField("count", TypeDescription.createLong());

        // Read the row data batch by batch
        VectorizedRowBatch batch = readSchema.createRowBatch();
        rowIterator = reader.rows(reader.options().schema(readSchema));
        while (rowIterator.nextBatch(batch)) {
            BytesColumnVector openIdVector = (BytesColumnVector) batch.cols[0];
            BytesColumnVector valueVector = (BytesColumnVector) batch.cols[1];
            BytesColumnVector ruleIdVector = (BytesColumnVector) batch.cols[2];
            BytesColumnVector dataTypeVector = (BytesColumnVector) batch.cols[3];
            BytesColumnVector hourVector = (BytesColumnVector) batch.cols[4];
            LongColumnVector countVector = (LongColumnVector) batch.cols[5];
            for (int row = 0; row < batch.size; ++row) {
                String openId = new String(openIdVector.vector[row], openIdVector.start[row], openIdVector.length[row], StandardCharsets.UTF_8);
                String value = new String(valueVector.vector[row], valueVector.start[row], valueVector.length[row], StandardCharsets.UTF_8);
                String ruleId = new String(ruleIdVector.vector[row], ruleIdVector.start[row], ruleIdVector.length[row], StandardCharsets.UTF_8);
                String dataType = new String(dataTypeVector.vector[row], dataTypeVector.start[row], dataTypeVector.length[row], StandardCharsets.UTF_8);
                String hour = new String(hourVector.vector[row], hourVector.start[row], hourVector.length[row], StandardCharsets.UTF_8);
                long count = countVector.vector[row];
                if (StringUtils.isBlank(openId) || StringUtils.isBlank(dataType)) {
                    log.debug("---continue..");
                    continue;
                }
                // collect the record; tab-separated here, adjust the format to your needs
                retList.add(String.join("\t", openId, value, ruleId, dataType, hour, String.valueOf(count)));
            }
        }
    } catch (Exception e) {
        log.warn("---readOrcDataFrom hdfs fail.", e);
    } finally {
        if (rowIterator != null) {
            rowIterator.close();
        }
    }
    log.debug("---retList.size(): " + retList.size());
    return retList;
}
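For completeness, a hypothetical call site for the method above (the HDFS path is made up) might look like:

// read a small ORC file and log the collected records
List<String> records = readOrcOfflineLabelDataFromHdfs("/user/demo/labels/part-00000.orc");
log.debug("---records: " + records.size());
for (String record : records) {
    log.debug(record);
}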

Attached below is the code for obtaining the HDFS configuration:


import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;


public class HdfsFileSystem {

    private static Log log = LogFactory.getLog(HdfsFileSystem.class);
    private static HdfsFileSystem instance = new HdfsFileSystem();

    public static HdfsFileSystem get() {
        return instance;
    }

    public static Configuration createConf() {
        Configuration conf = new Configuration();
        String hadoopHome = System.getenv("HADOOP_HOME");
        if (StringUtils.isBlank(hadoopHome)) {
            hadoopHome = System.getProperty("HADOOP_HOME");
        }
        if (StringUtils.isBlank(hadoopHome)) {
            return null;
        }
        log.debug("HADOOP_HOME = " + hadoopHome);
        String xmlPath = hadoopHome + File.separator + "etc" + File.separator + "hadoop" + File.separator;
        Path coreSitePath = new Path(xmlPath + "core-site.xml");
        Path hdfsSitePath = new Path(xmlPath + "hdfs-site.xml");
        Path mapredSitePath = new Path(xmlPath + "mapred-site.xml");
        Path yarnSitePath = new Path(xmlPath + "yarn-site.xml");
        conf.addResource(coreSitePath);
        conf.addResource(hdfsSitePath);
        conf.addResource(mapredSitePath);
        conf.addResource(yarnSitePath);
        return conf;
    }


    private Configuration conf;
    private FileSystem fs;

    private HdfsFileSystem() {
        Configuration conf = createConf();
        if (conf == null) {
            return;
        }
        try {
            //conf.writeXml(System.out);
            this.conf = conf;
            this.fs = FileSystem.get(conf);

            //String defaultFS = conf.get(FileSystem.FS_DEFAULT_NAME_KEY, "");
            //log.info("defaultFS = " + defaultFS);
        } catch (Exception e) {
            log.warn(null, e);
        }
    }

    public FileSystem getFileSystem() {
        return this.fs;
    }

    public Configuration getConfiguration() {
        return this.conf;
    }

    /**
     * Whether the given path is empty.
     * @param str path string
     * @return true if the path does not exist or contains no data files
     */
    public boolean isEmpty(String str) {
        try {
            if (fs == null) {
                return false; // FileSystem was never obtained; let it pass for now
            }

            Path path = new Path(str);
            if (!fs.exists(path)) {
                return true;
            }
            FileStatus[] listStatus = fs.listStatus(path);
            // entered when the directory contains no files at all
            if (listStatus.length == 0) {
                return true;
            }
            if (listStatus.length == 1 && listStatus[0].getPath().getName().equalsIgnoreCase("_SUCCESS")) {
                return true;
            }

        } catch (FileNotFoundException e) {
            log.warn(null, e);
            return true;
        } catch (IOException e) {
            log.warn(null, e);
        }
        return false;
    }

    public void print() {

        if (this.conf == null) {
            // ApplicationException is a project-specific class; replace it with your own exception type
            throw new ApplicationException("failed to init hadoop env.", ApplicationException.CODE_FAILURE_UNKNOWN);
        }

        URI defaultFsUri = URI.create(this.conf.get(FileSystem.FS_DEFAULT_NAME_KEY, ""));
        log.debug("defaultFsUri = " + defaultFsUri);
        if (defaultFsUri.getScheme() == null) {
            return;
        }
        UserGroupInformation currentUser = null;
        AbstractFileSystem defaultAfs = null;
        try {
            currentUser = UserGroupInformation.getCurrentUser();
            log.debug("currentUser = " + currentUser);
            defaultAfs = getAbstractFileSystem(currentUser, defaultFsUri, this.conf);
            log.debug("defaultAfs = " + defaultAfs.getClass().getCanonicalName());
        } catch (UnsupportedFileSystemException ex) {
            log.warn(null, ex);
        } catch (IOException ex) {
            log.warn(null, ex);
        }
    }

    private AbstractFileSystem getAbstractFileSystem(UserGroupInformation user, final URI uri, final Configuration conf)
            throws UnsupportedFileSystemException, IOException {
        try {
            return user.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
                @Override
                public AbstractFileSystem run() throws UnsupportedFileSystemException {
                    return AbstractFileSystem.get(uri, conf);
                }
            });
        } catch (InterruptedException ex) {
            throw new IOException("Failed to get the AbstractFileSystem for path: " + uri, ex);
        }
    }

    public void close() {
        if (this.fs == null) {
            return;
        }
        try {
            this.fs.close();
        } catch (IOException e) {
            log.warn(null, e);
        }
    }
}
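Putting the pieces together, a rough usage sketch that ties the helper to the ORC reader above (the directory and file names are hypothetical) could look like:

// make sure the helper initialized, skip empty output directories, then read
HdfsFileSystem hdfs = HdfsFileSystem.get();
if (hdfs.getFileSystem() == null) {
    log.warn("HADOOP_HOME not set; HDFS configuration unavailable.");
} else if (!hdfs.isEmpty("/user/demo/labels")) {
    List<String> records = readOrcOfflineLabelDataFromHdfs("/user/demo/labels/part-00000.orc");
    log.debug("records: " + records.size());
}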

3. References

  1. Article: https://www.jianshu.com/p/9ab341ba9193

  2. Official example source code: orc-examples-1.5.5-sources.jar