@@ -19,7 +19,7 @@
package org.apache.hadoop.hdfs.protocolPB;

import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
@@ -32,9 +32,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

/**
@@ -139,7 +139,7 @@ public QuotaUsage getQuotaUsage(String path) throws IOException {
* @return quota usage for each remote location.
* @throws IOException If the quota system is disabled.
*/
Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
protected Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
if (!router.isQuotaEnabled()) {
@@ -252,8 +252,9 @@ protected List<RemoteLocation> getValidQuotaLocations(String path)
* @param path Federation path of the results.
* @param results Quota query result.
* @return Aggregated Quota.
* @throws IOException If the quota system is disabled.
*/
QuotaUsage aggregateQuota(String path,
protected QuotaUsage aggregateQuota(String path,
Map<RemoteLocation, QuotaUsage> results) throws IOException {
long nsCount = 0;
long ssCount = 0;
@@ -21,7 +21,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;

import java.io.IOException;
import java.net.InetSocketAddress;
@@ -41,7 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;

/**
* Service to periodically update the {@link RouterQuotaUsage}
@@ -1940,7 +1940,7 @@ private String getCurrentFairnessPolicyControllerClassName() {
* @return A prioritized list of NNs to use for communication.
* @throws IOException If a NN cannot be located for the nameservice ID.
*/
protected List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
public List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
boolean isObserverRead) throws IOException {
final List<? extends FederationNamenodeContext> namenodes;

@@ -2047,39 +2047,39 @@ protected static class ExecutionStatus {
private static final byte SHOULD_USE_OBSERVER_BIT = 2;
private static final byte COMPLETE_BIT = 4;

ExecutionStatus() {
public ExecutionStatus() {
this(false, false);
}

ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
public ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
this.flag = 0;
setFailOver(failOver);
setShouldUseObserver(shouldUseObserver);
setComplete(false);
}

private void setFailOver(boolean failOver) {
public void setFailOver(boolean failOver) {
flag = (byte) (failOver ? (flag | FAIL_OVER_BIT) : (flag & ~FAIL_OVER_BIT));
}

private void setShouldUseObserver(boolean shouldUseObserver) {
public void setShouldUseObserver(boolean shouldUseObserver) {
flag = (byte) (shouldUseObserver ?
(flag | SHOULD_USE_OBSERVER_BIT) : (flag & ~SHOULD_USE_OBSERVER_BIT));
}

void setComplete(boolean complete) {
public void setComplete(boolean complete) {
flag = (byte) (complete ? (flag | COMPLETE_BIT) : (flag & ~COMPLETE_BIT));
}

boolean isFailOver() {
public boolean isFailOver() {
return (flag & FAIL_OVER_BIT) != 0;
}

boolean isShouldUseObserver() {
public boolean isShouldUseObserver() {
return (flag & SHOULD_USE_OBSERVER_BIT) != 0;
}

boolean isComplete() {
public boolean isComplete() {
return (flag & COMPLETE_BIT) != 0;
}
}
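With the constructor and accessors made public above, ExecutionStatus becomes usable outside its package. A minimal usage sketch, relying only on the members visible in this hunk (the enclosing class sits outside the hunk and is left unnamed here):

ExecutionStatus status = new ExecutionStatus();
status.setShouldUseObserver(true);   // prefer an observer namenode for this read
status.setFailOver(false);
// ... perform the proxied call ...
status.setComplete(true);
if (status.isComplete() && !status.isFailOver()) {
  // the call finished without failing over to another namenode
}

The three booleans share a single byte flag behind bit masks, so the status object stays compact while each state is still read and written through its own accessor.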
@@ -37,12 +37,12 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;

import java.io.FileNotFoundException;
@@ -75,9 +75,10 @@
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -686,7 +687,7 @@ public InetSocketAddress getRpcAddress() {
* client requests.
* @throws UnsupportedOperationException If the operation is not supported.
*/
void checkOperation(OperationCategory op, boolean supported)
public void checkOperation(OperationCategory op, boolean supported)
throws StandbyException, UnsupportedOperationException {
checkOperation(op);

@@ -1032,7 +1033,7 @@ RemoteLocation getCreateLocation(
* @return The remote location for this file.
* @throws IOException If the file has no creation location.
*/
RemoteLocation getCreateLocationAsync(
public RemoteLocation getCreateLocationAsync(
final String src, final List<RemoteLocation> locations)
throws IOException {

@@ -1995,7 +1996,7 @@ public Long getNextSPSPath() throws IOException {
* @return Prioritized list of locations in the federated cluster.
* @throws IOException If the location for this path cannot be determined.
*/
protected List<RemoteLocation> getLocationsForPath(String path,
public List<RemoteLocation> getLocationsForPath(String path,
boolean failIfLocked) throws IOException {
return getLocationsForPath(path, failIfLocked, true);
}
@@ -2010,7 +2011,7 @@ protected List<RemoteLocation> getLocationsForPath(String path,
* @return Prioritized list of locations in the federated cluster.
* @throws IOException If the location for this path cannot be determined.
*/
protected List<RemoteLocation> getLocationsForPath(String path,
public List<RemoteLocation> getLocationsForPath(String path,
boolean failIfLocked, boolean needQuotaVerify) throws IOException {
try {
if (failIfLocked) {
@@ -2227,9 +2228,9 @@ private MountTable getMountTable(final String path){
* mount entry.
* @param path The path on which the operation need to be invoked.
* @return true if the call is supposed to invoked on all locations.
* @throws IOException
* @throws IOException If an I/O error occurs.
*/
boolean isInvokeConcurrent(final String path) throws IOException {
public boolean isInvokeConcurrent(final String path) throws IOException {
if (subclusterResolver instanceof MountTableResolver) {
MountTableResolver mountTableResolver =
(MountTableResolver) subclusterResolver;
@@ -48,7 +48,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class RouterStateIdContext implements AlignmentContext {
public class RouterStateIdContext implements AlignmentContext {

private final HashSet<String> coordinatedMethods;
/**
@@ -93,6 +93,8 @@ class RouterStateIdContext implements AlignmentContext {

/**
* Adds the {@link #namespaceIdMap} to the response header that will be sent to a client.
*
* @param headerBuilder the response header that will be sent to a client.
*/
public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) {
if (namespaceIdMap.isEmpty()) {
@@ -110,7 +112,8 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder)
}

public LongAccumulator getNamespaceStateId(String nsId) {
return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
return namespaceIdMap.computeIfAbsent(nsId,
key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
}

public List<String> getNamespaces() {
@@ -127,6 +130,9 @@ public void removeNamespaceStateId(String nsId) {

/**
* Utility function to parse routerFederatedState field in RPC headers.
*
* @param byteString the byte string of routerFederatedState.
* @return the router federated state map.
*/
public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) {
if (byteString != null) {
@@ -148,7 +154,8 @@ public static long getClientStateIdFromCurrentCall(String nsId) {
if (call != null) {
ByteString callFederatedNamespaceState = call.getFederatedNamespaceState();
if (callFederatedNamespaceState != null) {
Map<String, Long> clientFederatedStateIds = getRouterFederatedStateMap(callFederatedNamespaceState);
Map<String, Long> clientFederatedStateIds =
getRouterFederatedStateMap(callFederatedNamespaceState);
clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE);
}
}
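getNamespaceStateId above seeds each namespace with a LongAccumulator built from Math::max and Long.MIN_VALUE, so a recorded state ID can only move forward no matter what order responses arrive in. A standalone, JDK-only illustration of that behaviour (not part of the patch):

import java.util.concurrent.atomic.LongAccumulator;

public class StateIdDemo {
  public static void main(String[] args) {
    LongAccumulator stateId = new LongAccumulator(Math::max, Long.MIN_VALUE);
    stateId.accumulate(250L); // response carrying a newer transaction ID
    stateId.accumulate(120L); // a late, stale response
    System.out.println(stateId.get()); // prints 250; the stale value cannot roll it back
  }
}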
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
package org.apache.hadoop.hdfs.server.federation.router.async;

import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
@@ -25,7 +25,12 @@
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.ErasureCoding;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

import java.io.IOException;
@@ -36,9 +41,15 @@
import java.util.Set;

import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;

/**
* Provides asynchronous operations for erasure coding in HDFS Federation.
* This class extends {@link org.apache.hadoop.hdfs.server.federation.router.ErasureCoding}
* and overrides its methods to perform erasure coding operations in a non-blocking manner,
* allowing for concurrent execution and improved performance.
*/
public class AsyncErasureCoding extends ErasureCoding {
/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
@@ -54,6 +65,17 @@ public AsyncErasureCoding(RouterRpcServer server) {
this.namenodeResolver = this.rpcClient.getNamenodeResolver();
}
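Because AsyncErasureCoding keeps the same constructor shape as its superclass, a router can pick the implementation at wiring time. A hypothetical sketch, assuming ErasureCoding exposes the matching constructor and that some configuration flag selects async mode (neither is shown in this diff):

// asyncMode and rpcServer are illustrative names, not taken from this patch.
ErasureCoding erasureCoding = asyncMode
    ? new AsyncErasureCoding(rpcServer)
    : new ErasureCoding(rpcServer);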

/**
* Asynchronously get an array of all erasure coding policies.
* This method checks the operation category and then invokes the
* getErasureCodingPolicies method concurrently across all namespaces.
* <p>
* The results are merged and returned as an array of ErasureCodingPolicyInfo.
*
* @return Array of ErasureCodingPolicyInfo.
* @throws IOException If an I/O error occurs.
*/
@Override
public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -70,6 +92,16 @@ public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
return asyncReturn(ErasureCodingPolicyInfo[].class);
}
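The elided body above follows the pattern the new javadoc describes. The sketch below is an illustration of that idiom, not the hidden code itself; it assumes the async RouterRpcClient records its result in a thread-local future, that asyncApply chains a transformation onto that future, and that asyncReturn hands back a typed placeholder.

RemoteMethod method = new RemoteMethod("getErasureCodingPolicies");
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
// Fan the call out to every namespace; the per-namespace results are assumed
// to land in the thread-local future as a Map keyed by namespace.
rpcClient.invokeConcurrent(nss, method, true, false, ErasureCodingPolicyInfo[].class);
// Chain the merge of the per-namespace arrays onto that future.
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]>,
    ErasureCodingPolicyInfo[]>) ret -> merge(ret, ErasureCodingPolicyInfo.class));
// Return a typed placeholder; the real array is delivered when the future completes.
return asyncReturn(ErasureCodingPolicyInfo[].class);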

/**
* Asynchronously get the erasure coding codecs available.
* This method checks the operation category and then invokes the
* getErasureCodingCodecs method concurrently across all namespaces.
* <p>
* The results are merged into a single map of codec names to codec properties.
*
* @return Map of erasure coding codecs.
* @throws IOException If an I/O error occurs.
*/
@Override
public Map<String, String> getErasureCodingCodecs() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -97,6 +129,17 @@ public Map<String, String> getErasureCodingCodecs() throws IOException {
return asyncReturn(Map.class);
}
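On the consuming side, the placeholder produced by these async methods is typically resolved with AsyncUtil.syncReturn, the helper whose relocated import appears earlier in this diff. A hedged sketch of a hypothetical caller, assuming syncReturn blocks until the thread-local future completes:

// Illustrative names; exception handling elided.
asyncErasureCoding.getErasureCodingCodecs();         // returns immediately with a placeholder
Map<String, String> codecs = syncReturn(Map.class);  // waits for the merged result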

/**
* Asynchronously add an array of erasure coding policies.
* This method checks the operation category and then invokes the
* addErasureCodingPolicies method concurrently across all namespaces.
* <p>
* The results are merged and returned as an array of AddErasureCodingPolicyResponse.
*
* @param policies Array of erasure coding policies to add.
* @return Array of AddErasureCodingPolicyResponse.
* @throws IOException If an I/O error occurs.
*/
@Override
public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
ErasureCodingPolicy[] policies) throws IOException {
@@ -117,6 +160,17 @@ public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
return asyncReturn(AddErasureCodingPolicyResponse[].class);
}

/**
* Asynchronously get the erasure coding policy for a given source path.
* This method checks the operation category and then invokes the
* getErasureCodingPolicy method sequentially for the given path.
* <p>
* The result is returned as an ErasureCodingPolicy object.
*
* @param src Source path to get the erasure coding policy for.
* @return ErasureCodingPolicy for the given path.
* @throws IOException If an I/O error occurs.
*/
@Override
public ErasureCodingPolicy getErasureCodingPolicy(String src)
throws IOException {
@@ -136,6 +190,17 @@ public ErasureCodingPolicy getErasureCodingPolicy(String src)
return asyncReturn(ErasureCodingPolicy.class);
}

/**
* Asynchronously get the EC topology result for the given policies.
* This method checks the operation category and then invokes the
* getECTopologyResultForPolicies method concurrently across all namespaces.
* <p>
* The results are merged and the first unsupported result is returned.
*
* @param policyNames Array of policy names to check.
* @return ECTopologyVerifierResult for the policies.
* @throws IOException If an I/O error occurs.
*/
@Override
public ECTopologyVerifierResult getECTopologyResultForPolicies(
String[] policyNames) throws IOException {
@@ -162,6 +227,16 @@ public ECTopologyVerifierResult getECTopologyResultForPolicies(
return asyncReturn(ECTopologyVerifierResult.class);
}

/**
* Asynchronously get the erasure coding block group statistics.
* This method checks the operation category and then invokes the
* getECBlockGroupStats method concurrently across all namespaces.
* <p>
* The results are merged and returned as an ECBlockGroupStats object.
*
* @return ECBlockGroupStats for the erasure coding block groups.
* @throws IOException If an I/O error occurs.
*/
@Override
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);