Adding a test for StartsWith
Checks for column batch skipping for LIKE 'XYZ%' style queries.

Update UnifiedPartitionerTest as per the new default buckets.

Corrected compiler warnings in CatalogConsistencyDUnitTest and added an assertion
after the drop, in the proper place where the drop table is expected to fail.
Sumedh Wale committed Dec 1, 2016
commit 82170c290b10dbefa0dde851266f35cc5f0570f1
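The new StartsWith test relies on column batch pruning: a batch whose per-column min/max statistics show that no value can match a LIKE 'XYZ%' prefix can be skipped without being scanned. The following is a minimal standalone sketch of that pruning rule, written only to illustrate the idea being tested; the BatchStats type and canSkipBatch helper are hypothetical names, not SnappyData's implementation.

// Illustrative sketch only (not SnappyData's code): min/max based pruning of a
// column batch for a StartsWith, i.e. LIKE 'XYZ%', predicate.
object StartsWithPruningSketch {
  // Hypothetical per-batch string statistics.
  final case class BatchStats(min: String, max: String)

  // Every string matching LIKE 'prefix%' is >= prefix and begins with exactly
  // those prefix.length characters, so a batch cannot contain a match when its
  // max sorts below the prefix or its min already sorts past the prefix.
  def canSkipBatch(stats: BatchStats, prefix: String): Boolean =
    stats.max < prefix || stats.min.take(prefix.length) > prefix

  def main(args: Array[String]): Unit = {
    // a batch holding carriers "AB".."AZ" can be skipped for LIKE 'AA%',
    // while a batch holding "AA1".."AA99" must still be scanned
    assert(canSkipBatch(BatchStats("AB", "AZ"), "AA"))
    assert(!canSkipBatch(BatchStats("AA1", "AA99"), "AA"))
  }
}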
@@ -51,7 +51,6 @@ class CatalogConsistencyDUnitTest(s: String) extends ClusterManagerTestBase(s) {
val dataDF = snc.createDataFrame(rdd)

snc.createTable("column_table1", "column", dataDF.schema, props)
// dataDF.write.format("column").mode(SaveMode.Append).options(props).saveAsTable("column_table1")
snc.createTable("column_table2", "column", dataDF.schema, props)
dataDF.write.format("column").mode(SaveMode.Append).options(props).saveAsTable("column_table2")

@@ -218,24 +217,20 @@ class CatalogConsistencyDUnitTest(s: String) extends ClusterManagerTestBase(s) {
s"'13', REDUNDANCY '1', EVICTION_BY 'LRUHEAPPERCENT', PERSISTENT 'ASYNCHRONOUS'," +
s"PARTITION_BY 'SINGLE_ORDER_DID')");

snc.sql(s"create table $colloactedColumnTable(EXEC_DID BIGINT,SYS_EXEC_VER INTEGER,SYS_EXEC_ID " +
s"VARCHAR(64),TRD_DATE VARCHAR(20),ALT_EXEC_ID VARCHAR(64)) USING column OPTIONS" +
s"(COLOCATE_WITH '$baseColumnTable', BUCKETS '13', REDUNDANCY '1', EVICTION_BY " +
snc.sql(s"create table $colloactedColumnTable(EXEC_DID BIGINT,SYS_EXEC_VER INTEGER," +
s"SYS_EXEC_ID VARCHAR(64),TRD_DATE VARCHAR(20),ALT_EXEC_ID VARCHAR(64)) USING column " +
s"OPTIONS (COLOCATE_WITH '$baseColumnTable', BUCKETS '13', REDUNDANCY '1', EVICTION_BY " +
s"'LRUHEAPPERCENT', PERSISTENT 'ASYNCHRONOUS',PARTITION_BY 'EXEC_DID')");

try {
// This should throw an exception
snc.sql(s"drop table $baseRowTable")

assert(assertion = false, "expected the drop to fail")
} catch {

case ae: AnalysisException =>
// Expected Exception and assert message
assert(ae.getMessage.equals("Object APP.ORDER_DETAILS_ROW cannot be dropped because of " +
"dependent objects: APP.EXEC_DETAILS_ROW;"))
case _ =>
assert(false)

}

// stop spark
@@ -248,21 +243,18 @@ class CatalogConsistencyDUnitTest(s: String) extends ClusterManagerTestBase(s) {
try {
// This should throw an exception
snc.sql(s"drop table $baseRowTable")

assert(assertion = false, "expected the drop to fail")
} catch {
case ae: AnalysisException =>
// Expected Exception and assert message
assert(ae.getMessage.equals("Object APP.ORDER_DETAILS_ROW cannot be dropped because of " +
"dependent objects: APP.EXEC_DETAILS_ROW;"))
case _ =>
assert(false)
}

snc.sql(s"drop table $colloactedColumnTable")
snc.sql(s"drop table $baseColumnTable")

snc.sql(s"drop table $colloactedRowTable")
snc.sql(s"drop table $baseRowTable")

}
}
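The assertion added above now sits inside the try, immediately after the drop that is expected to fail, so a drop that silently succeeds fails the test instead of falling through. A hedged sketch of the same intent as a reusable helper (the object and method names are hypothetical and not part of the patch):

import org.apache.spark.sql.AnalysisException

object DropTableAssertions {
  // Run `body`, which is expected to throw an AnalysisException with exactly
  // `expectedMessage`. Only AnalysisException is caught, so any other failure,
  // including the "should not reach here" assertion, propagates with its
  // original message.
  def expectAnalysisException(expectedMessage: String)(body: => Unit): Unit = {
    try {
      body
      assert(assertion = false, "expected the statement to fail")
    } catch {
      case ae: AnalysisException => assert(ae.getMessage == expectedMessage)
    }
  }
}

Used this way, each of the two drop-table checks in the test above would reduce to a single call, passing the expected dependent-objects message and the drop statement as the body.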
@@ -34,9 +34,11 @@ class CachedBatchScanDUnitTest(s: String) extends ClusterManagerTestBase(s) {
snc.sql(s"create table if not exists airline ($ddlStr) " +
s" using column options (Buckets '2')").collect()

for (i <- 1 to 100) {
snc.sql(s"insert into airline values(2015, 2, 15, 1002, $i, 'AA')")
}
import snc.implicits._

val ds = snc.createDataset(sc.range(1, 101).map(i =>
AirlineData(2015, 2, 15, 1002, i.toInt, "AA" + i)))
ds.write.insertInto("airline")

// ***Check for the case when all the cached batches are scanned ****
var previousExecutionIds = snc.sharedState.listener.executionIdToData.keySet
@@ -98,6 +100,74 @@ class CachedBatchScanDUnitTest(s: String) extends ClusterManagerTestBase(s) {

assert(skipped3 > 0, "Some Cached batches should have been scanned")
assert(scanned3 != skipped3, "Some Cached batches should have been scanned - comparison")

// check for StartsWith predicate with MAX/MIN handling

// first all batches chosen
previousExecutionIds = snc.sharedState.listener.executionIdToData.keySet

val df_allCachedBatchesLikeScan = snc.sql(
"select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier " +
"from AIRLINE where UniqueCarrier like 'AA%' " +
"group by UniqueCarrier order by arrivalDelay")

var count = df_allCachedBatchesLikeScan.count()
assert(count == 100, s"Unexpected count = $count, expected 100")

executionIds =
snc.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)

executionId = executionIds.head

val (scanned4, skipped4) =
findCachedBatchStats(df_allCachedBatchesLikeScan, snc.snappySession, executionId)

assert(skipped4 == 0, "No Cached batches should have been skipped")
assert(scanned4 > 0, "All Cached batches should have been scanned")

// next some batches skipped
previousExecutionIds = snc.sharedState.listener.executionIdToData.keySet

val df_someCachedBatchesLikeScan = snc.sql(
"select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier " +
"from AIRLINE where UniqueCarrier like 'AA1%' " +
"group by UniqueCarrier order by arrivalDelay")

count = df_someCachedBatchesLikeScan.count()
assert(count == 12, s"Unexpected count = $count, expected 12")

executionIds =
snc.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)

executionId = executionIds.head

val (scanned5, skipped5) =
findCachedBatchStats(df_someCachedBatchesLikeScan, snc.snappySession, executionId)

assert(skipped5 > 0, "Some Cached batches should have been skipped")
assert(scanned5 != skipped5, "Some Cached batches should have been skipped - comparison")

// last all batches skipped
previousExecutionIds = snc.sharedState.listener.executionIdToData.keySet

val df_noCachedBatchesLikeScan = snc.sql(
"select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier " +
"from AIRLINE where UniqueCarrier like 'AA0%' " +
"group by UniqueCarrier order by arrivalDelay")

count = df_noCachedBatchesLikeScan.count()
assert(count == 0, s"Unexpected count = $count, expected 0")

executionIds =
snc.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)

executionId = executionIds.head

val (scanned6, skipped6) =
findCachedBatchStats(df_noCachedBatchesLikeScan, snc.snappySession, executionId)

assert(scanned6 == skipped6, "No Cached batches should have been returned")
assert(skipped6 > 0, "No Cached batches should have been returned")
}

private def findCachedBatchStats(df: DataFrame,
@@ -119,3 +189,6 @@ class CachedBatchScanDUnitTest(s: String) extends ClusterManagerTestBase(s) {
metricValues.filter(_._1 == skippedid).head._2.toInt)
}
}

case class AirlineData(year: Int, month: Int, dayOfMonth: Int,
depTime: Int, arrTime: Int, carrier: String)
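For reference, the counts asserted in the three LIKE checks above follow directly from the carrier values generated by the dataset insert ("AA1" through "AA100"). A quick REPL-style sanity check, not part of the test itself:

// carriers written by the test: "AA1" .. "AA100"
val carriers = (1 to 100).map(i => "AA" + i)
assert(carriers.count(_.startsWith("AA")) == 100) // LIKE 'AA%' matches every row
assert(carriers.count(_.startsWith("AA1")) == 12) // AA1, AA10..AA19, AA100
assert(carriers.count(_.startsWith("AA0")) == 0)  // LIKE 'AA0%' matches nothing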
@@ -33,6 +33,7 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.apache.spark.Logging
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Murmur3Hash}
import org.apache.spark.sql.execution.columnar.ExternalStoreUtils
import org.apache.spark.sql.types.{DataType, _}
import org.apache.spark.unsafe.types.UTF8String

@@ -75,6 +76,7 @@ class UnifiedPartitionerTest extends SnappyFunSuite
}

private def createDate(year: Int, month: Int, date: Int): java.sql.Date = {
// noinspection ScalaDeprecation
new java.sql.Date(year, month, date)
}

@@ -114,7 +116,7 @@ class UnifiedPartitionerTest extends SnappyFunSuite
assert(rpr != null)
assert(rpr2 != null)

val numPartitions = 11
val numPartitions = ExternalStoreUtils.DEFAULT_TABLE_BUCKETS_LOCAL_MODE.toInt

// Check All Datatypes
var row = createRow(200, IntegerType)
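The functional change in this file is that the expected partition count is now read from ExternalStoreUtils.DEFAULT_TABLE_BUCKETS_LOCAL_MODE instead of the hard-coded 11, so the test follows the product's new default bucket count. As a rough illustration of the kind of check this suite performs, the snippet below hashes a literal with Catalyst's Murmur3Hash (seed 42 is Catalyst's default) and folds it into a bucket; the modulo fold and the stand-in bucket count are assumptions made for illustration, not necessarily SnappyData's exact bucket mapping.

import org.apache.spark.sql.catalyst.expressions.{Literal, Murmur3Hash}

// Illustrative only: compute a bucket id for the literal value 200.
val numPartitions = 19 // stand-in; the test uses DEFAULT_TABLE_BUCKETS_LOCAL_MODE.toInt
val hash = Murmur3Hash(Seq(Literal(200)), 42).eval().asInstanceOf[Int]
val bucket = ((hash % numPartitions) + numPartitions) % numPartitions
assert(bucket >= 0 && bucket < numPartitions)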