Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Review Comments Fixes
  • Loading branch information
Vikram Agrawal committed Jul 8, 2019
commit 3ad88eb4154a347cf9cac1ce3943df1cb85191fc
161 changes: 92 additions & 69 deletions core/src/main/java/org/apache/spark/io/FileUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
*/
package org.apache.spark.io;

import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
Expand All @@ -24,79 +22,104 @@

public class FileUtility {

/**
* Untar an input file into an output file.
*
* The output file is created in the output folder, having the same name as
* the input file, minus the '.tar' extension.
*
* @param inputFile the input .tar file
* @throws IOException
*
* @throws ArchiveException
*/
public static void unTar(final File inputFile)
throws IOException, ArchiveException {
/**
* Extract an input tar file into an output files and directories.
* @param inputTarFileLoc the input file location for the tar file
* @param destDirLoc destination for the extracted files
*
* @throws IOException
* @throws IllegalStateException
*/
public static final String ENCODING = "utf-8";

String outputDir = inputFile.getAbsolutePath().split(".tar")[0];
File outputTarDir = new File(outputDir);
outputTarDir.mkdir();
final InputStream is = new FileInputStream(inputFile);
final TarArchiveInputStream debInputStream = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(
"tar", is);
TarArchiveEntry entry = null;
while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
final File outputFile = new File(outputDir, entry.getName());
if (entry.isDirectory()) {
if (!outputFile.exists()) {
if (!outputFile.mkdirs()) {
throw new IllegalStateException(String.format(
"Couldn't create directory %s.", outputFile.getAbsolutePath()));
}
}
} else {
final OutputStream outputFileStream = new FileOutputStream(outputFile);
IOUtils.copy(debInputStream, outputFileStream);
outputFileStream.close();
}
}
debInputStream.close();
public static void extractTarFile(String inputTarFileLoc, String destDirLoc)
throws IOException, IllegalStateException {
File inputFile = new File(inputTarFileLoc);
if (!inputTarFileLoc.endsWith("tar")) {
throw new IllegalStateException(String.format(
"Input File %s should end with tar extension.", inputTarFileLoc));
}
File destDir = new File(destDirLoc);
if (destDir.exists() && !destDir.delete()) {
throw new IllegalStateException(String.format(
"Couldn't delete the existing destination directory %s ", destDirLoc));
} else if (!destDir.mkdir()) {
throw new IllegalStateException(String.format(
"Couldn't create directory %s ", destDirLoc));
}

public static void createTarFile(String source, String destFileName) throws Exception {
TarArchiveOutputStream tarOs = null;
File f = new File(destFileName);
if (f.exists()) {
f.delete();
}
try {
FileOutputStream fos = new FileOutputStream(destFileName);
tarOs = (TarArchiveOutputStream) new ArchiveStreamFactory().createArchiveOutputStream("tar", fos);
tarOs = new TarArchiveOutputStream(fos);
File folder = new File(source);
File[] fileNames = folder.listFiles();
for(File file : fileNames){
TarArchiveEntry tar_file = new TarArchiveEntry(file.getName());
tar_file.setSize(file.length());
tarOs.putArchiveEntry(tar_file);
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
IOUtils.copy(bis, tarOs);
bis.close();
tarOs.closeArchiveEntry();
}
} catch (IOException e) {
final InputStream is = new FileInputStream(inputFile);
final TarArchiveInputStream debInputStream = new TarArchiveInputStream(is, ENCODING);
OutputStream outputFileStream = null;
try {
TarArchiveEntry entry;
while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
final File outputFile = new File(destDirLoc, entry.getName());
if (entry.isDirectory()) {
if (!outputFile.exists() && !outputFile.mkdirs()) {
throw new IllegalStateException(String.format(
"createTarFile failed with exception %s.", e.getMessage()));
} finally {
try {
tarOs.finish();
tarOs.close();
} catch (IOException e) {
throw new IllegalStateException(String.format(
"createTarFile failed with exception %s.", e.getMessage()));
}
"Couldn't create directory %s.", outputFile.getAbsolutePath()));
}
} else {
outputFileStream = new FileOutputStream(outputFile);
IOUtils.copy(debInputStream, outputFileStream);
outputFileStream.close();
outputFileStream = null;
}
}
} catch (IOException e){
throw new IllegalStateException(String.format(
"extractTarFile failed with exception %s.", e.getMessage()));
} finally {
debInputStream.close();
if (outputFileStream != null) {
outputFileStream.close();
}
}
}

/**
* create a tar file for input source directory location .
* @param source the source directory location
* @param destFileLoc destination of the created tarball
*
* @throws IOException
* @throws IllegalStateException
*/
public static void createTarFile(String source, String destFileLoc)
throws IllegalStateException, IOException {
TarArchiveOutputStream tarOs = null;
File f = new File(destFileLoc);
if (f.exists() && !f.delete()) {
throw new IllegalStateException(String.format(
"Couldn't delete the destination file location %s", destFileLoc));
}
BufferedInputStream bis = null;
try {
FileOutputStream fos = new FileOutputStream(destFileLoc);
tarOs = new TarArchiveOutputStream(fos, ENCODING);
File folder = new File(source);
File[] fileNames = folder.listFiles();
for(File file : fileNames){
TarArchiveEntry tar_file = new TarArchiveEntry(file.getName());
tar_file.setSize(file.length());
tarOs.putArchiveEntry(tar_file);
bis = new BufferedInputStream(new FileInputStream(file));
IOUtils.copy(bis, tarOs);
bis.close();
bis = null;
tarOs.closeArchiveEntry();
}
} catch (IOException e) {
throw new IllegalStateException(String.format(
"createTarFile failed with exception %s.", e.getMessage()));
} finally {
tarOs.finish();
tarOs.close();
if (bis != null) {
bis.close();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,15 @@ class RocksDbInstance(keySchema: StructType, valueSchema: StructType, identifier
}

def open(path: String, conf: Map[String, String], readOnly: Boolean): Unit = {
verify(db == null, "Another rocksDb instance is already actve")
require(db == null, "Another rocksDb instance is already active")
try {
setOptions(conf)
db = readOnly match {
case true =>
options.setCreateIfMissing(false)
RocksDB.openReadOnly(options, path)
case false =>
options.setCreateIfMissing(true)
RocksDB.open(options, path)
db = if (readOnly) {
options.setCreateIfMissing(false)
RocksDB.openReadOnly(options, path)
} else {
options.setCreateIfMissing(true)
RocksDB.open(options, path)
}
dbPath = path
} catch {
Expand All @@ -69,7 +68,7 @@ class RocksDbInstance(keySchema: StructType, valueSchema: StructType, identifier
}

def get(key: UnsafeRow): UnsafeRow = {
verify(isOpen(), "Open rocksDb instance before any operation")
require(isOpen(), "Open rocksDb instance before any operation")
Option(db.get(readOptions, key.getBytes)) match {
case Some(valueInBytes) =>
val value = new UnsafeRow(valueSchema.fields.length)
Expand All @@ -80,12 +79,12 @@ class RocksDbInstance(keySchema: StructType, valueSchema: StructType, identifier
}

def put(key: UnsafeRow, value: UnsafeRow): Unit = {
verify(isOpen(), "Open rocksDb instance before any operation")
require(isOpen(), "Open rocksDb instance before any operation")
db.put(key.getBytes, value.getBytes)
}

def remove(key: UnsafeRow): Unit = {
verify(isOpen(), "Open rocksDb instance before any operation")
require(isOpen(), "Open rocksDb instance before any operation")
db.delete(key.getBytes)
}

Expand All @@ -106,7 +105,7 @@ class RocksDbInstance(keySchema: StructType, valueSchema: StructType, identifier
}

def iterator(closeDbOnCompletion: Boolean): Iterator[UnsafeRowPair] = {
verify(isOpen(), "Open rocksDb instance before any operation")
require(isOpen(), "Open rocksDb instance before any operation")
Option(db.getSnapshot) match {
case Some(snapshot) =>
logDebug(s"Inside rockdDB iterator function")
Expand Down Expand Up @@ -153,7 +152,7 @@ class RocksDbInstance(keySchema: StructType, valueSchema: StructType, identifier
}

def printStats: Unit = {
verify(isOpen(), "Open rocksDb instance before any operation")
require(isOpen(), "Open rocksDb instance before any operation")
try {
val stats = db.getProperty("rocksdb.stats")
logInfo(s"Stats = $stats")
Expand Down Expand Up @@ -235,7 +234,7 @@ class RocksDbInstance(keySchema: StructType, valueSchema: StructType, identifier
}

def createCheckpoint(rocksDb: RocksDB, dir: String): Unit = {
verify(isOpen(), "Open rocksDb instance before any operation")
require(isOpen(), "Open rocksDb instance before any operation")
val (result, elapsedMs) = Utils.timeTakenMs {
val c = Checkpoint.create(rocksDb)
val f: File = new File(dir)
Expand All @@ -249,7 +248,7 @@ class RocksDbInstance(keySchema: StructType, valueSchema: StructType, identifier
}

def createBackup(dir: String): Unit = {
verify(isOpen(), "Open rocksDb instance before any operation")
require(isOpen(), "Open rocksDb instance before any operation")
val (result, elapsedMs) = Utils.timeTakenMs {
val backupableDBOptions = new BackupableDBOptions(dir)
backupableDBOptions.setDestroyOldData(true)
Expand Down Expand Up @@ -286,8 +285,8 @@ class OptimisticTransactionDbInstance(
}

override def open(path: String, conf: Map[String, String], readOnly: Boolean): Unit = {
verify(otdb == null, "Another OptimisticTransactionDbInstance instance is already actve")
verify(readOnly == false, "Cannot open OptimisticTransactionDbInstance in Readonly mode")
require(otdb == null, "Another OptimisticTransactionDbInstance instance is already actve")
require(readOnly == false, "Cannot open OptimisticTransactionDbInstance in Readonly mode")
try {
setOptions(conf)
options.setCreateIfMissing(true)
Expand All @@ -303,7 +302,7 @@ class OptimisticTransactionDbInstance(
}

def startTransactions(): Unit = {
verify(isOpen(), "Open OptimisticTransactionDbInstance before performing any operation")
require(isOpen(), "Open OptimisticTransactionDbInstance before performing any operation")
Option(txn) match {
case None =>
val optimisticTransactionOptions = new OptimisticTransactionOptions()
Expand All @@ -315,17 +314,17 @@ class OptimisticTransactionDbInstance(
}

override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
verify(txn != null, "Start Transaction before inserting any key")
require(txn != null, "Start Transaction before inserting any key")
txn.put(key.getBytes, value.getBytes)
}

override def remove(key: UnsafeRow): Unit = {
verify(txn != null, "Start Transaction before deleting any key")
require(txn != null, "Start Transaction before deleting any key")
txn.delete(key.getBytes)
}

override def get(key: UnsafeRow): UnsafeRow = {
verify(txn != null, "Start Transaction before fetching any key-value")
require(txn != null, "Start Transaction before fetching any key-value")
Option(txn.get(readOptions, key.getBytes)) match {
case Some(valueInBytes) =>
val value = new UnsafeRow(valueSchema.fields.length)
Expand All @@ -337,7 +336,7 @@ class OptimisticTransactionDbInstance(
}

override def commit(backupPath: Option[String] = None): Unit = {
verify(txn != null, "Start Transaction before fetching any key-value")
require(txn != null, "Start Transaction before fetching any key-value")
// printTrxStats
try {
val file = new File(dbPath, identifier.toUpperCase(Locale.ROOT))
Expand All @@ -354,24 +353,24 @@ class OptimisticTransactionDbInstance(
}

def printTrxStats(): Unit = {
verify(txn != null, "No open Transaction")
require(txn != null, "No open Transaction")
logInfo(s"""
| deletes = ${txn.getNumDeletes}
| numKeys = ${txn.getNumKeys}
| puts = ${txn.getNumPuts}
| time = ${txn.getElapsedTime}
| deletes = ${txn.getNumDeletes}
| numKeys = ${txn.getNumKeys}
| puts = ${txn.getNumPuts}
| time = ${txn.getElapsedTime}
""".stripMargin)
}

override def abort(): Unit = {
verify(txn != null, "No Transaction to abort")
require(txn != null, "No Transaction to abort")
txn.rollbackToSavePoint()
txn.close()
txn = null
}

override def close(): Unit = {
verify(isOpen(), "No DB to close")
require(isOpen(), "No DB to close")
readOptions.close()
writeOptions.close()
logDebug("Closing the transaction db")
Expand All @@ -380,8 +379,8 @@ class OptimisticTransactionDbInstance(
}

override def iterator(closeDbOnCompletion: Boolean): Iterator[UnsafeRowPair] = {
verify(txn != null, "Transaction is not set")
verify(
require(txn != null, "Transaction is not set")
require(
closeDbOnCompletion == false,
"Cannot close a DB without aborting/commiting the transactions")
val readOptions = new ReadOptions()
Expand Down Expand Up @@ -425,10 +424,4 @@ object RocksDbInstance {
}
}

def verify(condition: => Boolean, msg: String): Unit = {
if (!condition) {
throw new IllegalStateException(msg)
}
}

}
Loading