diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 4755eb8d21999..80c14c26b18e0 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -109,6 +109,8 @@ import org.opensearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusAction; import org.opensearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction; +import org.opensearch.action.admin.cluster.snapshots.list.SnapshotIndicesListAction; +import org.opensearch.action.admin.cluster.snapshots.list.TransportSnapshotIndicesListAction; import org.opensearch.action.admin.cluster.state.ClusterStateAction; import org.opensearch.action.admin.cluster.state.TransportClusterStateAction; import org.opensearch.action.admin.cluster.stats.ClusterStatsAction; @@ -484,6 +486,7 @@ import org.opensearch.rest.action.list.RestIndicesListAction; import org.opensearch.rest.action.list.RestListAction; import org.opensearch.rest.action.list.RestShardsListAction; +import org.opensearch.rest.action.list.RestSnapshotIndicesListAction; import org.opensearch.rest.action.search.RestClearScrollAction; import org.opensearch.rest.action.search.RestCountAction; import org.opensearch.rest.action.search.RestCreatePitAction; @@ -672,6 +675,7 @@ public void reg actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class); actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); + actions.register(SnapshotIndicesListAction.INSTANCE, TransportSnapshotIndicesListAction.class); actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class); actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class); @@ -1025,6 +1029,7 @@ public void initRestHandlers(Supplier nodesInCluster) { // LIST API registerHandler.accept(new RestIndicesListAction(responseLimitSettings)); registerHandler.accept(new RestShardsListAction()); + registerHandler.accept(new RestSnapshotIndicesListAction()); // Point in time API registerHandler.accept(new RestCreatePitAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotIndicesListAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotIndicesListAction.java new file mode 100644 index 0000000000000..4e0316aa15df3 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotIndicesListAction.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.snapshots.list; + +import org.opensearch.action.ActionType; + +/** + * ActionType for listing snapshot indices with pagination. + * + * @opensearch.internal + */ +public class SnapshotIndicesListAction extends ActionType { + + public static final SnapshotIndicesListAction INSTANCE = new SnapshotIndicesListAction(); + public static final String NAME = "cluster:admin/snapshot/list/indices"; + + private SnapshotIndicesListAction() { + super(NAME, SnapshotIndicesListResponse::new); + } +} + + diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotIndicesListRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotIndicesListRequest.java new file mode 100644 index 0000000000000..b88ea0ebe3b97 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotIndicesListRequest.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.snapshots.list; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request for listing snapshot indices with pagination. + * + * @opensearch.internal + */ +public class SnapshotIndicesListRequest extends ClusterManagerNodeRequest { + + private String repository; + private String snapshot; + private int from; + private int size; + + public SnapshotIndicesListRequest() {} + + public SnapshotIndicesListRequest(StreamInput in) throws IOException { + super(in); + this.repository = in.readString(); + this.snapshot = in.readString(); + this.from = in.readVInt(); + this.size = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(repository); + out.writeString(snapshot); + out.writeVInt(from); + out.writeVInt(size); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String repository() { return repository; } + public String snapshot() { return snapshot; } + public int from() { return from; } + public int size() { return size; } + + public SnapshotIndicesListRequest repository(String repository) { + this.repository = repository; + return this; + } + + public SnapshotIndicesListRequest snapshot(String snapshot) { + this.snapshot = snapshot; + return this; + } + + public SnapshotIndicesListRequest from(int from) { + this.from = from; + return this; + } + + public SnapshotIndicesListRequest size(int size) { + this.size = size; + return this; + } +} + + diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotIndicesListResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotIndicesListResponse.java new file mode 100644 index 0000000000000..42fb874ab2640 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotIndicesListResponse.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.snapshots.list; + +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Response payload for snapshot indices list. + * + * @opensearch.internal + */ +public class SnapshotIndicesListResponse extends ActionResponse { + + public static class IndexRow { + public String index; + public int shardsTotal; + public int shardsDone; + public int shardsFailed; + public long fileCount; + public long sizeInBytes; + public long startTimeInMillis; + public long timeInMillis; + + public IndexRow() {} + + public IndexRow(StreamInput in) throws IOException { + this.index = in.readString(); + this.shardsTotal = in.readVInt(); + this.shardsDone = in.readVInt(); + this.shardsFailed = in.readVInt(); + this.fileCount = in.readVLong(); + this.sizeInBytes = in.readVLong(); + this.startTimeInMillis = in.readVLong(); + this.timeInMillis = in.readVLong(); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeVInt(shardsTotal); + out.writeVInt(shardsDone); + out.writeVInt(shardsFailed); + out.writeVLong(fileCount); + out.writeVLong(sizeInBytes); + out.writeVLong(startTimeInMillis); + out.writeVLong(timeInMillis); + } + } + + private List rows = new ArrayList<>(); + + public SnapshotIndicesListResponse() {} + + public SnapshotIndicesListResponse(StreamInput in) throws IOException { + super(in); + int n = in.readVInt(); + for (int i = 0; i < n; i++) { + rows.add(new IndexRow(in)); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(rows.size()); + for (IndexRow r : rows) { + r.writeTo(out); + } + } + + public List rows() { return rows; } +} + + diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotListUtils.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotListUtils.java new file mode 100644 index 0000000000000..42e30839326d1 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/SnapshotListUtils.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.snapshots.list; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryData; +import org.opensearch.snapshots.SnapshotId; +import org.opensearch.snapshots.SnapshotInfo; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utilities for snapshot listing that read only page-relevant data from repository. + * + * @opensearch.internal + */ +final class SnapshotListUtils { + + static class IndexFileStats { + static final IndexFileStats EMPTY = new IndexFileStats(0L, 0L); + final long fileCount; + final long sizeInBytes; + + IndexFileStats(long fileCount, long sizeInBytes) { + this.fileCount = fileCount; + this.sizeInBytes = sizeInBytes; + } + } + + private SnapshotListUtils() {} + + static Map loadIndexMetadata( + Repository repo, + RepositoryData repositoryData, + SnapshotInfo snapshotInfo, + List indices + ) throws IOException { + Map result = new HashMap<>(); + SnapshotId snapshotId = snapshotInfo.snapshotId(); + for (String index : indices) { + IndexId indexId = repositoryData.resolveIndexId(index); + IndexMetadata meta = repo.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId); + if (meta != null) { + result.put(index, meta); + } + } + return result; + } + + static Map loadIndexFileStats( + Repository repo, + RepositoryData repositoryData, + SnapshotInfo snapshotInfo, + List indices, + Map indexMetadataMap + ) throws IOException { + Map result = new HashMap<>(); + SnapshotId snapshotId = snapshotInfo.snapshotId(); + for (String index : indices) { + IndexMetadata meta = indexMetadataMap.get(index); + int numShards = meta == null ? 0 : meta.getNumberOfShards(); + long files = 0L; + long bytes = 0L; + if (numShards > 0) { + IndexId indexId = repositoryData.resolveIndexId(index); + for (int shard = 0; shard < numShards; shard++) { + IndexShardSnapshotStatus.Copy s = repo.getShardSnapshotStatus(snapshotId, indexId, new ShardId(meta.getIndex(), shard)) + .asCopy(); + files += s.getTotalFileCount(); + bytes += s.getTotalSize(); + } + } + result.put(index, new IndexFileStats(files, bytes)); + } + return result; + } +} + + diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/TransportSnapshotIndicesListAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/TransportSnapshotIndicesListAction.java new file mode 100644 index 0000000000000..6f49cdf186a64 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/list/TransportSnapshotIndicesListAction.java @@ -0,0 +1,173 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.snapshots.list; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryData; +import org.opensearch.snapshots.SnapshotId; +import org.opensearch.snapshots.SnapshotInfo; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Transport action that returns a page of indices within a snapshot with summary stats. + * + * @opensearch.internal + */ +public class TransportSnapshotIndicesListAction extends TransportClusterManagerNodeAction { + + private final RepositoriesService repositoriesService; + + @Inject + public TransportSnapshotIndicesListAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + RepositoriesService repositoriesService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + SnapshotIndicesListAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + TransportSnapshotIndicesListAction::newRequest, + indexNameExpressionResolver + ); + this.repositoriesService = repositoriesService; + } + + private static SnapshotIndicesListRequest newRequest(StreamInput in) throws IOException { + return new SnapshotIndicesListRequest(in); + } + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + protected ClusterBlockException checkBlock(SnapshotIndicesListRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected SnapshotIndicesListResponse read(StreamInput in) throws IOException { + return new SnapshotIndicesListResponse(in); + } + + @Override + protected void clusterManagerOperation( + SnapshotIndicesListRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + final String repository = Objects.requireNonNull(request.repository(), "repository is required"); + final String snapshotName = Objects.requireNonNull(request.snapshot(), "snapshot is required"); + final int from = Math.max(0, request.from()); + final int size = Math.max(0, request.size()); + + repositoriesService.getRepositoryData(repository, ActionListener.wrap(repositoryData -> { + final SnapshotId snapshotId = repositoryData.getSnapshotIds() + .stream() + .filter(s -> s.getName().equals(snapshotName)) + .findFirst() + .orElse(null); + if (snapshotId == null) { + listener.onResponse(new SnapshotIndicesListResponse()); + return; + } + final SnapshotInfo snapshotInfo = repositoriesService.repository(repository).getSnapshotInfo(snapshotId); + // Order indices deterministically (by name) + final List allIndices = snapshotInfo.indices().stream().sorted().collect(Collectors.toList()); + final int start = Math.min(from, allIndices.size()); + final int end = Math.min(start + size, allIndices.size()); + final List page = new ArrayList<>(allIndices.subList(start, end)); + + fetchIndexRows(repository, repositoryData, snapshotInfo, page, ActionListener.wrap(rows -> { + SnapshotIndicesListResponse resp = new SnapshotIndicesListResponse(); + resp.rows().addAll(rows); + listener.onResponse(resp); + }, listener::onFailure)); + }, listener::onFailure)); + } + + private void fetchIndexRows( + String repository, + RepositoryData repositoryData, + SnapshotInfo snapshotInfo, + List pageIndices, + ActionListener> listener + ) { + try { + Repository repo = repositoriesService.repository(repository); + Map indexMetadataMap = SnapshotListUtils.loadIndexMetadata( + repo, + repositoryData, + snapshotInfo, + pageIndices + ); + Map fileStats = SnapshotListUtils.loadIndexFileStats( + repo, + repositoryData, + snapshotInfo, + pageIndices, + indexMetadataMap + ); + + final long startTime = snapshotInfo.startTime(); + final long endTime = snapshotInfo.endTime() == 0 ? threadPool.absoluteTimeInMillis() : snapshotInfo.endTime(); + final long timeInMillis = Math.max(0L, endTime - startTime); + + List rows = new ArrayList<>(); + for (String index : pageIndices) { + int totalShards = indexMetadataMap.containsKey(index) ? indexMetadataMap.get(index).getNumberOfShards() : 0; + int failed = (int) snapshotInfo.shardFailures().stream().filter(f -> index.equals(f.index())).count(); + int done = Math.max(0, totalShards - failed); + SnapshotListUtils.IndexFileStats stats = fileStats.getOrDefault(index, SnapshotListUtils.IndexFileStats.EMPTY); + + SnapshotIndicesListResponse.IndexRow row = new SnapshotIndicesListResponse.IndexRow(); + row.index = index; + row.shardsTotal = totalShards; + row.shardsDone = done; + row.shardsFailed = failed; + row.fileCount = stats.fileCount; + row.sizeInBytes = stats.sizeInBytes; + row.startTimeInMillis = startTime; + row.timeInMillis = timeInMillis; + rows.add(row); + } + listener.onResponse(rows); + } catch (Exception e) { + listener.onFailure(e); + } + } +} + + diff --git a/server/src/main/java/org/opensearch/rest/action/list/RestSnapshotIndicesListAction.java b/server/src/main/java/org/opensearch/rest/action/list/RestSnapshotIndicesListAction.java new file mode 100644 index 0000000000000..61287e7497845 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/list/RestSnapshotIndicesListAction.java @@ -0,0 +1,94 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.list; + +import org.opensearch.action.admin.cluster.snapshots.list.SnapshotIndicesListAction; +import org.opensearch.action.admin.cluster.snapshots.list.SnapshotIndicesListRequest; +import org.opensearch.action.admin.cluster.snapshots.list.SnapshotIndicesListResponse; +import org.opensearch.common.Table; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestResponseListener; +import org.opensearch.rest.action.cat.AbstractCatAction; +import org.opensearch.transport.client.node.NodeClient; + +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +public class RestSnapshotIndicesListAction extends AbstractCatAction { + + @Override + public List routes() { + return unmodifiableList(asList(new Route(GET, "/_list/snapshot/{repository}/{snapshot}/indices"))); + } + + @Override + public String getName() { + return "list_snapshot_indices_action"; + } + + @Override + protected void documentation(StringBuilder sb) { + sb.append("/_list/snapshot/{repository}/{snapshot}/indices\n"); + } + + @Override + public RestChannelConsumer doCatRequest(final RestRequest request, NodeClient client) { + final String repository = request.param("repository"); + final String snapshot = request.param("snapshot"); + final int from = Math.max(0, request.paramAsInt("from", 0)); + final int size = Math.max(1, request.paramAsInt("size", 500)); + + SnapshotIndicesListRequest req = new SnapshotIndicesListRequest().repository(repository).snapshot(snapshot).from(from).size(size); + return channel -> client.execute(SnapshotIndicesListAction.INSTANCE, + req, + new RestResponseListener(channel) { + @Override + public org.opensearch.rest.RestResponse buildResponse(SnapshotIndicesListResponse response) throws Exception { + return org.opensearch.rest.action.cat.RestTable.buildResponse(buildTable(request, response), channel); + } + } + ); + } + + @Override + protected Table getTableWithHeader(RestRequest request) { + return new Table().startHeaders() + .addCell("index", "alias:index;desc:index name") + .addCell("shards.total", "alias:st;desc:total shards") + .addCell("shards.done", "alias:sd;desc:done shards") + .addCell("shards.failed", "alias:sf;desc:failed shards") + .addCell("file_count", "alias:fc;desc:file count") + .addCell("size_in_bytes", "alias:s;desc:size in bytes") + .addCell("start_time_in_millis", "alias:stm;desc:start time millis") + .addCell("time_in_millis", "alias:tm;desc:time in millis") + .endHeaders(); + } + + private Table buildTable(RestRequest request, SnapshotIndicesListResponse response) { + Table t = getTableWithHeader(request); + for (SnapshotIndicesListResponse.IndexRow r : response.rows()) { + t.startRow(); + t.addCell(r.index); + t.addCell(r.shardsTotal); + t.addCell(r.shardsDone); + t.addCell(r.shardsFailed); + t.addCell(r.fileCount); + t.addCell(r.sizeInBytes); + t.addCell(r.startTimeInMillis); + t.addCell(r.timeInMillis); + t.endRow(); + } + return t; + } +} + +