@@ -26,6 +26,7 @@ import scala.util.matching.Regex
import net.sf.json.JSONArray

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
@@ -46,7 +47,7 @@ import org.apache.spark.util.Utils
* [[ExternalCatalog]]'s abstract methods with default implementations, which means that the
* corresponding operation is not supported on this data source for now.
*/
trait DataSourceManager extends ExternalCatalog {
trait DataSourceManager extends ExternalCatalog with Logging {
import DataSourceManager._

def shortName(): String
@@ -63,6 +64,11 @@ trait DataSourceManager extends ExternalCatalog {
*/
private var enablePushDown: Boolean = true

/**
* If true, XSQL will discover the schema of tables in the data source.
*/
private var discoverSchema: Boolean = false

/**
* If true, XSQL will allow queries via the structured streaming API.
*/
@@ -180,6 +186,7 @@ trait DataSourceManager extends ExternalCatalog {
dsName = dataSourceName
defaultCluster = infos.get(CLUSTER)
enablePushDown = java.lang.Boolean.valueOf(infos.getOrElse(PUSHDOWN, TRUE))
discoverSchema = java.lang.Boolean.valueOf(infos.getOrElse(SCHEMAS_DISCOVER, FALSE))
enableStream = java.lang.Boolean.valueOf(infos.getOrElse(STREAM, FALSE))
val whitelistFile = infos.get(WHITELIST)
if (whitelistFile.isDefined) {
@@ -193,7 +200,7 @@ trait DataSourceManager extends ExternalCatalog {
} else if (cachedProperties.contains("schemas.str")) {
schemaReader(cachedProperties("schemas.str"), schemasMap)
}
val discoverFile = cachedProperties.get(DISCOVER_SCHEMA)
val discoverFile = cachedProperties.get(SCHEMAS_DISCOVER_CONFIG)
if (discoverFile.isDefined) {
parseDiscoverFile(dataSourceName, discoverFile.get)
}
@@ -279,6 +286,11 @@ trait DataSourceManager extends ExternalCatalog {
def isPushDown(runtimeConf: SQLConf, dataSourceName: String, plan: LogicalPlan): Boolean =
enablePushDown

/**
* Whether schema discovery is enabled for this data source.
*/
def isDiscover: Boolean = discoverSchema

/**
* Structured streaming enabled or not.
*/
@@ -493,7 +505,32 @@ trait DataSourceManager extends ExternalCatalog {
db: String,
originDB: String,
table: String): Option[CatalogTable] = {
throw new UnsupportedOperationException(s"Get ${shortName()} raw table not supported!")
if (isDiscover) {
logDebug(s"Discovering $dsName.$db.$table, $dsName.$originDB.$table in fact.")
discoverRawTable(db, originDB, table)
} else {
fastGetRawTable(db, originDB, table)
}
}

/**
* Discover the schema of the specified table.
*/
protected def discoverRawTable(
db: String,
originDB: String,
table: String): Option[CatalogTable] = {
throw new UnsupportedOperationException(s"Discover ${shortName()} raw table not supported!")
}

/**
* Get the schema of the specified table in a fast way, without schema discovery.
*/
protected def fastGetRawTable(
db: String,
originDB: String,
table: String): Option[CatalogTable] = {
throw new UnsupportedOperationException(s"Get ${shortName()} raw table fast not supported!")
}

/**
@@ -961,8 +998,9 @@ object DataSourceManager {
val PUSHDOWN = "pushdown"
val STREAM = "stream"
val SCHEMAS = "schemas"
val SCHEMAS_DISCOVER = "schemas.discover"
val CACHE_LEVEL = "cache.level"
val DISCOVER_SCHEMA = "discover"
val SCHEMAS_DISCOVER_CONFIG = "schemas.discover.config"
val TEMP_FLAG = "temp_flag"

val CLUSTER = "cluster"
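For context on how the new keys are consumed: schemas.discover is read from the per-data-source property map (infos), next to the existing pushdown and stream keys, while schemas.discover.config is looked up in cachedProperties and handed to parseDiscoverFile. A minimal configuration sketch, assuming XSQL's usual spark.xsql.datasource.<name>.<key> property naming and a data source called "myes" (both assumptions, not shown in this diff):

import org.apache.spark.SparkConf

// Hypothetical sketch: "myes" and the spark.xsql.datasource.<name>.<key> prefix are
// assumptions; "schemas.discover" and "schemas.discover.config" are the keys this
// change introduces, shown next to the pre-existing "pushdown" key.
val conf = new SparkConf()
  .set("spark.xsql.datasource.myes.pushdown", "true")
  // New: infer table schemas for this data source instead of taking the fast path.
  .set("spark.xsql.datasource.myes.schemas.discover", "true")
  // Optional: a discover-config file parsed by parseDiscoverFile (placeholder path).
  .set("spark.xsql.datasource.myes.schemas.discover.config", "/path/to/discover.conf")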
@@ -201,7 +201,7 @@ private[xsql] class ElasticSearchManager(conf: SparkConf) extends DataSourceMana
while (itr.hasNext()) {
val typeName = itr.next().asInstanceOf[String]
if (isSelectedTable(whiteTables, blackTables, typeName)) {
val tableOpt = if (isDiscover(originDBName, typeName)) {
val tableOpt = if (isDiscover) {
discoverRawTable(dbName, originDBName, typeName)
} else {
getTableOption(dataSourceName, dbName, originDBName, typeName, mappingsObj)
@@ -342,49 +342,7 @@ private[xsql] class ElasticSearchManager(conf: SparkConf) extends DataSourceMana
}
}

override protected def doGetRawTable(
db: String,
originDB: String,
table: String): Option[CatalogTable] = {
if (isDiscover(originDB, table)) {
discoverRawTable(db, originDB, table)
} else {
logDebug(s"Looking up $dsName.$db.$table, $dsName.$originDB.$table in fact.")
val response = restClient.performRequest(
HttpGet.METHOD_NAME,
s"/$originDB/$table/_mapping",
Collections.singletonMap("pretty", "true"))
val rootObj = JSONObject.fromObject(EntityUtils.toString(response.getEntity()).trim)
val dbObj = rootObj.get(originDB).asInstanceOf[JSONObject]
val mappingsObj = dbObj.get(MAPPINGS).asInstanceOf[JSONObject]
val itr = mappingsObj.keys()
if (itr.hasNext()) {
val typeName = itr.next().asInstanceOf[String]
getTableOption(dsName, db, originDB, typeName, mappingsObj)
} else {
None
}
}
}

/**
* Check if the specified table contains fields need to discover.
*/
private def isDiscover(originDB: String, table: String): Boolean = {
var flag = false
if (discoverFields.contains(originDB)) {
val tableDiscoverFields = discoverFields.get(originDB)
if (tableDiscoverFields.isDefined && tableDiscoverFields.get.contains(table)) {
flag = true
}
}
flag
}

/**
* Discover the schema of the specified table.
*/
private def discoverRawTable(
override protected def discoverRawTable(
db: String,
originDB: String,
table: String): Option[CatalogTable] = {
@@ -397,6 +355,27 @@ private[xsql] class ElasticSearchManager(conf: SparkConf) extends DataSourceMana
getTableOption(dsName, db, originDB, table, lazySchema.struct, Some(parameters))
}

override protected def fastGetRawTable(
db: String,
originDB: String,
table: String): Option[CatalogTable] = {
logDebug(s"Looking up $dsName.$db.$table, $dsName.$originDB.$table in fact.")
val response = restClient.performRequest(
HttpGet.METHOD_NAME,
s"/$originDB/$table/_mapping",
Collections.singletonMap("pretty", "true"))
val rootObj = JSONObject.fromObject(EntityUtils.toString(response.getEntity()).trim)
val dbObj = rootObj.get(originDB).asInstanceOf[JSONObject]
val mappingsObj = dbObj.get(MAPPINGS).asInstanceOf[JSONObject]
val itr = mappingsObj.keys()
if (itr.hasNext()) {
val typeName = itr.next().asInstanceOf[String]
getTableOption(dsName, db, originDB, typeName, mappingsObj)
} else {
None
}
}

private def getTableOption(
dsName: String,
dbName: String,
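Taken together, the ElasticSearchManager changes remove its doGetRawTable override and its private per-table isDiscover check: the base trait now chooses between the two paths using the data-source-level flag, and each manager only supplies the two hooks. A rough sketch of the resulting contract, with the class name purely illustrative and the many other ExternalCatalog members a real manager must implement omitted (so this is not compilable as-is):

// Illustrative sketch only; other abstract members of DataSourceManager are omitted.
private[xsql] class SomeManager(conf: SparkConf) extends DataSourceManager with Logging {

  override def shortName(): String = "some"

  // Fast path (schema discovery off): build the CatalogTable from metadata the
  // source already exposes, e.g. an Elasticsearch _mapping response.
  override protected def fastGetRawTable(
      db: String,
      originDB: String,
      table: String): Option[CatalogTable] = None

  // Discovery path (schema discovery on): infer the schema by sampling or
  // scanning the data, as MongoManager does below.
  override protected def discoverRawTable(
      db: String,
      originDB: String,
      table: String): Option[CatalogTable] = None
}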
@@ -72,6 +72,8 @@ private[xsql] class MongoManager(conf: SparkConf) extends DataSourceManager with

private var mongoClient: MongoClient = _

override def isDiscover: Boolean = true

override protected def cacheDatabase(
isDefault: Boolean,
dataSourceName: String,
@@ -251,6 +253,37 @@ private[xsql] class MongoManager(conf: SparkConf) extends DataSourceManager with
list.contains(table)
}

override protected def discoverRawTable(
db: String,
originDB: String,
table: String): Option[CatalogTable] = {
val sc = SparkContext.getActive.get
val url = cachedProperties(URL)
val options = new HashMap[String, String]
options += ((MONGODB_INPUT_URI, url))
options += ((MONGODB_INPUT_DATABASE, originDB))
options += ((MONGODB_INPUT_COLLECTION, table))
val mongoRDD: MongoRDD[org.bson.BsonDocument] =
sc.loadFromMongoDB(ReadConfig(options.toMap))
val schema = MongoInferSchema(mongoRDD)
val catalogTable = CatalogTable(
identifier = TableIdentifier(table, Option(db), Option(dsName)),
tableType = CatalogTableType.COLLECTION,
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
properties = Map(
MONGODB_INPUT_URI -> url,
MONGODB_INPUT_DATABASE -> originDB,
MONGODB_INPUT_COLLECTION -> table)),
schema = schema,
provider = Some(FULL_PROVIDER))
Option(catalogTable)
}

override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
val dbName = tableDefinition.database
val collectionName = tableDefinition.identifier.table
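MongoManager, by contrast, hard-codes isDiscover to true, so every table lookup goes through discoverRawTable: the collection is sampled through the mongo-spark connector and MongoInferSchema produces the StructType recorded in the CatalogTable. A hedged usage sketch of the effect on the SQL side, assuming a MongoDB data source registered as "mymongo" with database "mydb" and collection "mycoll" (all placeholder names) and an XSQL-enabled SparkSession named spark:

// Placeholder names throughout; the schema printed by DESC is the one inferred
// by MongoInferSchema inside discoverRawTable.
spark.sql("DESC mymongo.mydb.mycoll").show(truncate = false)
spark.sql("SELECT * FROM mymongo.mydb.mycoll LIMIT 10").show()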