Changes from 1 commit
Commits (36):
149ea3e  ShuffleWriters write to temp file, then go through  (squito, Oct 21, 2015)
cf8118e  assorted cleanup  (squito, Oct 22, 2015)
ea1ae07  style  (squito, Oct 22, 2015)
9356c67  fix compilation in StoragePerfTester  (squito, Oct 22, 2015)
2b42eb5  mima  (squito, Oct 22, 2015)
32d4b3b  update UnsafeShuffleWriterSuite  (squito, Oct 22, 2015)
550e198  fix imports  (squito, Oct 22, 2015)
4ff98bf  should work now, but needs cleanup  (squito, Oct 23, 2015)
4a19702  only consider tmp files that exist; only consider the dest pre-existi…  (squito, Oct 23, 2015)
89063dd  cleanup  (squito, Oct 23, 2015)
4145651  ShuffleOutputCoordinatorSuite  (squito, Oct 23, 2015)
2089e12  cleanup  (squito, Oct 23, 2015)
2e9bbaa  Merge branch 'master' into SPARK-8029_first_wins  (squito, Oct 26, 2015)
4cd423e  write the winning mapStatus to disk, so subsequent tasks can respond …  (squito, Oct 26, 2015)
dc4b7f6  fix imports  (squito, Oct 26, 2015)
b7a0981  fixes  (squito, Oct 26, 2015)
830a097  shuffle writers must write always write all tmp files  (squito, Oct 27, 2015)
5d11eca  more fixes for zero-sized blocks  (squito, Oct 27, 2015)
3f5af9c  dont make ShuffleWriter return mapStatusFile  (squito, Oct 27, 2015)
4b7c71a  rather than requiring all tmp files to exist, just write a zero-lengt…  (squito, Oct 27, 2015)
eabf978  update test case  (squito, Oct 27, 2015)
5bbeec3  minor cleanup  (squito, Oct 27, 2015)
e141d82  test that shuffle output files are always the same  (squito, Oct 27, 2015)
4df7955  fix compression settings of tmp files; minor cleanup  (squito, Oct 27, 2015)
dc076b8  fix tests  (squito, Oct 27, 2015)
cfdfd2c  review feedback  (squito, Nov 3, 2015)
86f468a  Merge branch 'master' into SPARK-8029_first_wins  (squito, Nov 4, 2015)
5c8b247  fix imports  (squito, Nov 4, 2015)
4d66df1  fix more imports  (squito, Nov 4, 2015)
e59df41  couple more nits ...  (squito, Nov 4, 2015)
c206fc5  minor cleanup  (squito, Nov 5, 2015)
c0edff1  style  (squito, Nov 5, 2015)
da33519  Merge branch 'master' into SPARK-8029_first_wins  (squito, Nov 11, 2015)
c0b93a5  create temporary files in same location as destination files  (squito, Nov 11, 2015)
9d0d9d9  no more @VisibleForTesting  (squito, Nov 11, 2015)
80e037d  unused import  (squito, Nov 11, 2015)
review feedback
squito committed Nov 4, 2015
commit cfdfd2c7aca33a40e687ed6c93859a72d8f8e280
@@ -24,6 +24,7 @@
import java.util.Arrays;
import javax.annotation.Nullable;

import org.apache.spark.*;
import scala.None$;
import scala.Option;
import scala.Product2;
@@ -37,14 +38,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.shuffle.IndexShuffleBlockResolver$;
import org.apache.spark.shuffle.TmpDestShuffleFile;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
@@ -125,7 +122,7 @@ public BypassMergeSortShuffleWriter(
}

@Override
public Seq<Tuple2<File, File>> write(Iterator<Product2<K, V>> records) throws IOException {
public Seq<TmpDestShuffleFile> write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
final File indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId);
final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
@@ -136,10 +133,14 @@ public Seq<Tuple2<File, File>> write(Iterator<Product2<K, V>> records) throws IO
// create empty data file so we always commit same set of shuffle output files, even if
// data is non-deterministic
final File tmpDataFile = blockManager.diskBlockManager().createTempShuffleBlock()._2();
tmpDataFile.createNewFile();
if (!tmpDataFile.createNewFile()) {
// only possible if the file already exists, from a race in createTempShuffleBlock, which
// should be super-rare
throw new IOException("could not create shuffle data file: " + tmpDataFile);
}
return JavaConverters.asScalaBufferConverter(Arrays.asList(
new Tuple2<>(tmpIndexFile, indexFile),
new Tuple2<>(tmpDataFile, dataFile)
new TmpDestShuffleFile(tmpIndexFile, indexFile),
new TmpDestShuffleFile(tmpDataFile, dataFile)
)).asScala();
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -174,8 +175,8 @@ public Seq<Tuple2<File, File>> write(Iterator<Product2<K, V>> records) throws IO
final File tmpIndexFile = shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return JavaConverters.asScalaBufferConverter(Arrays.asList(
new Tuple2<>(tmpIndexFile, indexFile),
new Tuple2<>(tmpDataFile, dataFile)
new TmpDestShuffleFile(tmpIndexFile, indexFile),
new TmpDestShuffleFile(tmpDataFile, dataFile)
)).asScala();

}
@@ -53,10 +53,9 @@
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.IndexShuffleBlockResolver$;
import org.apache.spark.shuffle.TmpDestShuffleFile;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.ShuffleIndexBlockId;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;

@@ -160,12 +159,12 @@ public long getPeakMemoryUsedBytes() {
* This convenience method should only be called in test code.
*/
@VisibleForTesting
public Seq<Tuple2<File, File>> write(Iterator<Product2<K, V>> records) throws IOException {
public Seq<TmpDestShuffleFile> write(Iterator<Product2<K, V>> records) throws IOException {
return write(JavaConverters.asScalaIteratorConverter(records).asScala());
}

@Override
public Seq<Tuple2<File, File>> write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
public Seq<TmpDestShuffleFile> write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
@@ -174,7 +173,7 @@ public Seq<Tuple2<File, File>> write(scala.collection.Iterator<Product2<K, V>> r
while (records.hasNext()) {
insertRecordIntoSorter(records.next());
}
final Seq<Tuple2<File, File>> result = closeAndWriteOutput();
final Seq<TmpDestShuffleFile> result = closeAndWriteOutput();
success = true;
return result;
} finally {
@@ -210,7 +209,7 @@ private void open() throws IOException {
}

@VisibleForTesting
Seq<Tuple2<File, File>> closeAndWriteOutput() throws IOException {
Seq<TmpDestShuffleFile> closeAndWriteOutput() throws IOException {
assert(sorter != null);
updatePeakMemoryUsed();
serBuffer = null;
@@ -236,8 +235,8 @@ Seq<Tuple2<File, File>> closeAndWriteOutput() throws IOException {
final File indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId);

return JavaConverters.asScalaBufferConverter(Arrays.asList(
new Tuple2<>(tmpIndexFile, indexFile),
new Tuple2<>(tmpDataFile, dataFile)
new TmpDestShuffleFile(tmpIndexFile, indexFile),
new TmpDestShuffleFile(tmpDataFile, dataFile)
)).asScala();
}

@@ -137,10 +137,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
for (mapId <- state.completedMapTasks.asScala) {
val mapStatusFile =
blockManager.diskBlockManager.getFile(ShuffleMapStatusBlockId(shuffleId, mapId))
if (mapStatusFile.exists()) {
if (!mapStatusFile.delete()) {
logWarning(s"Error deleting MapStatus file ${mapStatusFile.getPath()}")
}
if (mapStatusFile.exists() && !mapStatusFile.delete()) {
logWarning(s"Error deleting MapStatus file ${mapStatusFile.getPath()}")
}
}
logInfo("Deleted all files for shuffle " + shuffleId)
@@ -74,10 +74,8 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
}

file = blockManager.diskBlockManager.getFile(ShuffleMapStatusBlockId(shuffleId, mapId))
if (file.exists()) {
if (!file.delete()) {
logWarning(s"Error deleting MapStatus file ${file.getPath()}")
}
if (file.exists() && !file.delete()) {
logWarning(s"Error deleting MapStatus file ${file.getPath()}")
}
}

@@ -16,14 +16,15 @@
*/
package org.apache.spark.shuffle

import java.io.{FileOutputStream, FileInputStream, File}
import java.io.{File, FileInputStream, FileOutputStream, IOException}

import com.google.common.annotations.VisibleForTesting

import org.apache.spark.storage.ShuffleMapStatusBlockId
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.storage.ShuffleMapStatusBlockId
import org.apache.spark.util.Utils

/**
* Ensures that on each executor, there are no conflicting writes to the same shuffle files. It
@@ -47,12 +48,11 @@ private[spark] object ShuffleOutputCoordinator extends Logging {
* @param mapStatus the [[MapStatus]] for the output already written to the temporary files
* @return pair of: (1) true iff the set of temporary files was moved to the destination and (2)
* the MapStatus of the committed attempt.
*
*/
def commitOutputs(
shuffleId: Int,
partitionId: Int,
tmpToDest: Seq[(File, File)],
tmpToDest: Seq[TmpDestShuffleFile],
mapStatus: MapStatus,
sparkEnv: SparkEnv): (Boolean, MapStatus) = synchronized {
val mapStatusFile = sparkEnv.blockManager.diskBlockManager.getFile(
@@ -65,17 +65,21 @@
def commitOutputs(
shuffleId: Int,
partitionId: Int,
tmpToDest: Seq[(File, File)],
tmpToDest: Seq[TmpDestShuffleFile],
mapStatus: MapStatus,
mapStatusFile: File,
serializer: SerializerInstance): (Boolean, MapStatus) = synchronized {
val destAlreadyExists = tmpToDest.forall{_._2.exists()} && mapStatusFile.exists()
// due to SPARK-4085, we only consider the previous attempt "committed" if all its output
// files are present
val destAlreadyExists = tmpToDest.forall(_.dstFile.exists()) && mapStatusFile.exists()
if (!destAlreadyExists) {
tmpToDest.foreach { case (tmp, dest) =>
// If *some* of the destination files exist, but not all of them, then its not clear
tmpToDest.foreach { case TmpDestShuffleFile(tmp, dest) =>
// If *some* of the destination files exist, but not all of them, then it's not clear
// what to do. There could be a task already reading from this dest file when we delete

Review comment (Contributor):
So, this justification feels a little weird to me. Let's say you have t1 and t2.

  • if t1 finishes and writes "n" files, and t2 finishes and writes "n + 1" files, then you'll get the output of t2
  • if instead t2 also creates "n" files, you'll get the output of t1 instead.

Or am I misunderstanding something?

Also, the "if a task is already reading from the file" case will probably make this whole block of code fail on Windows; I believe File.delete() will fail if the file is open, unlike on POSIX fses.

// it -- but then again, something in that taskset would be doomed to fail in any case when
// it got to the missing files. Better to just put consistent output into place
// it got to the missing files. Better to just put consistent output into place.
// Note that for this to work with non-deterministic data, it is *critical* that each
// attempt always produces the exact same set of destination files (even if they are empty).
if (dest.exists()) {
dest.delete()
}
@@ -85,7 +89,9 @@ private[spark] object ShuffleOutputCoordinator extends Logging {
// we always create the destination files, so this works correctly even when the
// input data is non-deterministic (potentially empty in one iteration, and non-empty
// in another)
dest.createNewFile()
if (!dest.createNewFile()) {
throw new IOException("could not create file: $file")
}
}
}
val out = serializer.serializeStream(new FileOutputStream(mapStatusFile))
@@ -95,11 +101,13 @@
} else {
logInfo(s"shuffle output for shuffle $shuffleId, partition $partitionId already exists, " +
s"not overwriting. Another task must have created this shuffle output.")
tmpToDest.foreach{ case (tmp, _) => tmp.delete()}
tmpToDest.foreach { tmpAndDest => tmpAndDest.tmpFile.delete() }
val in = serializer.deserializeStream(new FileInputStream(mapStatusFile))
val readStatus = in.readObject[MapStatus]
in.close()
(false, readStatus)
Utils.tryWithSafeFinally {
(false, in.readObject[MapStatus]())
} {
in.close()
}
}
}
}
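
To make the first-wins commit protocol concrete, here is a minimal, self-contained sketch of the idea. It is not the code from this patch: `FirstWinsCommit`, the bare `marker` file, and `Files.move` stand in for `ShuffleOutputCoordinator.commitOutputs`, the serialized `MapStatus` file, and whatever moves the real code performs; the re-declared `TmpDestShuffleFile` just mirrors the case class added in `ShuffleWriter.scala` below.

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical stand-in for the PR's TmpDestShuffleFile, re-declared so the sketch
// is self-contained.
case class TmpDestShuffleFile(tmpFile: File, dstFile: File)

object FirstWinsCommit {
  // Returns true if this attempt's files were moved into place, false if an earlier
  // attempt already committed (this attempt's tmp files are then discarded).
  def commit(tmpToDest: Seq[TmpDestShuffleFile], marker: File): Boolean = synchronized {
    // The earlier attempt only counts as committed if *all* destination files and the
    // marker file (the serialized MapStatus in the real patch) are present.
    val alreadyCommitted = marker.exists() && tmpToDest.forall(_.dstFile.exists())
    if (!alreadyCommitted) {
      tmpToDest.foreach { case TmpDestShuffleFile(tmp, dst) =>
        if (dst.exists()) dst.delete()
        if (tmp.exists()) {
          Files.move(tmp.toPath, dst.toPath, StandardCopyOption.REPLACE_EXISTING)
        } else {
          // Zero-length outputs may have no tmp file on disk; still create an empty
          // destination so every attempt leaves behind the exact same set of files.
          dst.createNewFile()
        }
      }
      marker.createNewFile()
      true
    } else {
      tmpToDest.foreach(_.tmpFile.delete())
      false
    }
  }
}
```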
18 changes: 16 additions & 2 deletions core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
@@ -29,11 +29,25 @@ private[spark] abstract class ShuffleWriter[K, V] {
* Write a sequence of records to this task's output. This should write all data
* to temporary files, but return (temporaryFile, destinationFile) pairs for each
* file written. The temporary files will get moved to their destination or deleted
* by the [[ShuffleOutputCoordinator]]
* by the [[ShuffleOutputCoordinator]]. Note that for the ShuffleOutputCoordinator
* to work correctly, each attempt *must* have the exact same set of destination files.
* If the temporary file is empty, the ShuffleWriter does not have to create the file -- however
* it *must* still be in the result Seq, just pointing to a non-existent file.
*/
@throws[IOException]
def write(records: Iterator[Product2[K, V]]): Seq[(File, File)]
def write(records: Iterator[Product2[K, V]]): Seq[TmpDestShuffleFile]

/** Close this writer, passing along whether the map completed */
def stop(success: Boolean): Option[MapStatus]
}

/**
* The location of one shuffle file written by a [[ShuffleWriter]]. Holds both the temporary
* file, which is written to by the ShuffleWriter itself, and the destination file, where the
* file should get moved by the [[ShuffleOutputCoordinator]]. The ShuffleWriter is responsible
* for specifying both locations, though it only writes the temp file.
*/
private[shuffle] case class TmpDestShuffleFile(
val tmpFile: File,
val dstFile: File
)
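
As a rough illustration of the `write` contract documented above (a sketch under assumed file names, not code from this patch): a writer that produces a data file and an index file must report the same two destination files on every attempt, even when the temporary data file was never created because the map output was empty. The helper and its directory arguments below are hypothetical.

```scala
import java.io.File

// Hypothetical helper showing the shape of a ShuffleWriter's return value. The
// directories and file-naming scheme are made up for the example.
def plannedOutputs(tmpDir: File, destDir: File, mapId: Int): Seq[TmpDestShuffleFile] = {
  val tmpData   = new File(tmpDir, s"shuffle_${mapId}.data.tmp")
  val tmpIndex  = new File(tmpDir, s"shuffle_${mapId}.index.tmp")
  val destData  = new File(destDir, s"shuffle_${mapId}.data")
  val destIndex = new File(destDir, s"shuffle_${mapId}.index")
  // Both pairs are always returned, in the same order, even if tmpData was never
  // written (zero records): the ShuffleOutputCoordinator then creates an empty
  // destination file, keeping the committed file set identical across attempts.
  Seq(
    TmpDestShuffleFile(tmpData, destData),
    TmpDestShuffleFile(tmpIndex, destIndex)
  )
}
```

Returning non-existent temp files for empty output keeps the writers simple while still letting the coordinator put a deterministic set of files into place.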
@@ -51,7 +51,7 @@ private[spark] class HashShuffleWriter[K, V](
writeMetrics)

/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Seq[(File, File)] = {
override def write(records: Iterator[Product2[K, V]]): Seq[TmpDestShuffleFile] = {
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
dep.aggregator.get.combineValuesByKey(records, context)
@@ -67,7 +67,7 @@
val bucketId = dep.partitioner.getPartition(elem._1)
shuffle.writers(bucketId)._1.write(elem._1, elem._2)
}
shuffle.writers.map { case (writer, destFile) => writer.file -> destFile}
shuffle.writers.map { case (writer, destFile) => TmpDestShuffleFile(writer.file, destFile) }
}

/** Close this writer, passing along whether the map completed */
@@ -17,13 +17,11 @@

package org.apache.spark.shuffle.sort

import java.io.File

import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter}
import org.apache.spark.storage.{ShuffleBlockId, ShuffleIndexBlockId, ShuffleMapStatusBlockId}
import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter, TmpDestShuffleFile}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter

private[spark] class SortShuffleWriter[K, V, C](
@@ -50,7 +48,7 @@ private[spark] class SortShuffleWriter[K, V, C](
context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)

/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Seq[(File, File)] = {
override def write(records: Iterator[Product2[K, V]]): Seq[TmpDestShuffleFile] = {
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
@@ -76,8 +74,8 @@

mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
Seq(
tmpDataFile -> dataFile,
tmpIndexFile -> indexFile
TmpDestShuffleFile(tmpDataFile, dataFile),
TmpDestShuffleFile(tmpIndexFile, indexFile)
)
}

@@ -135,7 +135,6 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
blockId = new TempUncompressedShuffleBlockId(UUID.randomUUID())
}
(blockId, getFile(blockId))

}

/**
@@ -57,6 +57,7 @@
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleOutputCoordinator;
import org.apache.spark.shuffle.TmpDestShuffleFile;
import org.apache.spark.storage.*;
import org.apache.spark.util.Utils;

@@ -279,7 +280,7 @@ class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Obje
@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
Seq<Tuple2<File, File>> files = writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
Seq<TmpDestShuffleFile> files = writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus.get(), mapStatusFile,
@@ -301,7 +302,7 @@ public void writeWithoutSpilling() throws Exception {
dataToWrite.add(new Tuple2<Object, Object>(i, i));
}
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
Seq<TmpDestShuffleFile> files = writer.write(dataToWrite.iterator());
final Option<MapStatus> mapStatus = writer.stop(true);
ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus.get(), mapStatusFile,
serializer.newInstance());
@@ -348,7 +349,7 @@ private void testMergingSpills(
writer.forceSorterToSpill();
writer.insertRecordIntoSorter(dataToWrite.get(4));
writer.insertRecordIntoSorter(dataToWrite.get(5));
Seq<Tuple2<File, File>> files = writer.closeAndWriteOutput();
Seq<TmpDestShuffleFile> files = writer.closeAndWriteOutput();
final Option<MapStatus> mapStatus = writer.stop(true);
ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus.get(), mapStatusFile,
serializer.newInstance());
@@ -429,7 +430,7 @@ public void writeEnoughDataToTriggerSpill() throws Exception {
for (int i = 0; i < 128 + 1; i++) {
dataToWrite.add(new Tuple2<Object, Object>(i, bigByteArray));
}
Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
Seq<TmpDestShuffleFile> files = writer.write(dataToWrite.iterator());
verify(taskMemoryManager, times(5)).acquireExecutionMemory(anyLong());
// this includes the tmp index & data files, before the output is committed
assertEquals(4, tmpShuffleFilesCreated.size());
@@ -459,7 +460,7 @@ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exce
for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE; i++) {
dataToWrite.add(new Tuple2<Object, Object>(i, i));
}
Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
Seq<TmpDestShuffleFile> files = writer.write(dataToWrite.iterator());
verify(taskMemoryManager, times(5)).acquireExecutionMemory(anyLong());
// this includes the tmp index & data files, before the output is committed
assertEquals(4, tmpShuffleFilesCreated.size());
@@ -484,7 +485,7 @@ public void writeRecordsThatAreBiggerThanDiskWriteBufferSize() throws Exception
final byte[] bytes = new byte[(int) (ShuffleExternalSorter.DISK_WRITE_BUFFER_SIZE * 2.5)];
new Random(42).nextBytes(bytes);
dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(bytes)));
Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
Seq<TmpDestShuffleFile> files = writer.write(dataToWrite.iterator());
MapStatus mapStatus = writer.stop(true).get();
ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus, mapStatusFile,
serializer.newInstance());
@@ -507,7 +508,7 @@ public void writeRecordsThatAreBiggerThanMaxRecordSize() throws Exception {
final byte[] exceedsMaxRecordSize = new byte[writer.maxRecordSizeBytes() + 1];
new Random(42).nextBytes(exceedsMaxRecordSize);
dataToWrite.add(new Tuple2<Object, Object>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
Seq<TmpDestShuffleFile> files = writer.write(dataToWrite.iterator());
MapStatus mapStatus = writer.stop(true).get();
ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus, mapStatusFile,
serializer.newInstance());