hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -76,6 +76,7 @@
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -117,6 +118,7 @@
 import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
+import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.impl.BackReference;
@@ -1272,7 +1274,7 @@ public String listStatus(final Path path, final String startFrom,
             : generateContinuationTokenForNonXns(relativePath, startFrom);
       }
     }
-
+    List<FileStatus> fileStatusList = new ArrayList<>();
     do {
       try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
         ListResponseData listResponseData = listingClient.listPath(relativePath,
@@ -1281,9 +1283,9 @@ public String listStatus(final Path path, final String startFrom,
         AbfsRestOperation op = listResponseData.getOp();
         perfInfo.registerResult(op.getResult());
         continuation = listResponseData.getContinuationToken();
-        List<FileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
+        List<VersionedFileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
         if (fileStatusListInCurrItr != null && !fileStatusListInCurrItr.isEmpty()) {
-          fileStatuses.addAll(fileStatusListInCurrItr);
+          fileStatusList.addAll(fileStatusListInCurrItr);
         }
         perfInfo.registerSuccess(true);
         countAggregate++;
@@ -1296,6 +1298,14 @@ public String listStatus(final Path path, final String startFrom,
       }
     } while (shouldContinue);
 
+    if (listingClient instanceof AbfsBlobClient) {
+      fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList));
+      LOG.debug("ListBlob API returned a total of {} elements including duplicates."
+          + " Number of unique elements is {}", fileStatusList.size(), fileStatuses.size());
+    } else {
+      fileStatuses.addAll(fileStatusList);
+    }
+
     return continuation;
   }
 
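Why the de-duplication now happens only after the pagination loop: with a small page size the Blob endpoint can return the marker blob for a directory on one page and the BlobPrefix entry for the same directory on a later page, so no single page ever contains both copies and per-page filtering cannot catch them. The standalone sketch below illustrates the difference; it is not part of the patch, and plain path strings with a LinkedHashSet stand in for the FileStatus entries and the TreeMap-based ListUtils.getUniqueListResult:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class PagedDedupDemo {
  public static void main(String[] args) {
    // Three pages, as a Blob endpoint with maxResults=1 might return them:
    // the marker blob for /c and the BlobPrefix for /c arrive on different pages.
    List<List<String>> pages = Arrays.asList(
        Arrays.asList("/c"),       // page 1: marker blob for directory /c
        Arrays.asList("/c.bak"),   // page 2: sibling that sorts between the two
        Arrays.asList("/c"));      // page 3: BlobPrefix for the same /c

    // Old behaviour: de-duplicate each page in isolation. Every page is
    // already duplicate-free on its own, so "/c" survives twice.
    List<String> perPage = new ArrayList<>();
    for (List<String> page : pages) {
      perPage.addAll(new LinkedHashSet<>(page));
    }
    System.out.println(perPage);  // [/c, /c.bak, /c]

    // New behaviour: aggregate all pages first, de-duplicate once at the end.
    List<String> aggregated = new ArrayList<>();
    for (List<String> page : pages) {
      aggregated.addAll(page);
    }
    System.out.println(new ArrayList<>(new LinkedHashSet<>(aggregated)));
    // [/c, /c.bak]
  }
}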
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

@@ -42,7 +42,6 @@
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.UUID;
 
 import org.w3c.dom.Document;
@@ -52,7 +51,6 @@
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
@@ -402,7 +400,7 @@ public ListResponseData listPath(final String relativePath, final boolean recurs
       AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
       BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
       LOG.debug("ListBlob attempted on a file path. Returning file status.");
-      List<FileStatus> fileStatusList = new ArrayList<>();
+      List<VersionedFileStatus> fileStatusList = new ArrayList<>();
       for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
         fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
       }
@@ -1617,7 +1615,7 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
       LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
           listResultSchema.paths().size(),
           listResultSchema.getNextMarker());
-      return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
+      return filterRenamePendingFiles(listResultSchema, uri);
     } catch (SAXException | IOException ex) {
       throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
     }
@@ -1917,39 +1915,23 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
   });
 
   /**
-   * This is to handle duplicate listing entries returned by Blob Endpoint for
-   * implicit paths that also has a marker file created for them.
-   * This will retain entry corresponding to marker file and remove the BlobPrefix entry.
-   * This will also filter out all the rename pending json files in listing output.
+   * This will filter out all the rename pending json files in listing output.
    * @param listResultSchema List of entries returned by Blob Endpoint.
    * @param uri URI to be used for path conversion.
    * @return List of entries after removing duplicates.
    * @throws IOException if path conversion fails.
    */
   @VisibleForTesting
-  public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
+  public ListResponseData filterRenamePendingFiles(
       BlobListResultSchema listResultSchema, URI uri) throws IOException {
-    List<FileStatus> fileStatuses = new ArrayList<>();
+    List<VersionedFileStatus> fileStatuses = new ArrayList<>();
     Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
-    TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new TreeMap<>();
 
     for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
-      if (StringUtils.isNotEmpty(entry.eTag())) {
-        // This is a blob entry. It is either a file or a marker blob.
-        // In both cases we will add this.
-        if (isRenamePendingJsonPathEntry(entry)) {
-          renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
-        } else {
-          nameToEntryMap.put(entry.name(), entry);
-          fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
-        }
+      if (isRenamePendingJsonPathEntry(entry)) {
+        renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
       } else {
-        // This is a BlobPrefix entry. It is a directory with file inside
-        // This might have already been added as a marker blob.
-        if (!nameToEntryMap.containsKey(entry.name())) {
-          nameToEntryMap.put(entry.name(), entry);
-          fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
-        }
+        fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
       }
     }
 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

@@ -45,7 +45,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -1353,7 +1352,7 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
     LOG.debug("ListPath listed {} paths with {} as continuation token",
         listResultSchema.paths().size(),
         getContinuationFromResponse(result));
-    List<FileStatus> fileStatuses = new ArrayList<>();
+    List<VersionedFileStatus> fileStatuses = new ArrayList<>();
     for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
       fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
     }
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java

@@ -21,34 +21,33 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 /**
  * This class is used to hold the response data for list operations.
- * It contains a list of FileStatus objects, a map of rename pending JSON paths,
+ * It contains a list of VersionedFileStatus objects, a map of rename pending JSON paths,
  * continuation token and the executed REST operation.
  */
 public class ListResponseData {
 
-  private List<FileStatus> fileStatusList;
+  private List<VersionedFileStatus> fileStatusList;
   private Map<Path, Integer> renamePendingJsonPaths;
   private AbfsRestOperation executedRestOperation;
   private String continuationToken;
 
   /**
-   * Returns the list of FileStatus objects.
-   * @return the list of FileStatus objects
+   * Returns the list of VersionedFileStatus objects.
+   * @return the list of VersionedFileStatus objects
    */
-  public List<FileStatus> getFileStatusList() {
+  public List<VersionedFileStatus> getFileStatusList() {
     return fileStatusList;
   }
 
   /**
-   * Sets the list of FileStatus objects.
-   * @param fileStatusList the list of FileStatus objects
+   * Sets the list of VersionedFileStatus objects.
+   * @param fileStatusList the list of VersionedFileStatus objects
    */
-  public void setFileStatusList(final List<FileStatus> fileStatusList) {
+  public void setFileStatusList(final List<VersionedFileStatus> fileStatusList) {
     this.fileStatusList = fileStatusList;
   }
 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ListUtils.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Utility class for List operations.
+ */
+public final class ListUtils {
+
+  private ListUtils() {
+    // Private constructor to prevent instantiation
+  }
+
+  /**
+   * Utility method to remove duplicates from a list of FileStatus.
+   * ListBlob API of blob endpoint can return duplicate entries.
+   * @param originalList list that is prone to have duplicates
+   * @return rectified list with no duplicates.
+   */
+  public static List<FileStatus> getUniqueListResult(List<FileStatus> originalList) {
+    if (originalList == null || originalList.isEmpty()) {
+      return originalList;
+    }
+
+    TreeMap<String, FileStatus> nameToEntryMap = new TreeMap<>();
+    String prefix = null;
+    List<FileStatus> rectifiedFileStatusList = new ArrayList<>();
+
+    for (FileStatus current : originalList) {
+      String fileName = current.getPath().getName();
+
+      if (prefix == null || !fileName.startsWith(prefix)) {
+        // Prefix pattern breaks here. Reset Map and prefix.
+        prefix = fileName;
+        nameToEntryMap.clear();
+      }
+
+      // Add the current entry if it is not already added.
+      if (!nameToEntryMap.containsKey(fileName)) {
+        nameToEntryMap.put(fileName, current);
+        rectifiedFileStatusList.add(current);
+      }
+    }
+
+    return rectifiedFileStatusList;
+  }
+}
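The method above relies on ListBlobs returning entries in lexicographic order: any duplicate of a path carries the same name, and every entry between two duplicates shares that name as a prefix, so the TreeMap only has to remember entries since the last prefix break instead of the whole listing. A minimal usage sketch, not part of the patch (the demo class and its status helper are hypothetical stand-ins for real listing entries; ListUtils.getUniqueListResult is the real method introduced here):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.utils.ListUtils;

public class ListUtilsDemo {

  // Hypothetical helper building a bare directory FileStatus for a path.
  private static FileStatus status(String path) {
    return new FileStatus(0, true, 0, 0, 0, new Path(path));
  }

  public static void main(String[] args) {
    // Sorted listing as ListBlobs would return it: "/c" appears twice
    // (marker blob first, BlobPrefix later) and "/c.bak" sits between
    // them because '.' sorts before '/'.
    List<FileStatus> listing = Arrays.asList(
        status("/c"), status("/c.bak"), status("/c"), status("/d"));

    // Prints /c, /c.bak, /d: the duplicate "/c" is dropped while the
    // prefix-sharing "/c.bak" is kept.
    for (FileStatus entry : ListUtils.getUniqueListResult(listing)) {
      System.out.println(entry.getPath());
    }
  }
}

Because the first occurrence of a duplicated name wins, the retained entry is the marker blob rather than the BlobPrefix, which is what the new test below asserts for the explicit directories.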
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java

@@ -24,7 +24,9 @@
 import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -90,6 +92,8 @@ public class ITestAzureBlobFileSystemListStatus extends
     AbstractAbfsIntegrationTest {
   private static final int TEST_FILES_NUMBER = 6000;
   public static final String TEST_CONTINUATION_TOKEN = "continuation";
+  private static final int TOTAL_NUMBER_OF_PATHS = 11;
+  private static final int NUMBER_OF_UNIQUE_PATHS = 7;
 
   public ITestAzureBlobFileSystemListStatus() throws Exception {
     super();
@@ -197,7 +201,7 @@ public void testListPathParsingFailure() throws Exception {
     Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
     Mockito.doReturn(spiedClient).when(spiedStore).getClient();
 
-    Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any());
+    Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterRenamePendingFiles(any(), any());
     List<FileStatus> fileStatuses = new ArrayList<>();
     AbfsDriverException ex = intercept(AbfsDriverException.class,
         () -> {
@@ -532,6 +536,87 @@ public void testEmptyContinuationToken() throws Exception {
         .describedAs("Listing Size Not as expected").hasSize(1);
   }
 
+  /**
+   * Test to verify that listStatus returns the correct file status
+   * after removing duplicates across multiple iterations of list blobs.
+   * Also verifies that in case of a non-empty explicit dir,
+   * the entry corresponding to the marker blob is returned.
+   * @throws Exception if test fails.
+   */
+  @Test
+  public void testDuplicateEntriesAcrossListBlobIterations() throws Exception {
+    AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    store.getAbfsConfiguration().setListMaxResults(1);
+    AbfsClient client = Mockito.spy(store.getClient());
+
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    Mockito.doReturn(client).when(store).getClient();
+
+    /*
+     * Following entries will be created inside the root path.
+     * 0. /A - implicit directory without any marker blob
+     * 1. /a - marker file for explicit directory
+     * 2. /a/file1 - normal file inside explicit directory
+     * 3. /b - normal file inside root
+     * 4. /c - marker file for explicit directory
+     * 5. /c.bak - marker file for explicit directory
+     * 6. /c.bak/file2 - normal file inside explicit directory
+     * 7. /c/file3 - normal file inside explicit directory
+     * 8. /d - implicit directory
+     * 9. /e - marker file for explicit directory
+     * 10. /e/file4 - normal file inside explicit directory
+     */
+    // Create Path 0
+    createAzCopyFolder(new Path("/A"));
+
+    // Create Paths 1 and 2
+    fs.create(new Path("/a/file1"));
+
+    // Create Path 3
+    fs.create(new Path("/b"));
+
+    // Create Paths 4 and 7
+    fs.create(new Path("/c/file3"));
+
+    // Create Paths 5 and 6
+    fs.create(new Path("/c.bak/file2"));
+
+    // Create Path 8
+    createAzCopyFolder(new Path("/d"));
+
+    // Create Paths 9 and 10
+    fs.create(new Path("/e/file4"));
+
+    FileStatus[] fileStatuses = fs.listStatus(new Path(ROOT_PATH));
+
+    // Assert that client.listPath was called 11 times, i.e. the server
+    // returned 11 entries in total (one per call with maxResults = 1).
+    Mockito.verify(client, Mockito.times(TOTAL_NUMBER_OF_PATHS))
+        .listPath(eq(ROOT_PATH), eq(false), eq(1), any(), any(), any());
+
+    // Assert that after duplicate removal, only 7 unique entries are returned.
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List size is not as expected").isEqualTo(NUMBER_OF_UNIQUE_PATHS);
+
+    // Assert that for duplicates, the entry corresponding to the marker blob is returned.
+    assertImplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(new Path("/A")));
+    assertExplicitDirectoryFileStatus(fileStatuses[1], fs.makeQualified(new Path("/a")));
+    assertFilePathFileStatus(fileStatuses[2], fs.makeQualified(new Path("/b")));
+    assertExplicitDirectoryFileStatus(fileStatuses[3], fs.makeQualified(new Path("/c")));
+    assertExplicitDirectoryFileStatus(fileStatuses[4], fs.makeQualified(new Path("/c.bak")));
+    assertImplicitDirectoryFileStatus(fileStatuses[5], fs.makeQualified(new Path("/d")));
+    assertExplicitDirectoryFileStatus(fileStatuses[6], fs.makeQualified(new Path("/e")));
+
+    // Assert that there are no duplicates in the returned file statuses.
+    Set<Path> uniquePaths = new HashSet<>();
+    for (FileStatus fileStatus : fileStatuses) {
+      Assertions.assertThat(uniquePaths.add(fileStatus.getPath()))
+          .describedAs("Duplicate Entries found")
+          .isTrue();
+    }
+  }
+
   private void assertFilePathFileStatus(final FileStatus fileStatus,
       final Path qualifiedPath) {
     Assertions.assertThat(fileStatus.getPath())