Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixed ForeachSinkSuite
  • Loading branch information
tdas committed Feb 3, 2018
commit 1204755d8bdb0e8f0627a72bc8f456fdc12fc7ea
Original file line number Diff line number Diff line change
Expand Up @@ -46,49 +46,34 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
.foreach(new TestForeachWriter())
.start()

// -- batch 0 ---------------------------------------
input.addData(1, 2, 3, 4)
query.processAllAvailable()
def verifyOutput(expectedVersion: Int, expectedData: Seq[Int]): Unit = {
import ForeachSinkSuite._

var expectedEventsForPartition0 = Seq(
ForeachSinkSuite.Open(partition = 0, version = 0),
ForeachSinkSuite.Process(value = 2),
ForeachSinkSuite.Process(value = 3),
ForeachSinkSuite.Close(None)
)
var expectedEventsForPartition1 = Seq(
ForeachSinkSuite.Open(partition = 1, version = 0),
ForeachSinkSuite.Process(value = 1),
ForeachSinkSuite.Process(value = 4),
ForeachSinkSuite.Close(None)
)
val events = ForeachSinkSuite.allEvents()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test assumed that the output would arrive in specific order after repartitioning, which isnt guaranteed. So I rewrote the test to verify the output in an order-independent way.

assert(events.size === 2) // one seq of events for each of the 2 partitions

var allEvents = ForeachSinkSuite.allEvents()
assert(allEvents.size === 2)
assert(allEvents.toSet === Set(expectedEventsForPartition0, expectedEventsForPartition1))
// Verify both seq of events have an Open event as the first event
assert(events.map(_.head).toSet === Set(0, 1).map(p => Open(p, expectedVersion)))

// Verify all the Process event correspond to the expected data
val allProcessEvents = events.flatMap(_.filter(_.isInstanceOf[Process[_]]))
assert(allProcessEvents.toSet === expectedData.map { data => Process(data) }.toSet)

// Verify both seq of events have a Close event as the last event
assert(events.map(_.last).toSet === Set(Close(None), Close(None)))
}

// -- batch 0 ---------------------------------------
ForeachSinkSuite.clear()
input.addData(1, 2, 3, 4)
query.processAllAvailable()
verifyOutput(expectedVersion = 0, expectedData = 1 to 4)

// -- batch 1 ---------------------------------------
ForeachSinkSuite.clear()
input.addData(5, 6, 7, 8)
query.processAllAvailable()

expectedEventsForPartition0 = Seq(
ForeachSinkSuite.Open(partition = 0, version = 1),
ForeachSinkSuite.Process(value = 5),
ForeachSinkSuite.Process(value = 7),
ForeachSinkSuite.Close(None)
)
expectedEventsForPartition1 = Seq(
ForeachSinkSuite.Open(partition = 1, version = 1),
ForeachSinkSuite.Process(value = 6),
ForeachSinkSuite.Process(value = 8),
ForeachSinkSuite.Close(None)
)

allEvents = ForeachSinkSuite.allEvents()
assert(allEvents.size === 2)
assert(allEvents.toSet === Set(expectedEventsForPartition0, expectedEventsForPartition1))
verifyOutput(expectedVersion = 1, expectedData = 5 to 8)

query.stop()
}
Expand Down