Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added tests
  • Loading branch information
Efim Poberezkin committed Apr 10, 2018
commit 7d85cf2fba6a90f67b6f0c7c7fa3f9a4b18d5859
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,34 @@ class EpochCoordinatorSuite
verifyCommit(1)
}

test("single epoch, all but one writer partition has committed") {
setWriterPartitions(3)
setReaderPartitions(2)

commitPartitionEpoch(0, 1)
commitPartitionEpoch(1, 1)
reportPartitionOffset(0, 1)
reportPartitionOffset(1, 1)

makeSynchronousCall()

verifyCommitHasntHappened(1)
}

test("single epoch, all but one reader partition has reported an offset") {
setWriterPartitions(3)
setReaderPartitions(2)

commitPartitionEpoch(0, 1)
commitPartitionEpoch(1, 1)
commitPartitionEpoch(2, 1)
reportPartitionOffset(0, 1)

makeSynchronousCall()

verifyCommitHasntHappened(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe verifyNoCommitFor

Copy link
Contributor

@tdas tdas Apr 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
verifyNoCommitFor is less verbose. will merge as soon as this is changed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest adding test cases where all but one writer partition has committed, or all but one reader partition has reported an offset. In those cases we should verify that the StreamWriter and query commits haven't happened.


test("consequent epochs, messages for epoch (k + 1) arrive after messages for epoch k") {
setWriterPartitions(2)
setReaderPartitions(2)
Expand Down Expand Up @@ -186,6 +214,11 @@ class EpochCoordinatorSuite
orderVerifier.verify(query).commit(epoch)
}

private def verifyCommitHasntHappened(epoch: Long): Unit = {
verify(writer, never()).commit(eqTo(epoch), any())
verify(query, never()).commit(epoch)
}

private def verifyCommitsInOrderOf(epochs: Seq[Long]): Unit = {
epochs.foreach(verifyCommit)
}
Expand Down