Merged
Changes from 1 commit
73 commits
b660f4b
changes for integrated catalog and cleanup
Nov 28, 2018
7132932
Merge remote-tracking branch 'origin/master' into catalog-cleanup
Nov 28, 2018
9de169d
some fixes
Nov 28, 2018
3a46574
use non-isolated hive client
Nov 29, 2018
a5d078d
replace koloboke maps with eclipse collections
Nov 29, 2018
2a6635a
Merge remote-tracking branch 'origin/master' into catalog-cleanup
Nov 29, 2018
dd64f47
more fixes to tests
Nov 29, 2018
1dea3e4
corrected tokenize to be session property rather than global
Nov 29, 2018
d62413d
more test fixes as per the catalog fixes
Nov 30, 2018
17315d9
Merge remote-tracking branch 'origin/master' into catalog-cleanup
Dec 8, 2018
fb06812
fix build error after master merge
Dec 8, 2018
3af6077
more fixes/additions and enabled a bunch of compatibility tests
Dec 10, 2018
d2f8b1e
more fixes and changes
Dec 11, 2018
bffb66c
fixed issue with update sub-query due to alias removal
Dec 11, 2018
71d5597
Merge remote-tracking branch 'origin/master' into catalog-cleanup
Dec 12, 2018
285560b
fixes for AQP
Dec 13, 2018
369f583
clear global view catalog explicitly in close
Dec 14, 2018
8d853e0
don't resolve baseTable since it can be temporary table
Dec 14, 2018
aa809b4
fixes for AQP test failures
Dec 16, 2018
e36976e
some cleanups
Dec 16, 2018
74becab
Support for external hive catalog/session from within SnappySession
Dec 17, 2018
b7819bd
switch the sharedState and sessionState in SnappySession before runni…
Dec 17, 2018
1a0a503
minor comment change
Dec 17, 2018
21f8fbd
add gemfire to the providers for which dbtable is added implicitly
Dec 17, 2018
6687984
update store link
Dec 17, 2018
cdd2259
add special path for "gemfire" data source
Dec 18, 2018
fb49f64
minor logging changes
Dec 18, 2018
1da2075
minor comment changes
Dec 18, 2018
187b443
Merge branch 'catalog-cleanup' into external-hive-support
Dec 18, 2018
8494e0f
Merge remote-tracking branch 'origin/master' into external-hive-support
Dec 18, 2018
9ccef06
fixed initialization of shared state in SessionBase to be lazy
Dec 18, 2018
9d3ca95
fixing a couple of failures
Dec 19, 2018
e13521c
improve some exceptions to use "schema" in message rather than "datab…
Dec 19, 2018
ebe9a84
more exception improvements
Dec 19, 2018
0d848d8
correct some issues in the lookup path
Dec 19, 2018
c091e9f
completing the read path support
Dec 20, 2018
68a5544
Merge remote-tracking branch 'origin/master' into external-hive-support
Dec 21, 2018
7840860
Merge remote-tracking branch 'origin/master' into external-hive-support
Dec 29, 2018
69bfe80
write support in external hive catalog
Jan 1, 2019
60b1c08
allow identifier to start with numerics after AS
Jan 10, 2019
72957b6
changed behaviour as per internal discussions
Jan 10, 2019
9fa6c24
Merge remote-tracking branch 'origin/master' into external-hive-support
Apr 11, 2019
ec1c071
Merge remote-tracking branch 'origin/master' into external-hive-support
Apr 11, 2019
75110ee
Remove the test files dtests/src/test/java/io/snappydata/hydra/extern…
Jun 24, 2019
9fe19c6
Fix for SNAP-2998
Jul 8, 2019
485fc61
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 20, 2019
7991e15
Merge commit '75110ee40f0ec6bf7940f6a554adc0751e641617' into external…
Jul 20, 2019
6de8b61
Merge commit '9fe19c6613986a1f4a7cc732d335daf25444bc19' into external…
Jul 20, 2019
6f601fb
manually merging some changes from master that were refactored into s…
Jul 20, 2019
81f9364
enable external-hive support if non-default hive configuration is pre…
Jul 21, 2019
0e0b59a
changed to temporarily switch to SparkSession
Jul 22, 2019
3de52d4
minor changes
Jul 22, 2019
84c9f05
minor change
Jul 22, 2019
a8bebb4
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 22, 2019
978e41d
remove snappydata.sql.hive.defaultHiveSource property
Jul 22, 2019
2518402
honour spark.sql.sources.default for default data source
Jul 23, 2019
e455ab6
minor fix
Jul 23, 2019
063614b
removed snappydata.sql.hive.enabled
Jul 23, 2019
1d1707d
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 23, 2019
29a3e76
update store link
Jul 23, 2019
015ecdb
initial code for porting hive suite
Jul 24, 2019
1a4e571
added check to avoid recursive calls to SnappySessionState initializa…
Jul 24, 2019
86d910a
first working ported hive test suite
Jul 24, 2019
a70046a
fix dynamic setting of spark.sql.catalogImplementation which is other…
Jul 24, 2019
195e247
interpret CREATE TABLE containing hive-specific extensions
Jul 25, 2019
a15d16f
some minor changes to behaviour
Jul 26, 2019
43243ec
fix for SNAP-3100
Jul 26, 2019
2958e1b
fixes for schema/database handling and improved help messages
Jul 26, 2019
99fcf4a
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 26, 2019
d30d9b9
uniform databaseExists check for hiveSessionCatalog
Jul 26, 2019
b60cfdc
fix MetadataTest to add "DEFAULT" to list of schemas
Jul 27, 2019
80ead5b
update spark and store links
Jul 27, 2019
91ce084
Merge remote-tracking branch 'origin/master' into external-hive-support
Jul 27, 2019
Support for external hive catalog/session from within SnappySession
Sumedh Wale committed Dec 17, 2018
commit 74becab942702b51a1f4218d22d6daae967e8636
@@ -16,7 +16,8 @@
  */
 package org.apache.spark.sql.test
 
-import org.apache.spark.sql.internal.{SnappyConf, SnappySessionState}
+import org.apache.spark.sql.hive.SnappySessionState
+import org.apache.spark.sql.internal.SnappyConf
 import org.apache.spark.sql.{SnappySession, SparkSession}
 import org.apache.spark.{SparkConf, SparkContext}
 
97 changes: 97 additions & 0 deletions core/src/main/java/org/apache/spark/sql/internal/SessionBase.java
@@ -0,0 +1,97 @@
/*
 * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */

package org.apache.spark.sql.internal;

import java.lang.reflect.InvocationTargetException;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.SnappyContext$;
import org.apache.spark.sql.SnappySession;
import org.apache.spark.sql.SnappySession$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.SnappySessionState;

/**
 * Base class for SnappySession that allows having the SessionState and SharedState
 * as variables that can be changed when required (for hive support).
 */
public abstract class SessionBase extends SparkSession {

  private static final long serialVersionUID = 7013637782126648003L;

  private transient volatile SessionState snappySessionState;

  private transient SharedState snappySharedState;

  public SessionBase(SparkContext sc) {
    super(sc);
    this.snappySharedState = SnappyContext$.MODULE$.sharedState(sc);
  }

  /**
   * State isolated across sessions, including SQL configurations, temporary tables, registered
   * functions, and everything else that accepts a {@link org.apache.spark.sql.internal.SQLConf}.
   */
  @Override
  public SessionState sessionState() {
    SessionState sessionState = this.snappySessionState;
    if (sessionState != null) {
      return sessionState;
    }
    synchronized (this) {
      sessionState = this.snappySessionState;
      if (sessionState != null) {
        return sessionState;
      }
      scala.Option<Class<?>> sessionStateClass = SnappySession$.MODULE$.aqpSessionStateClass();
      if (sessionStateClass.isEmpty()) {
        sessionState = this.snappySessionState = new SnappySessionState((SnappySession)this);
      } else {
        Class<?> aqpClass = sessionStateClass.get();
        try {
          sessionState = this.snappySessionState = (SessionState)aqpClass.getConstructor(
              SnappySession.class).newInstance((SnappySession)this);
        } catch (InvocationTargetException e) {
          throw new RuntimeException(e.getCause());
        } catch (RuntimeException e) {
          throw e;
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
      return sessionState;
    }
  }

  public void setSessionState(SessionState sessionState) {
    this.snappySessionState = sessionState;
  }

  /**
   * State shared across sessions, including the {@link SparkContext}, cached data, listener,
   * and a catalog that interacts with external systems.
   */
  @Override
  public SharedState sharedState() {
    return this.snappySharedState;
  }

  public void setSharedState(SharedState sharedState) {
    this.snappySharedState = sharedState;
  }
}
core/src/main/java/org/apache/spark/sql/internal/SnappySharedState.java
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.internal;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import io.snappydata.sql.catalog.ConnectorExternalCatalog;
 import io.snappydata.sql.catalog.SnappyExternalCatalog;
 import org.apache.spark.SparkContext;
@@ -37,7 +39,7 @@
 import org.apache.spark.sql.execution.ui.SQLTab;
 import org.apache.spark.sql.execution.ui.SnappySQLListener;
 import org.apache.spark.sql.hive.HiveClientUtil$;
-import org.apache.spark.sql.hive.SnappyHiveExternalCatalog;
+import org.apache.spark.sql.hive.HiveExternalCatalog;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.ui.SparkUI;
 
@@ -59,7 +61,7 @@ public final class SnappySharedState extends SharedState {
   /**
    * The ExternalCatalog implementation used for SnappyData in embedded mode.
    */
-  private final SnappyHiveExternalCatalog embedCatalog;
+  private final HiveExternalCatalog embedCatalog;
 
   /**
    * Overrides to use upper-case "database" name as assumed by SnappyData
@@ -72,6 +74,9 @@ public final class SnappySharedState extends SharedState {
    */
   private final boolean initialized;
 
+  @GuardedBy("this")
+  private SharedState hiveState;
+
   private static final String CATALOG_IMPLEMENTATION = "spark.sql.catalogImplementation";
 
   /**
@@ -172,6 +177,21 @@ public static synchronized SnappySharedState create(SparkContext sparkContext) {
     return sharedState;
   }
 
+  /**
+   * Private constructor for getting Spark's hive state while sharing the
+   * global temp views and cache.
+   */
+  private SnappySharedState(SparkContext sparkContext,
+      CacheManager cacheManager, GlobalTempViewManager globalViewManager) {
+    super(sparkContext);
+
+    this.snappyCacheManager = cacheManager;
+    this.embedCatalog = (HiveExternalCatalog)externalCatalog();
+    this.globalViewManager = globalViewManager;
+
+    this.initialized = true;
+  }
+
   /**
    * Returns the global external hive catalog in embedded mode, while in smart
    * connector mode returns a new instance of external catalog since it
@@ -180,10 +200,10 @@ public static synchronized SnappySharedState create(SparkContext sparkContext) {
    */
   public SnappyExternalCatalog getExternalCatalogInstance(SnappySession session) {
     if (!this.initialized) {
-      throw new IllegalStateException("getExternalCatalogInstance unexpected invocation " +
+      throw new IllegalStateException("getExternalCatalogInstance: unexpected invocation " +
           "from within SnappySharedState constructor");
     } else if (this.embedCatalog != null) {
-      return this.embedCatalog;
+      return (SnappyExternalCatalog)this.embedCatalog;
     } else {
       // create a new connector catalog instance for connector mode
       // each instance has its own set of credentials for authentication
@@ -220,4 +240,29 @@ public GlobalTempViewManager globalTempViewManager() {
       return super.globalTempViewManager();
     }
   }
+
+  /**
+   * Create a Spark hive shared state while sharing the global temp view and cache managers.
+   */
+  public synchronized SharedState getHiveSharedState() {
+    if (this.hiveState != null) return this.hiveState;
+
+    if (!this.initialized) {
+      throw new IllegalStateException("getHiveSharedState: unexpected invocation " +
+          "from within SnappySharedState constructor");
+    }
+    final SparkContext context = sparkContext();
+    final String catalogImpl = context.conf().get(CATALOG_IMPLEMENTATION, null);
+    context.conf().set(CATALOG_IMPLEMENTATION, "hive");
+    try {
+      return (this.hiveState = new SnappySharedState(context,
+          this.snappyCacheManager, this.globalViewManager));
+    } finally {
+      if (catalogImpl != null) {
+        context.conf().set(CATALOG_IMPLEMENTATION, catalogImpl);
+      } else {
+        context.conf().remove(CATALOG_IMPLEMENTATION);
+      }
+    }
+  }
 }
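Taken together with the `setSessionState`/`setSharedState` mutators that `SessionBase` introduces above, `getHiveSharedState` lets a session swap to Spark's hive state and back. The following is a hedged sketch of that flow, not code from this commit: the `withHiveState` helper and the way the hive `SessionState` is supplied are illustrative assumptions; only `getHiveSharedState` and the two setters come from this diff.

```scala
import org.apache.spark.sql.SnappySession
import org.apache.spark.sql.internal.{SessionState, SnappySharedState}

object HiveStateSwitch {
  // Illustrative helper (an assumption, not part of this commit): temporarily
  // switch a SnappySession to the Spark hive shared/session state, run a block,
  // and restore the SnappyData state afterwards.
  def withHiveState[T](session: SnappySession, hiveSessionState: SessionState)(body: => T): T = {
    val prevShared = session.sharedState
    val prevState = session.sessionState
    // getHiveSharedState lazily builds (and caches) the hive SharedState
    session.setSharedState(prevShared.asInstanceOf[SnappySharedState].getHiveSharedState)
    session.setSessionState(hiveSessionState)
    try body
    finally {
      // restore SnappyData's own shared and session state
      session.setSharedState(prevShared)
      session.setSessionState(prevState)
    }
  }
}
```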
5 changes: 5 additions & 0 deletions core/src/main/scala/io/snappydata/Literals.scala
@@ -208,6 +208,11 @@ object Property extends Enumeration {
     s"${Constant.PROPERTY_PREFIX}sql.parser.traceError",
     "Property to enable detailed rule tracing for parse errors", Some(false))
 
+  val EnableHiveSupport: SQLValue[Boolean] = SQLVal(
+    s"${Constant.PROPERTY_PREFIX}sql.enableHiveSupport", "Property on SnappySession to " +
+      "enable external hive meta-store support as configured using SparkConf and hive-site.xml",
+    Some(false))
+
   val EnableExperimentalFeatures: SQLValue[Boolean] = SQLVal[Boolean](
     s"${Constant.PROPERTY_PREFIX}enable-experimental-features",
     "SQLConf property that enables snappydata experimental features like distributed index " +
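Given the `snappydata.` prefix (`Constant.PROPERTY_PREFIX`), the key resolves to `snappydata.sql.enableHiveSupport` and, being a session property, can be toggled through SQL. A minimal usage sketch — the session setup and `SHOW TABLES` statement are illustrative assumptions; only the property key and its per-session semantics come from this diff:

```scala
import org.apache.spark.sql.SnappySession
import org.apache.spark.{SparkConf, SparkContext}

object EnableHiveSupportExample {
  def main(args: Array[String]): Unit = {
    // Illustrative setup; assumes a hive-site.xml is on the classpath,
    // as the property description above requires.
    val sc = new SparkContext(new SparkConf().setAppName("hive-example").setMaster("local[*]"))
    val session = new SnappySession(sc)
    // session-level property, so other sessions keep the default (false)
    session.sql("set snappydata.sql.enableHiveSupport=true")
    // statements in this session can now see the external hive meta-store
    session.sql("show tables").show()
  }
}
```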
core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala
@@ -210,7 +210,7 @@ class CachedDataFrame(snappySession: SnappySession, queryExecution: QueryExecuti
   }
 
   private def setPoolForExecution(): Unit = {
-    var pool = snappySession.sessionState.conf.activeSchedulerPool
+    var pool = snappySession.snappySessionState.conf.activeSchedulerPool
     // Check if it is pruned query, execute it automatically on the low latency pool
     if (isLowLatencyQuery && pool == "default") {
       if (snappySession.sparkContext.getPoolForName(Constant.LOW_LATENCY_POOL).isDefined) {
@@ -255,7 +255,7 @@
     prepared = false
     // reset the pool
     if (isLowLatencyQuery) {
-      val pool = snappySession.sessionState.conf.activeSchedulerPool
+      val pool = snappySession.snappySessionState.conf.activeSchedulerPool
       snappySession.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
     }
     // clear the shuffle dependencies asynchronously after the execution.
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/sql/SnappyContext.scala
@@ -48,8 +48,8 @@ import org.apache.spark.sql.collection.{ToolsCallbackInit, Utils}
 import org.apache.spark.sql.execution.columnar.ExternalStoreUtils.CaseInsensitiveMutableHashMap
 import org.apache.spark.sql.execution.joins.HashedObjectCache
 import org.apache.spark.sql.execution.{ConnectionPool, DeployCommand, DeployJarCommand}
-import org.apache.spark.sql.hive.SnappyHiveExternalCatalog
-import org.apache.spark.sql.internal.{SnappySessionState, SnappySharedState}
+import org.apache.spark.sql.hive.{SnappyHiveExternalCatalog, SnappySessionState}
+import org.apache.spark.sql.internal.{SessionState, SnappySharedState}
 import org.apache.spark.sql.store.CodeGeneration
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -96,7 +96,9 @@ class SnappyContext protected[spark](val snappySession: SnappySession)
   override def newSession(): SnappyContext =
     snappySession.newSession().snappyContext
 
-  override def sessionState: SnappySessionState = snappySession.sessionState
+  override def sessionState: SessionState = snappySession.sessionState
+
+  def snappySessionState: SnappySessionState = snappySession.snappySessionState
 
   def clear(): Unit = {
     snappySession.clear()
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/SnappyParser.scala
@@ -1222,8 +1222,9 @@ class SnappyParser(session: SnappySession)
   final def parse[T](sqlText: String, parseRule: => Try[T],
       clearExecutionData: Boolean = false): T = session.synchronized {
     session.clearQueryData()
-    if (clearExecutionData) session.sessionState.clearExecutionData()
-    caseSensitive = session.sessionState.conf.caseSensitiveAnalysis
+    val sessionState = session.snappySessionState
+    if (clearExecutionData) sessionState.clearExecutionData()
+    caseSensitive = sessionState.conf.caseSensitiveAnalysis
     parseSQL(sqlText, parseRule)
   }
 
42 changes: 15 additions & 27 deletions core/src/main/scala/org/apache/spark/sql/SnappySession.scala
@@ -64,9 +64,9 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, Logi
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
 import org.apache.spark.sql.execution.ui.SparkListenerSQLPlanExecutionStart
-import org.apache.spark.sql.hive.HiveClientUtil
+import org.apache.spark.sql.hive.{HiveClientUtil, SnappySessionState}
 import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
-import org.apache.spark.sql.internal.{BypassRowLevelSecurity, MarkerForCreateTableAsSelect, SnappySessionCatalog, SnappySessionState, SnappySharedState}
+import org.apache.spark.sql.internal.{BypassRowLevelSecurity, MarkerForCreateTableAsSelect, SessionBase, SnappySessionCatalog}
 import org.apache.spark.sql.row.SnappyStoreDialect
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.store.StoreUtils
@@ -78,7 +78,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.{Logging, ShuffleDependency, SparkContext, SparkEnv}
 
 
-class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
+class SnappySession(_sc: SparkContext) extends SessionBase(_sc) {
 
   self =>
 
@@ -96,35 +96,19 @@ class SnappySession(_sc: SparkContext) extends SessionBase(_sc) {
 
   new FinalizeSession(this)
 
-  private def sc: SparkContext = sparkContext
-
-  /**
-   * State shared across sessions, including the [[SparkContext]], cached data, listener,
-   * and a catalog that interacts with external systems.
-   */
-  @transient
-  override lazy val sharedState: SnappySharedState = SnappyContext.sharedState(sparkContext)
-
-  /**
-   * State isolated across sessions, including SQL configurations, temporary tables, registered
-   * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
-   */
-  @transient
-  lazy override val sessionState: SnappySessionState = {
-    SnappySession.aqpSessionStateClass match {
-      case Some(aqpClass) => aqpClass.getConstructor(classOf[SnappySession]).
-          newInstance(self).asInstanceOf[SnappySessionState]
-      case None => new SnappySessionState(self)
-    }
-  }
+  def snappySessionState: SnappySessionState = sessionState.asInstanceOf[SnappySessionState]
 
-  def sessionCatalog: SnappySessionCatalog = sessionState.catalog
+  def sessionCatalog: SnappySessionCatalog = snappySessionState.catalog
 
-  def externalCatalog: SnappyExternalCatalog = sessionState.catalog.externalCatalog
+  def externalCatalog: SnappyExternalCatalog = snappySessionState.catalog.externalCatalog
 
-  def snappyParser: SnappyParser = sessionState.sqlParser.sqlParser
+  def snappyParser: SnappyParser = snappySessionState.sqlParser.sqlParser
 
-  private[spark] def snappyContextFunctions = sessionState.contextFunctions
+  private[spark] def snappyContextFunctions = snappySessionState.contextFunctions
 
   SnappyContext.initGlobalSnappyContext(sparkContext, this)
   SnappyDataFunctions.registerSnappyFunctions(sessionState.functionRegistry)
@@ -191,6 +175,7 @@ class SnappySession(_sc: SparkContext) extends SessionBase(_sc) {
   }
 
   final def prepareSQL(sqlText: String): LogicalPlan = {
+    val sessionState = this.snappySessionState
     val logical = sessionState.sqlParser.parsePlan(sqlText, clearExecutionData = true)
     SparkSession.setActiveSession(this)
     sessionState.analyzerPrepare.execute(logical)
@@ -247,6 +232,9 @@ class SnappySession(_sc: SparkContext) extends SessionBase(_sc) {
   @transient
   private[sql] var disableHashJoin: Boolean = Property.DisableHashJoin.get(sessionState.conf)
 
+  @transient
+  private[sql] var enableHiveSupport: Boolean = Property.EnableHiveSupport.get(sessionState.conf)
+
   @transient
   private var sqlWarnings: SQLWarning = _
 
@@ -1294,7 +1282,7 @@ class SnappySession(_sc: SparkContext) extends SessionBase(_sc) {
           "not supported for temporary tables")
     }
 
-    SnappyContext.getClusterMode(sc) match {
+    SnappyContext.getClusterMode(sparkContext) match {
       case ThinClientConnectorMode(_, _) =>
         throw new AnalysisException("ALTER TABLE enable/disable Row Level Security " +
             "not supported for smart connector mode")
@@ -1790,7 +1778,7 @@ object SnappySession extends Logging {
     GemFireVersion.isEnterpriseEdition
   }
 
-  private lazy val aqpSessionStateClass: Option[Class[_]] = {
+  lazy val aqpSessionStateClass: Option[Class[_]] = {
     if (isEnterpriseEdition) {
       try {
         Some(org.apache.spark.util.Utils.classForName(
@@ -1990,7 +1978,7 @@ object SnappySession extends Logging {
   def getPlanCache: Cache[CachedKey, CachedDataFrame] = planCache
 
   def sqlPlan(session: SnappySession, sqlText: String): CachedDataFrame = {
-    val parser = session.sessionState.sqlParser
+    val parser = session.snappySessionState.sqlParser
     val plan = parser.parsePlan(sqlText, clearExecutionData = true)
     val planCaching = session.planCaching
     val paramLiterals = parser.sqlParser.getAllLiterals
@@ -39,7 +39,8 @@ import org.apache.spark.sql.execution.columnar.ExternalStoreUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, Exchange, ShuffleExchange}
 import org.apache.spark.sql.execution.sources.PhysicalScan
-import org.apache.spark.sql.internal.{JoinQueryPlanning, LogicalPlanWithHints, SQLConf, SnappySessionState}
+import org.apache.spark.sql.hive.SnappySessionState
+import org.apache.spark.sql.internal.{JoinQueryPlanning, LogicalPlanWithHints, SQLConf}
 import org.apache.spark.sql.streaming._
 
 /**
@@ -798,7 +799,7 @@ case class InsertCachedPlanFallback(session: SnappySession, topLevel: Boolean)
       // or if the plan is not a top-level one e.g. a subquery or inside
       // CollectAggregateExec (only top-level plan will catch and retry
      // with disabled optimizations)
-      if (!topLevel || session.sessionState.disableStoreOptimizations) plan
+      if (!topLevel || session.snappySessionState.disableStoreOptimizations) plan
       else plan match {
         // TODO: disabled for StreamPlans due to issues but can it require fallback?
         case _: StreamPlan => plan