Merged
Changes from 1 commit
Commits (73)
b660f4b
changes for integrated catalog and cleanup
Nov 28, 2018
7132932
Merge remote-tracking branch 'origin/master' into catalog-cleanup
Nov 28, 2018
9de169d
some fixes
Nov 28, 2018
3a46574
use non-isolated hive client
Nov 29, 2018
a5d078d
replace koloboke maps with eclipse collections
Nov 29, 2018
2a6635a
Merge remote-tracking branch 'origin/master' into catalog-cleanup
Nov 29, 2018
dd64f47
more fixes to tests
Nov 29, 2018
1dea3e4
corrected tokenize to be session property rather than global
Nov 29, 2018
d62413d
more test fixes as per the catalog fixes
Nov 30, 2018
17315d9
Merge remote-tracking branch 'origin/master' into catalog-cleanup
Dec 8, 2018
fb06812
fix build error after master merge
Dec 8, 2018
3af6077
more fixes/additions and enabled a bunch of compatibility tests
Dec 10, 2018
d2f8b1e
more fixes and changes
Dec 11, 2018
bffb66c
fixed issue with update sub-query due to alias removal
Dec 11, 2018
71d5597
Merge remote-tracking branch 'origin/master' into catalog-cleanup
Dec 12, 2018
285560b
fixes for AQP
Dec 13, 2018
369f583
clear global view catalog explicitly in close
Dec 14, 2018
8d853e0
don't resolve baseTable since it can be temporary table
Dec 14, 2018
aa809b4
fixes for AQP test failures
Dec 16, 2018
e36976e
some cleanups
Dec 16, 2018
74becab
Support for external hive catalog/session from within SnappySession
Dec 17, 2018
b7819bd
switch the sharedState and sessionState in SnappySession before runni…
Dec 17, 2018
1a0a503
minor comment change
Dec 17, 2018
21f8fbd
add gemfire to the providers for which dbtable is added implicitly
Dec 17, 2018
6687984
update store link
Dec 17, 2018
cdd2259
add special path for "gemfire" data source
Dec 18, 2018
fb49f64
minor logging changes
Dec 18, 2018
1da2075
minor comment changes
Dec 18, 2018
187b443
Merge branch 'catalog-cleanup' into external-hive-support
Dec 18, 2018
8494e0f
Merge remote-tracking branch 'origin/master' into external-hive-support
Dec 18, 2018
9ccef06
fixed initialization of shared state in SessionBase to be lazy
Dec 18, 2018
9d3ca95
fixing a couple of failures
Dec 19, 2018
e13521c
improve some exceptions to use "schema" in message rather than "datab…
Dec 19, 2018
ebe9a84
more exception improvements
Dec 19, 2018
0d848d8
correct some issues in the lookup path
Dec 19, 2018
c091e9f
completing the read path support
Dec 20, 2018
68a5544
Merge remote-tracking branch 'origin/master' into external-hive-support
Dec 21, 2018
7840860
Merge remote-tracking branch 'origin/master' into external-hive-support
Dec 29, 2018
69bfe80
write support in external hive catalog
Jan 1, 2019
60b1c08
allow identifier to start with numerics after AS
Jan 10, 2019
72957b6
changed behaviour as per internal discussions
Jan 10, 2019
9fa6c24
Merge remote-tracking branch 'origin/master' into external-hive-support
Apr 11, 2019
ec1c071
Merge remote-tracking branch 'origin/master' into external-hive-support
Apr 11, 2019
75110ee
Remove the test files dtests/src/test/java/io/snappydata/hydra/extern…
Jun 24, 2019
9fe19c6
Fix for SNAP-2998
Jul 8, 2019
485fc61
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 20, 2019
7991e15
Merge commit '75110ee40f0ec6bf7940f6a554adc0751e641617' into external…
Jul 20, 2019
6de8b61
Merge commit '9fe19c6613986a1f4a7cc732d335daf25444bc19' into external…
Jul 20, 2019
6f601fb
manually merging some changes from master that were refactored into s…
Jul 20, 2019
81f9364
enable external-hive support if non-default hive configuration is pre…
Jul 21, 2019
0e0b59a
changed to temporarily switch to SparkSession
Jul 22, 2019
3de52d4
minor changes
Jul 22, 2019
84c9f05
minor change
Jul 22, 2019
a8bebb4
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 22, 2019
978e41d
remove snappydata.sql.hive.defaultHiveSource property
Jul 22, 2019
2518402
honour spark.sql.sources.default for default data source
Jul 23, 2019
e455ab6
minor fix
Jul 23, 2019
063614b
removed snappydata.sql.hive.enabled
Jul 23, 2019
1d1707d
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 23, 2019
29a3e76
update store link
Jul 23, 2019
015ecdb
initial code for porting hive suite
Jul 24, 2019
1a4e571
added check to avoid recursive calls to SnappySessionState initializa…
Jul 24, 2019
86d910a
first working ported hive test suite
Jul 24, 2019
a70046a
fix dynamic setting of spark.sql.catalogImplementation which is other…
Jul 24, 2019
195e247
interpret CREATE TABLE containing hive-specific extensions
Jul 25, 2019
a15d16f
some minor changes to behaviour
Jul 26, 2019
43243ec
fix for SNAP-3100
Jul 26, 2019
2958e1b
fixes for schema/database handling and improved help messages
Jul 26, 2019
99fcf4a
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 26, 2019
d30d9b9
uniform databaseExists check for hiveSessionCatalog
Jul 26, 2019
b60cfdc
fix MetadataTest to add "DEFAULT" to list of schemas
Jul 27, 2019
80ead5b
update spark and store links
Jul 27, 2019
91ce084
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 27, 2019
Merge remote-tracking branch 'origin/master' into external-hive-support
Sumedh Wale committed Dec 18, 2018
commit 8494e0f1a596629330225edd1e37fbb3bc701d08
54 changes: 54 additions & 0 deletions cluster/src/test/scala/org/apache/spark/sql/store/BugTest.scala
@@ -422,4 +422,58 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
conn.close()
TestUtil.stopNetServer()
}

test("Bug SNAP-2758 . view containing aggregate function & join throws error") {
snc
var serverHostPort2 = TestUtil.startNetServer()
var conn = DriverManager.getConnection(s"jdbc:snappydata://$serverHostPort2")
var stmt = conn.createStatement()
val snappy = snc.snappySession
snappy.sql("drop table if exists test1")
snappy.sql("create table test1 (col1_1 int, col1_2 int, col1_3 int, col1_4 string) " +
"using column ")

snappy.sql("create table test2 (col2_1 int, col2_2 int, col2_3 int, col2_5 string) " +
"using column ")

snappy.sql(" CREATE OR REPLACE VIEW v1 as select col2_1, col2_2, " +
"col2_5 as longtext from test2 where col2_3 > 10")

val q1 = "select a.col1_1, a.col1_2, " +
" CASE WHEN a.col1_4 = '' THEN '#' ELSE a.col1_4 END functionalAreaCode," +
"b.longtext as name, " +
" sum(a.col1_3)" +
"from test1 a left outer join v1 as b on a.col1_1 = b.col2_1" +
" group by a.col1_1, a.col1_2, " +
" CASE WHEN a.col1_4 = '' THEN '#' ELSE a.col1_4 END," +
" b.longtext "
snappy.sql(q1)
snappy.sql(s" CREATE OR REPLACE VIEW v3 as $q1")

val q = "select a.col1_1, a.col1_2, " +
" CASE WHEN a.col1_4 = '' THEN '#' ELSE a.col1_4 END functionalAreaCode," +
"'#' as fsid, " +
"b.longtext as name, " +
" sum(a.col1_3)" +
"from test1 a left outer join v1 as b on a.col1_1 = b.col2_1" +
" group by a.col1_1, a.col1_2, " +
" CASE WHEN a.col1_4 = '' THEN '#' ELSE a.col1_4 END," +
" '#'," +
" b.longtext "
snappy.sql(q)
snappy.sql(s" CREATE OR REPLACE VIEW v2 as $q")
snappy.sql("select count(*) from v2").collect()

stmt.execute("drop view v3")
stmt.execute("drop view v2")
stmt.execute("drop view v1")
snc.sql("drop table if exists test1")
snc.sql("drop table if exists test2")

conn.close()
TestUtil.stopNetServer()

}


}
1 change: 1 addition & 0 deletions core/src/main/scala/io/snappydata/Literals.scala
@@ -29,6 +29,7 @@ object StreamingConstants {
val STREAM_QUERY_ID = "streamqueryid"
val SINK_CALLBACK = "sinkcallback"
val CONFLATION = "conflation"
val EVENT_COUNT_COLUMN = "SNAPPYSYS_INTERNAL____EVENT_COUNT"

object EventType {
val INSERT = 0
@@ -355,6 +355,8 @@ case class ParamLiteral(var value: Any, var dataType: DataType,
final class RefParamLiteral(val param: ParamLiteral, _value: Any, _dataType: DataType, _pos: Int)
extends ParamLiteral(_value, _dataType, _pos, execId = param.execId) {

assert(!param.isInstanceOf[RefParamLiteral])

private[sql] def referenceEquals(p: ParamLiteral): Boolean = {
if (param eq p) {
// Check that value and dataType should also be equal at this point.
@@ -423,57 +425,67 @@ trait ParamLiteralHolder {

private[sql] final def getCurrentParamsId: Int = paramListId

/**
* Find existing ParamLiteral with given value and DataType. This should
* never return a RefParamLiteral.
*/
private def findExistingParamLiteral(value: Any, dataType: DataType,
numConstants: Int): ParamLiteral = {
numConstants: Int): Option[ParamLiteral] = {
// for size >= 4 use a lookup map to search for same constant else linear search
if (numConstants >= 4) {
if (paramConstantMap eq null) {
// populate the map while checking for a match
paramConstantMap = UnifiedMap.newMap(8)
var i = 0
var existing: ParamLiteral = null
var existing: Option[ParamLiteral] = None
while (i < numConstants) {
val param = parameterizedConstants(i)
if ((existing eq null) && dataType == param.dataType && value == param.value) {
existing = param
parameterizedConstants(i) match {
case _: RefParamLiteral => // skip
case param =>
if (existing.isEmpty && dataType == param.dataType && value == param.value) {
existing = Some(param)
}
paramConstantMap.put(param.dataType -> param.value, param)
}
paramConstantMap.put(param.dataType -> param.value, param)
i += 1
}
existing
} else paramConstantMap.get(dataType -> value)
} else Option(paramConstantMap.get(dataType -> value))
} else {
var i = 0
while (i < numConstants) {
val param = parameterizedConstants(i)
if (dataType == param.dataType && value == param.value) {
return param
parameterizedConstants(i) match {
case _: RefParamLiteral => // skip
case param =>
if (dataType == param.dataType && value == param.value) {
return Some(param)
}
}
i += 1
}
null
None
}
}

private[sql] final def addParamLiteralToContext(value: Any,
dataType: DataType): ParamLiteral = {
val numConstants = parameterizedConstants.length
val existing = findExistingParamLiteral(value, dataType, numConstants)
if (existing ne null) {
// Add to paramelizedConstants list so that its position can be updated
// if required (e.g. if a ParamLiteral is reverted to a Literal for
// functions that require so as in SnappyParserConsts.FOLDABLE_FUNCTIONS)
// In addition RefParamLiteral maintains its own copy of value to avoid updating
// the referenced ParamLiteral's value by functions like ROUND, so that needs to
// be changed too when a plan with updated tokens is created.
val ref = new RefParamLiteral(existing, value, dataType, numConstants)
parameterizedConstants += ref
ref
} else {
val p = ParamLiteral(value, dataType, numConstants, paramListId)
parameterizedConstants += p
if (paramConstantMap ne null) paramConstantMap.put(dataType -> value, p)
p
findExistingParamLiteral(value, dataType, numConstants) match {
case None =>
val p = ParamLiteral(value, dataType, numConstants, paramListId)
parameterizedConstants += p
if (paramConstantMap ne null) paramConstantMap.put(dataType -> value, p)
p
case Some(existing) =>
// Add to parameterizedConstants list so that its position can be updated
// if required (e.g. if a ParamLiteral is reverted to a Literal for
// functions that require so as in SnappyParserConsts.FOLDABLE_FUNCTIONS)
// In addition RefParamLiteral maintains its own copy of value to avoid updating
// the referenced ParamLiteral's value by functions like ROUND, so that needs to
// be changed too when a plan with updated tokens is created.
val ref = new RefParamLiteral(existing, value, dataType, numConstants)
parameterizedConstants += ref
ref
}
}
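
The change above makes constant deduplication explicit: the first occurrence of a (dataType, value) pair stays a plain ParamLiteral, any later occurrence becomes a RefParamLiteral that points at it while keeping its own position and its own copy of the value, and the lookup never returns a reference. The following is a minimal, self-contained Scala sketch of that idea using simplified, hypothetical names (Param, RefParam, ParamHolder), not the actual SnappyData classes:

import scala.collection.mutable

// First (dataType, value) occurrence becomes the canonical Param; later
// occurrences become RefParam wrappers that keep their own value and position.
class Param(val value: Any, val dataType: String, val pos: Int)

class RefParam(val param: Param, value: Any, dataType: String, pos: Int)
    extends Param(value, dataType, pos) {
  // a reference must always point at a plain Param, never at another reference
  require(!param.isInstanceOf[RefParam])
}

class ParamHolder {
  private val constants = mutable.ArrayBuffer.empty[Param]
  // lookup map is only built once there are "enough" constants to justify it
  private var constantMap: mutable.HashMap[(String, Any), Param] = _

  // never returns a RefParam
  private def findExisting(value: Any, dataType: String): Option[Param] = {
    val n = constants.length
    if (n >= 4) {
      if (constantMap == null) {
        // populate the map while checking for a match, skipping references
        constantMap = mutable.HashMap.empty
        var existing: Option[Param] = None
        constants.foreach {
          case _: RefParam => // skip
          case p =>
            if (existing.isEmpty && p.dataType == dataType && p.value == value) existing = Some(p)
            constantMap.put(p.dataType -> p.value, p)
        }
        existing
      } else constantMap.get(dataType -> value)
    } else constants.collectFirst {
      // linear search for small sizes
      case p if !p.isInstanceOf[RefParam] && p.dataType == dataType && p.value == value => p
    }
  }

  def add(value: Any, dataType: String): Param = findExisting(value, dataType) match {
    case None =>
      val p = new Param(value, dataType, constants.length)
      constants += p
      if (constantMap != null) constantMap.put(dataType -> value, p)
      p
    case Some(existing) =>
      // reuse the canonical constant, but record this occurrence's own position and value
      val ref = new RefParam(existing, value, dataType, constants.length)
      constants += ref
      ref
  }
}

// e.g. calling add(1, "int") twice: the second call returns a RefParam whose
// param field is the Param returned by the first call.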

@@ -108,15 +108,15 @@ case class SnappyStoreSink(snappySession: SnappySession,
}
}

val hashAggregateSizeChanged = HashAggregateSize.get(snappySession.sessionState.conf)
val hashAggregateSizeIsDefault = HashAggregateSize.get(snappySession.sessionState.conf)
.equals(HashAggregateSize.defaultValue.get)
if (hashAggregateSizeChanged) {
if (hashAggregateSizeIsDefault) {
HashAggregateSize.set(snappySession.sessionState.conf, "10m")
}
try {
sinkCallback.process(snappySession, parameters, batchId, convert(data), posDup)
} finally {
if (hashAggregateSizeChanged) {
if (hashAggregateSizeIsDefault) {
HashAggregateSize.set(snappySession.sessionState.conf, HashAggregateSize.defaultValue.get)
}
}
@@ -148,7 +148,6 @@ import org.apache.spark.sql.snappy._
class DefaultSnappySinkCallback extends SnappySinkCallback {
def process(snappySession: SnappySession, parameters: Map[String, String],
batchId: Long, df: Dataset[Row], posDup: Boolean) {
df.cache().count()
log.debug(s"Processing batchId $batchId with parameters $parameters ...")
val tableName = snappySession.sessionCatalog.formatTableName(parameters(TABLE_NAME))
val conflationEnabled = if (parameters.contains(CONFLATION)) {
@@ -163,6 +162,7 @@ class DefaultSnappySinkCallback extends SnappySinkCallback {
s", eventTypeColumnAvailable:$eventTypeColumnAvailable,possible duplicate: $posDup")

if (keyColumns.nonEmpty) {
df.cache().count()
val dataFrame: DataFrame = if (conflationEnabled) getConflatedDf else df
if (eventTypeColumnAvailable) {
val deleteDf = dataFrame.filter(dataFrame(EVENT_TYPE_COLUMN) === EventType.DELETE)
@@ -202,7 +202,6 @@ class DefaultSnappySinkCallback extends SnappySinkCallback {

log.debug(s"Processing batchId $batchId with parameters $parameters ... Done.")


// We are grouping by key columns and getting the last record.
// Note that this approach will work as far as the incoming dataframe is partitioned
// by key columns and events are available in the correct order in the respective partition.
@@ -218,12 +217,20 @@ class DefaultSnappySinkCallback extends SnappySinkCallback {
index += 1
contains
}
val conflatedDf = if (otherCols.isEmpty) df.distinct()
else {
val exprs = otherCols.map(c => last(c).alias(c))

val conflatedDf: DataFrame = {
val exprs = otherCols.map(c => last(c).alias(c)) ++
Seq(count(lit(1)).alias(EVENT_COUNT_COLUMN))

// if event type of the last event for a key is insert and there are more than one
// events for the same key, then convert inserts to put into
val columns = df.columns.filter(_ != EVENT_TYPE_COLUMN).map(col) ++
Seq(when(col(EVENT_TYPE_COLUMN) === EventType.INSERT && col(EVENT_COUNT_COLUMN) > 1,
EventType.UPDATE).otherwise(col(EVENT_TYPE_COLUMN)).alias(EVENT_TYPE_COLUMN))

df.groupBy(keyCols.head, keyCols.tail: _*)
.agg(exprs.head, exprs.tail: _*)
.select(df.columns.head, df.columns.tail: _*)
.select(columns: _*)
}
conflatedDf.cache()
}
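
The conflation step above groups each incoming micro-batch by the key columns, keeps the last value of every other column, counts how many events each key produced, and rewrites an INSERT into an UPDATE (a put) when a key saw more than one event. Below is a standalone Spark sketch of that grouping step; the column names and event-type codes are illustrative assumptions, not the actual SnappyData constants:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object ConflationSketch {
  val EventTypeColumn = "_eventType"   // assumed column name
  val EventCountColumn = "_eventCount" // assumed column name
  val Insert = 0
  val Update = 1

  // Keep only the latest event per key; an INSERT that is followed by more
  // events for the same key within the batch is downgraded to an UPDATE.
  def conflate(df: DataFrame, keyCols: Seq[String]): DataFrame = {
    val otherCols = df.columns.filterNot(keyCols.contains)
    // last value per non-key column plus the number of events per key
    val aggs = otherCols.map(c => last(c).alias(c)) :+ count(lit(1)).alias(EventCountColumn)
    // rewrite the event type and drop the helper count column on the way out
    val projection = df.columns.filter(_ != EventTypeColumn).map(col) :+
      when(col(EventTypeColumn) === Insert && col(EventCountColumn) > 1, Update)
        .otherwise(col(EventTypeColumn)).alias(EventTypeColumn)
    df.groupBy(keyCols.head, keyCols.tail: _*)
      .agg(aggs.head, aggs.tail: _*)
      .select(projection: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("conflation-sketch").getOrCreate()
    import spark.implicits._
    // key 1 sees an insert followed by an update; key 2 sees a single insert
    val events = Seq((1, "a", Insert), (1, "b", Update), (2, "c", Insert))
      .toDF("id", "name", EventTypeColumn)
    conflate(events, Seq("id")).show()
    spark.stop()
  }
}

As the original comment notes, relying on last() in this way only gives the intended result when the batch is partitioned by the key columns and events arrive in order within each partition.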
@@ -276,6 +276,24 @@ class SnappyStoreSinkProviderSuite extends SnappyFunSuite
assertData(Array(Row(1, "name999", 999, "lname1")))
}

test("[SNAP-2745]-conflation: delete,insert") {
val testId = testIdGenerator.getAndIncrement()
createTable()()
val topic = getTopic(testId)
kafkaTestUtils.createTopic(topic, partitions = 1)

val batch1 = Seq(Seq(1, "name1", 30, "lname1", 0))
kafkaTestUtils.sendMessages(topic, batch1.map(r => r.mkString(",")).toArray)
val streamingQuery = createAndStartStreamingQuery(topic, testId, conflation = true)

waitTillTheBatchIsPickedForProcessing(0, testId)
val batch2 = Seq(Seq(1, "name1", 30, "lname1", 2), Seq(1, "name1", 30, "lname1", 0))
kafkaTestUtils.sendMessages(topic, batch2.map(r => r.mkString(",")).toArray)

streamingQuery.processAllAvailable()

assertData(Array(Row(1, "name1", 30, "lname1")))
}

test("test conflation disabled") {
val testId = testIdGenerator.getAndIncrement()
You are viewing a condensed version of this merge commit.