diff --git a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/DataSourceManager.scala b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/DataSourceManager.scala
index 965f0ba..eb01584 100644
--- a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/DataSourceManager.scala
+++ b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/DataSourceManager.scala
@@ -26,6 +26,7 @@ import scala.util.matching.Regex
 import net.sf.json.JSONArray
 
 import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
@@ -46,7 +47,7 @@ import org.apache.spark.util.Utils
  * [[ExternalCatalog]]'s abstract methods with default implementation which means specific kind of
  * operation on this datasource is not supported for now.
  */
-trait DataSourceManager extends ExternalCatalog {
+trait DataSourceManager extends ExternalCatalog with Logging {
   import DataSourceManager._
 
   def shortName(): String
@@ -63,6 +64,11 @@ trait DataSourceManager extends ExternalCatalog {
    */
   private var enablePushDown: Boolean = true
 
+  /**
+   * If true, XSQL will discover the schema of tables in the data source.
+   */
+  private var discoverSchema: Boolean = false
+
   /**
    * If true, XSQL will allow queries by structured stream API.
    */
@@ -180,6 +186,7 @@ trait DataSourceManager extends ExternalCatalog {
     dsName = dataSourceName
     defaultCluster = infos.get(CLUSTER)
     enablePushDown = java.lang.Boolean.valueOf(infos.getOrElse(PUSHDOWN, TRUE))
+    discoverSchema = java.lang.Boolean.valueOf(infos.getOrElse(SCHEMAS_DISCOVER, FALSE))
     enableStream = java.lang.Boolean.valueOf(infos.getOrElse(STREAM, FALSE))
     val whitelistFile = infos.get(WHITELIST)
     if (whitelistFile.isDefined) {
@@ -193,7 +200,7 @@ trait DataSourceManager extends ExternalCatalog {
     } else if (cachedProperties.contains("schemas.str")) {
       schemaReader(cachedProperties("schemas.str"), schemasMap)
     }
-    val discoverFile = cachedProperties.get(DISCOVER_SCHEMA)
+    val discoverFile = cachedProperties.get(SCHEMAS_DISCOVER_CONFIG)
     if (discoverFile.isDefined) {
       parseDiscoverFile(dataSourceName, discoverFile.get)
     }
@@ -279,6 +286,11 @@ trait DataSourceManager extends ExternalCatalog {
   def isPushDown(runtimeConf: SQLConf, dataSourceName: String, plan: LogicalPlan): Boolean =
     enablePushDown
 
+  /**
+   * Check if the specified table contains fields need to discover.
+   */
+  def isDiscover: Boolean = discoverSchema
+
   /**
    * Structured streaming enabled or not.
    */
@@ -493,7 +505,32 @@ trait DataSourceManager extends ExternalCatalog {
       db: String,
       originDB: String,
       table: String): Option[CatalogTable] = {
-    throw new UnsupportedOperationException(s"Get ${shortName()} raw table not supported!")
+    if (isDiscover) {
+      logDebug(s"Discovering $dsName.$db.$table, $dsName.$originDB.$table in fact.")
+      discoverRawTable(db, originDB, table)
+    } else {
+      fastGetRawTable(db, originDB, table)
+    }
+  }
+
+  /**
+   * Discover the schema of the specified table.
+   */
+  protected def discoverRawTable(
+      db: String,
+      originDB: String,
+      table: String): Option[CatalogTable] = {
+    throw new UnsupportedOperationException(s"Discover ${shortName()} raw table not supported!")
+  }
+
+  /**
+   * Get the schema of the specified table in fast way.
+   */
+  protected def fastGetRawTable(
+      db: String,
+      originDB: String,
+      table: String): Option[CatalogTable] = {
+    throw new UnsupportedOperationException(s"Get ${shortName()} raw table fast not supported!")
   }
 
   /**
@@ -961,8 +998,9 @@ object DataSourceManager {
   val PUSHDOWN = "pushdown"
   val STREAM = "stream"
   val SCHEMAS = "schemas"
+  val SCHEMAS_DISCOVER = "schemas.discover"
   val CACHE_LEVEL = "cache.level"
-  val DISCOVER_SCHEMA = "discover"
+  val SCHEMAS_DISCOVER_CONFIG = "schemas.discover.config"
   val TEMP_FLAG = "temp_flag"
   val CLUSTER = "cluster"
 
diff --git a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/ElasticSearchManager.scala b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/ElasticSearchManager.scala
index 5ae1632..90fc919 100644
--- a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/ElasticSearchManager.scala
+++ b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/ElasticSearchManager.scala
@@ -201,7 +201,7 @@ private[xsql] class ElasticSearchManager(conf: SparkConf) extends DataSourceMana
     while (itr.hasNext()) {
       val typeName = itr.next().asInstanceOf[String]
       if (isSelectedTable(whiteTables, blackTables, typeName)) {
-        val tableOpt = if (isDiscover(originDBName, typeName)) {
+        val tableOpt = if (isDiscover) {
           discoverRawTable(dbName, originDBName, typeName)
         } else {
           getTableOption(dataSourceName, dbName, originDBName, typeName, mappingsObj)
@@ -342,49 +342,7 @@ private[xsql] class ElasticSearchManager(conf: SparkConf) extends DataSourceMana
     }
   }
 
-  override protected def doGetRawTable(
-      db: String,
-      originDB: String,
-      table: String): Option[CatalogTable] = {
-    if (isDiscover(originDB, table)) {
-      discoverRawTable(db, originDB, table)
-    } else {
-      logDebug(s"Looking up $dsName.$db.$table, $dsName.$originDB.$table in fact.")
-      val response = restClient.performRequest(
-        HttpGet.METHOD_NAME,
-        s"/$originDB/$table/_mapping",
-        Collections.singletonMap("pretty", "true"))
-      val rootObj = JSONObject.fromObject(EntityUtils.toString(response.getEntity()).trim)
-      val dbObj = rootObj.get(originDB).asInstanceOf[JSONObject]
-      val mappingsObj = dbObj.get(MAPPINGS).asInstanceOf[JSONObject]
-      val itr = mappingsObj.keys()
-      if (itr.hasNext()) {
-        val typeName = itr.next().asInstanceOf[String]
-        getTableOption(dsName, db, originDB, typeName, mappingsObj)
-      } else {
-        None
-      }
-    }
-  }
-
-  /**
-   * Check if the specified table contains fields need to discover.
-   */
-  private def isDiscover(originDB: String, table: String): Boolean = {
-    var flag = false
-    if (discoverFields.contains(originDB)) {
-      val tableDiscoverFields = discoverFields.get(originDB)
-      if (tableDiscoverFields.isDefined && tableDiscoverFields.get.contains(table)) {
-        flag = true
-      }
-    }
-    flag
-  }
-
-  /**
-   * Discover the schema of the specified table.
-   */
-  private def discoverRawTable(
+  override protected def discoverRawTable(
       db: String,
       originDB: String,
       table: String): Option[CatalogTable] = {
@@ -397,6 +355,27 @@ private[xsql] class ElasticSearchManager(conf: SparkConf) extends DataSourceMana
     getTableOption(dsName, db, originDB, table, lazySchema.struct, Some(parameters))
   }
 
+  override protected def fastGetRawTable(
+      db: String,
+      originDB: String,
+      table: String): Option[CatalogTable] = {
+    logDebug(s"Looking up $dsName.$db.$table, $dsName.$originDB.$table in fact.")
+    val response = restClient.performRequest(
+      HttpGet.METHOD_NAME,
+      s"/$originDB/$table/_mapping",
+      Collections.singletonMap("pretty", "true"))
+    val rootObj = JSONObject.fromObject(EntityUtils.toString(response.getEntity()).trim)
+    val dbObj = rootObj.get(originDB).asInstanceOf[JSONObject]
+    val mappingsObj = dbObj.get(MAPPINGS).asInstanceOf[JSONObject]
+    val itr = mappingsObj.keys()
+    if (itr.hasNext()) {
+      val typeName = itr.next().asInstanceOf[String]
+      getTableOption(dsName, db, originDB, typeName, mappingsObj)
+    } else {
+      None
+    }
+  }
+
   private def getTableOption(
       dsName: String,
       dbName: String,
diff --git a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MongoManager.scala b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MongoManager.scala
index 6f424c6..b1a4ea4 100644
--- a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MongoManager.scala
+++ b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MongoManager.scala
@@ -72,6 +72,8 @@ private[xsql] class MongoManager(conf: SparkConf) extends DataSourceManager with
 
   private var mongoClient: MongoClient = _
 
+  override def isDiscover: Boolean = true
+
   override protected def cacheDatabase(
       isDefault: Boolean,
       dataSourceName: String,
@@ -251,6 +253,37 @@ private[xsql] class MongoManager(conf: SparkConf) extends DataSourceManager with
     list.contains(table)
   }
 
+  override protected def discoverRawTable(
+      db: String,
+      originDB: String,
+      table: String): Option[CatalogTable] = {
+    val sc = SparkContext.getActive.get
+    val url = cachedProperties(URL)
+    val options = new HashMap[String, String]
+    options += ((MONGODB_INPUT_URI, url))
+    options += ((MONGODB_INPUT_DATABASE, originDB))
+    options += ((MONGODB_INPUT_COLLECTION, table))
+    val mongoRDD: MongoRDD[org.bson.BsonDocument] =
+      sc.loadFromMongoDB(ReadConfig(options.toMap))
+    val schema = MongoInferSchema(mongoRDD)
+    val catalogTable = CatalogTable(
+      identifier = TableIdentifier(table, Option(db), Option(dsName)),
+      tableType = CatalogTableType.COLLECTION,
+      storage = CatalogStorageFormat(
+        locationUri = None,
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        compressed = false,
+        properties = Map(
+          MONGODB_INPUT_URI -> url,
+          MONGODB_INPUT_DATABASE -> originDB,
+          MONGODB_INPUT_COLLECTION -> table)),
+      schema = schema,
+      provider = Some(FULL_PROVIDER))
+    Option(catalogTable)
+  }
+
   override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     val dbName = tableDefinition.database
     val collectionName = tableDefinition.identifier.table
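
For reference, a minimal sketch of how the new switch might be enabled for a data source. Only the key names "schemas.discover" and "schemas.discover.config" come from the patch above; the spark.xsql.datasource.<name> property prefix, the my_es data source name, and the file path are illustrative assumptions about XSQL's configuration layout, not part of this change.

# Hypothetical configuration entries; the property prefix, data source name, and
# path are assumptions for illustration, only the trailing keys appear in the patch.
spark.xsql.datasource.my_es.schemas.discover          true
spark.xsql.datasource.my_es.schemas.discover.config   /path/to/discover-fields.json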