Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Bump artifact versions to latest
  • Loading branch information
Robert Kruszewski committed May 14, 2017
commit 1ffa236252e6423b1cc30f45d8f2836cecedbaba
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,26 @@

package org.apache.spark.network.crypto;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these import reorderings probably don't pass checkstyle

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they do

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Properties;
import javax.crypto.spec.SecretKeySpec;
import javax.crypto.spec.IvParameterSpec;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.AbstractReferenceCounted;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.crypto.stream.CryptoInputStream;
import org.apache.commons.crypto.stream.CryptoOutputStream;

import org.apache.spark.network.util.ByteArrayReadableChannel;
import org.apache.spark.network.util.ByteArrayWritableChannel;

Expand Down Expand Up @@ -203,6 +206,11 @@ public long transfered() {
return transferred;
}

// Netty 4.1 renamed FileRegion#transfered() to transferred(); this overrides the
// new spelling while the legacy accessor above keeps returning the same counter
// for binary compatibility during the upgrade.
@Override
public long transferred() {
return transferred;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
Preconditions.checkArgument(position == transfered(), "Invalid position.");
Expand Down Expand Up @@ -232,7 +240,7 @@ private void encryptMore() throws IOException {
int copied = byteRawChannel.write(buf.nioBuffer());
buf.skipBytes(copied);
} else {
region.transferTo(byteRawChannel, region.transfered());
region.transferTo(byteRawChannel, region.transferred());
}
cos.write(byteRawChannel.getData(), 0, byteRawChannel.length());
cos.flush();
Expand All @@ -241,6 +249,28 @@ private void encryptMore() throws IOException {
0, byteEncChannel.length());
}

// Covariant override: narrow the ReferenceCounted return type to FileRegion so
// callers can chain without casting. The actual refcount bump is delegated to
// AbstractReferenceCounted via super.retain().
@Override
public FileRegion retain() {
super.retain();
return this;
}

// Same covariant narrowing as retain(); bumps the refcount by {@code increment}
// through AbstractReferenceCounted and returns this for chaining.
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}

// touch() is Netty's resource-leak-tracking hook (required by the 4.1
// ReferenceCounted interface); no hint is recorded here, so it is a no-op
// beyond returning this.
@Override
public FileRegion touch() {
return this;
}

// Leak-tracking hook with a caller-supplied hint; the hint is intentionally
// ignored — this implementation does not participate in leak detection.
@Override
public FileRegion touch(Object o) {
return this;
}

@Override
protected void deallocate() {
byteRawChannel.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@

package org.apache.spark.network.protocol;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import javax.annotation.Nullable;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import javax.annotation.Nullable;
import org.apache.spark.network.buffer.ManagedBuffer;

/**
Expand Down Expand Up @@ -95,6 +93,11 @@ public long transfered() {
return totalBytesTransferred;
}

// Override for the Netty 4.1 spelling of FileRegion#transfered(); reports the
// same totalBytesTransferred counter as the deprecated accessor above.
@Override
public long transferred() {
return totalBytesTransferred;
}

/**
* This code is more complicated than you would think because we might require multiple
* transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
Expand Down Expand Up @@ -127,6 +130,28 @@ public long transferTo(final WritableByteChannel target, final long position) th
return writtenHeader + writtenBody;
}

// Leak-tracking hook (ReferenceCounted#touch(Object) in Netty 4.1); the hint
// object is ignored — no leak-detection bookkeeping is done for this region.
@Override
public FileRegion touch(Object msg) {
return this;
}

// Covariant override returning FileRegion (instead of ReferenceCounted) so
// call sites can chain without a cast; delegates the count to
// AbstractReferenceCounted.
@Override
public FileRegion retain() {
super.retain();
return this;
}

// Multi-increment variant of retain(); same covariant-return convenience.
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}

// No-hint leak-tracking hook; no-op beyond returning this for chaining.
@Override
public FileRegion touch() {
return this;
}

@Override
protected void deallocate() {
header.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package org.apache.spark.network.sasl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
Expand All @@ -33,7 +28,10 @@
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.AbstractReferenceCounted;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.NettyUtils;

Expand Down Expand Up @@ -187,6 +185,11 @@ public long transfered() {
return transferred;
}

// Netty 4.1 spelling of the byte counter; mirrors the deprecated transfered()
// accessor above, both returning the same {@code transferred} field.
@Override
public long transferred() {
return transferred;
}

/**
* Transfers data from the original message to the channel, encrypting it in the process.
*
Expand Down Expand Up @@ -262,7 +265,7 @@ private void nextChunk() throws IOException {
int copied = byteChannel.write(buf.nioBuffer());
buf.skipBytes(copied);
} else {
region.transferTo(byteChannel, region.transfered());
region.transferTo(byteChannel, region.transferred());
}

byte[] encrypted = backend.wrap(byteChannel.getData(), 0, byteChannel.length());
Expand All @@ -272,6 +275,28 @@ private void nextChunk() throws IOException {
this.unencryptedChunkSize = byteChannel.length();
}

// ReferenceCounted#touch(Object) leak-tracking hook; the hint is discarded —
// this SASL-encrypted region does not record touch points.
@Override
public FileRegion touch(Object o) {
return this;
}

// Covariant override so callers get a FileRegion back without casting;
// refcount management itself lives in AbstractReferenceCounted.
@Override
public FileRegion retain() {
super.retain();
return this;
}

// Bulk retain: bumps the refcount by {@code increment} and returns this.
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}

// No-hint variant of the leak-tracking hook; intentionally a no-op.
@Override
public FileRegion touch() {
return this;
}

@Override
protected void deallocate() {
if (currentHeader != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,12 @@ public void close() {
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
channelFuture = null;
}
if (bootstrap != null && bootstrap.group() != null) {
bootstrap.group().shutdownGracefully();
if (bootstrap != null && bootstrap.config() != null && bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully();
}
if (bootstrap != null && bootstrap.childGroup() != null) {
bootstrap.childGroup().shutdownGracefully();
if (bootstrap != null && bootstrap.config() != null
&& bootstrap.config().childGroup() != null) {
bootstrap.config().childGroup().shutdownGracefully();
}
bootstrap = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@

package org.apache.spark.network;

import java.util.List;
import static org.junit.Assert.assertEquals;

import com.google.common.primitives.Ints;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

import java.util.List;
import org.apache.spark.network.protocol.ChunkFetchFailure;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
Expand All @@ -45,6 +42,7 @@
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.NettyUtils;
import org.junit.Test;

public class ProtocolSuite {
private void testServerToClient(Message msg) {
Expand All @@ -56,7 +54,7 @@ private void testServerToClient(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!serverChannel.outboundMessages().isEmpty()) {
clientChannel.writeInbound(serverChannel.readOutbound());
clientChannel.writeOneInbound(serverChannel.readOutbound());
}

assertEquals(1, clientChannel.inboundMessages().size());
Expand All @@ -72,7 +70,7 @@ private void testClientToServer(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!clientChannel.outboundMessages().isEmpty()) {
serverChannel.writeInbound(clientChannel.readOutbound());
serverChannel.writeOneInbound(clientChannel.readOutbound());
}

assertEquals(1, serverChannel.inboundMessages().size());
Expand Down Expand Up @@ -116,8 +114,8 @@ public void encode(ChannelHandlerContext ctx, FileRegion in, List<Object> out)
throws Exception {

ByteArrayWritableChannel channel = new ByteArrayWritableChannel(Ints.checkedCast(in.count()));
while (in.transfered() < in.count()) {
in.transferTo(channel, in.transfered());
while (in.transferred() < in.count()) {
in.transferTo(channel, in.transferred());
}
out.add(Unpooled.wrappedBuffer(channel.getData()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,22 @@

package org.apache.spark.network.protocol;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.*;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.apache.spark.network.TestManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NettyManagedBuffer;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.junit.Test;
import org.mockito.Mockito;

public class MessageWithHeaderSuite {

Expand Down Expand Up @@ -134,6 +133,11 @@ public long transfered() {
return 8 * written;
}

// Netty 4.1 accessor: the test region reports 8 bytes per completed write
// (one long each), identical to the deprecated transfered() above.
@Override
public long transferred() {
return 8 * written;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
for (int i = 0; i < writesPerCall; i++) {
Expand All @@ -148,6 +152,28 @@ public long transferTo(WritableByteChannel target, long position) throws IOExcep
return 8 * writesPerCall;
}

// Covariant override returning FileRegion for chaining; the count is kept by
// AbstractReferenceCounted.
@Override
public FileRegion retain() {
super.retain();
return this;
}

// Multi-increment retain with the same covariant FileRegion return.
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}

// Leak-tracking hook required by Netty 4.1's ReferenceCounted; the hint is
// ignored in this test stub.
@Override
public FileRegion touch(Object o) {
return this;
}

// No-hint touch(); no-op for this test stub.
@Override
public FileRegion touch() {
return this;
}

// Called by AbstractReferenceCounted when the refcount hits zero; empty
// because this test region holds no buffers or external resources to release.
@Override
protected void deallocate() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public void getMetrics(MetricsCollector collector, boolean all) {
}

@VisibleForTesting
public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
public static void collectMetric(
MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {

// The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics
if (metric instanceof Timer) {
Expand Down
Loading