4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
@@ -86,6 +86,10 @@ object MimaExcludes {
       // SPARK-5270
       ProblemFilters.exclude[MissingMethodProblem](
         "org.apache.spark.api.java.JavaRDDLike.isEmpty")
+    ) ++ Seq(
+      // SPARK-5297 Java FileStream do not work with custom key/values
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
     )
 
     case v if v.startsWith("1.2") =>
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,14 +17,15 @@

 package org.apache.spark.streaming.api.java
 
+import java.lang.{Boolean => JBoolean}
+import java.io.{Closeable, InputStream}
+import java.util.{List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import java.io.{Closeable, InputStream}
-import java.util.{List => JList, Map => JMap}
-
 import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -250,21 +251,53 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Files must be written to the monitored directory by "moving" them from another
    * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
    * @tparam K Key type for reading HDFS file
    * @tparam V Value type for reading HDFS file
    * @tparam F Input format for reading HDFS file
    */
   def fileStream[K, V, F <: NewInputFormat[K, V]](
-      directory: String): JavaPairInputDStream[K, V] = {
-    implicit val cmk: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val cmv: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-    implicit val cmf: ClassTag[F] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
+      directory: String,

[Inline review comments on the new signature]

Member: I think you'll have to keep and deprecate the existing method. At least,
that removes the need for a MiMa exclusion, and it doesn't do any harm.

Contributor (author): I'm not sure whether to deprecate the old method or just
modify it; which is best? The problem is that the old method actually has a bug;
it is not merely outdated. If we only deprecate it, users can still call it with
just a warning, and that will lead to an exception.

Member: Agreed. I doubt there are callers of the old method since, if we're
right, it does not work at all. Still, it is simple to retain and deprecate it
to avoid removing a public API method.

+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F]): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
     ssc.fileStream[K, V, F](directory)
   }
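
The one-line change from `implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]` to
`ClassTag(kClass)` is the heart of the fix. A minimal standalone sketch of the
difference (not part of this PR; it assumes only hadoop-common on the classpath
for `Text`):

import scala.reflect.ClassTag

import org.apache.hadoop.io.Text

object ClassTagDemo extends App {
  // The old overload conjured a tag by casting the AnyRef tag, so the runtime
  // class it reports is java.lang.Object rather than the real key/value type.
  val broken: ClassTag[Text] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Text]]
  println(broken.runtimeClass)  // prints: class java.lang.Object

  // The fixed overload builds the tag from the Class object the caller passes,
  // so the underlying Scala fileStream can hand Hadoop the right classes.
  val fixed: ClassTag[Text] = ClassTag(classOf[Text])
  println(fixed.runtimeClass)   // prints: class org.apache.hadoop.io.Text
}

Because the Scala StreamingContext.fileStream reads the key, value, and
input-format classes out of these tags, the old cast effectively told Hadoop to
use Object for all three, which is why the method could not work with custom
key/value types.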
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new file
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Should process only new files and ignore existing files in the directory
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F],
+      filter: JFunction[Path, JBoolean],
+      newFilesOnly: Boolean): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
+    def fn = (x: Path) => filter.call(x).booleanValue()
+    ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
+  }
 
   /**
    * Create an input stream with any arbitrary user implemented actor receiver.
    * @param props Props object defining creation of the actor
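
The keep-and-deprecate alternative raised in the review thread is not part of
this diff, but a rough sketch of what it might look like inside
JavaStreamingContext (hypothetical code, not the merged change):

  /**
   * Deprecated variant, retained only so the old public signature still exists.
   */
  @deprecated("Use fileStream(directory, kClass, vClass, fClass) instead", "1.3.0")
  def fileStream[K, V, F <: NewInputFormat[K, V]](
      directory: String): JavaPairInputDStream[K, V] = {
    // The AnyRef-cast tags below are the original bug: they report
    // java.lang.Object as the runtime class and fail at run time.
    implicit val cmk: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
    implicit val cmv: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
    implicit val cmf: ClassTag[F] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
    ssc.fileStream[K, V, F](directory)
  }

Retaining the old signature this way would make the MimaExcludes entry in the
first file unnecessary, at the cost of shipping a method known not to work;
that is the trade-off the thread debates.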
streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,13 +17,20 @@

 package org.apache.spark.streaming;
 
+import java.io.*;
+import java.lang.Iterable;
+import java.nio.charset.Charset;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import scala.Tuple2;
 
 import org.junit.Assert;
 import static org.junit.Assert.*;
 import org.junit.Test;
-import java.io.*;
-import java.util.*;
-import java.lang.Iterable;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -1743,13 +1750,66 @@ public Iterable<String> call(InputStream in) throws IOException {
         StorageLevel.MEMORY_ONLY());
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testTextFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaDStream<String> input = ssc.textFileStream(testDir.toString());
+    JavaTestUtils.attachTestOutputStream(input);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
-  public void testTextFileStream() {
-    JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
+  public void testFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
+        testDir.toString(),
+        LongWritable.class,
+        Text.class,
+        TextInputFormat.class,
+        new Function<Path, Boolean>() {
+          @Override
+          public Boolean call(Path v1) throws Exception {
+            return Boolean.TRUE;
+          }
+        },
+        true);
+
+    JavaDStream<String> test = inputStream.map(
+        new Function<Tuple2<LongWritable, Text>, String>() {
+          @Override
+          public String call(Tuple2<LongWritable, Text> v1) throws Exception {
+            return v1._2().toString();
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(test);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
   }
 
   @Test
   public void testRawSocketStream() {
     JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
   }
+
+  private List<List<String>> fileTestPrepare(File testDir) throws IOException {
+    File existingFile = new File(testDir, "0");
+    Files.write("0\n", existingFile, Charset.forName("UTF-8"));
+    assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000);
+
+    List<List<String>> expected = Arrays.asList(
+        Arrays.asList("0")
+    );
+
+    return expected;
+  }
 }
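
For context, the Java wrapper under test delegates to the Scala
StreamingContext.fileStream, where context bounds supply real ClassTags
automatically. A rough Scala equivalent of what the new testFileStream
exercises (hypothetical application code, with an assumed local directory):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamExample extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("FileStreamExample")
  val ssc = new StreamingContext(conf, Seconds(1))

  // The context bounds on fileStream supply genuine ClassTags here; the fixed
  // Java overloads reconstruct the same tags from the Class objects they take.
  val stream = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "/tmp/monitored-dir",                             // assumed directory
    (path: Path) => !path.getName.startsWith("."),    // same role as the test's filter
    newFilesOnly = true)

  stream.map(_._2.toString).print()
  ssc.start()
  ssc.awaitTermination()
}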