From a495e975c28d60fbdab57b980abd5d3462d98580 Mon Sep 17 00:00:00 2001 From: wenfang Date: Fri, 18 Oct 2019 15:44:40 +0800 Subject: [PATCH 1/3] Fix mysql datasource can't partition --- .../spark/sql/xsql/manager/MysqlManager.scala | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala index 3b33f18..1813366 100644 --- a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala +++ b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala @@ -104,6 +104,25 @@ private[xsql] class MysqlManager(conf: SparkConf) extends DataSourceManager with } } + /** + * Cache special properties for mysql dataSource + */ + private def cacheSpecialProperties( + dsName: String, + dbName: String, + tbName: String): Unit = { + val tablePartitionsMap = partitionsMap.get(dbName) + var partitionsParameters = new HashMap[String, String] + if (tablePartitionsMap != None) { + if (tablePartitionsMap.get.get(tbName) != None) { + partitionsParameters = tablePartitionsMap.get.get(tbName).get + } + } + if (partitionsParameters.nonEmpty) { + specialProperties += ((s"${dsName}.${dbName}.${tbName}", partitionsParameters)) + } + } + @throws[SQLException] implicit private def typeConvertor(rs: ResultSet) = { val list = new ArrayBuffer[HashMap[String, Object]]() @@ -206,7 +225,8 @@ private[xsql] class MysqlManager(conf: SparkConf) extends DataSourceManager with } } if (partitionsParameters.nonEmpty) { - specialProperties += ((s"${dbName}.${tbName}", partitionsParameters)) + specialProperties += + ((s"${dataSourceName}.${dbName}.${tbName}", partitionsParameters)) } // The storage contains the following info // jdbcOptions: @@ -226,7 +246,8 @@ private[xsql] class MysqlManager(conf: SparkConf) extends DataSourceManager with tableType = CatalogTableType.JDBC, storage = CatalogStorageFormat.empty.copy( properties = jdbcOptions.asProperties.asScala.toMap ++ - specialProperties.getOrElse(s"${dbName}.${tbName}", Map.empty[String, String])), + specialProperties. + getOrElse(s"${dataSourceName}.${dbName}.${tbName}", Map.empty[String, String])), schema = schema, provider = Some(FULL_PROVIDER)) xtables += ((tbName, tb)) @@ -574,13 +595,15 @@ private[xsql] class MysqlManager(conf: SparkConf) extends DataSourceManager with val conn = getConnect() val jdbcOptions = setJdbcOptions(dbName, table) val schema = resolveTableConnnectOnce(conn, jdbcOptions) + cacheSpecialProperties(dsName, dbName, table) Option( CatalogTable( identifier = TableIdentifier(table, Option(dbName), Option(dsName)), tableType = CatalogTableType.JDBC, storage = CatalogStorageFormat.empty.copy( properties = jdbcOptions.asProperties.asScala.toMap ++ - specialProperties.getOrElse(s"${dbName}.${table}", Map.empty[String, String])), + specialProperties + .getOrElse(s"${dsName}.${dbName}.${table}", Map.empty[String, String])), schema = schema, provider = Some(FULL_PROVIDER))) } From 13cfae7b3d7c251232f4ce48127575439914ae37 Mon Sep 17 00:00:00 2001 From: wenfang Date: Mon, 21 Oct 2019 11:35:07 +0800 Subject: [PATCH 2/3] Optimization code --- .../org/apache/spark/sql/xsql/manager/MysqlManager.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala index 1813366..ffb662d 100644 --- a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala +++ b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala @@ -112,15 +112,11 @@ private[xsql] class MysqlManager(conf: SparkConf) extends DataSourceManager with dbName: String, tbName: String): Unit = { val tablePartitionsMap = partitionsMap.get(dbName) - var partitionsParameters = new HashMap[String, String] if (tablePartitionsMap != None) { - if (tablePartitionsMap.get.get(tbName) != None) { - partitionsParameters = tablePartitionsMap.get.get(tbName).get + tablePartitionsMap.get.get(tbName).foreach { m => + specialProperties += ((s"${dsName}.${dbName}.${tbName}", m)) } } - if (partitionsParameters.nonEmpty) { - specialProperties += ((s"${dsName}.${dbName}.${tbName}", partitionsParameters)) - } } @throws[SQLException] From 3d5c1099accba437e006d34e2d9d0247b8b30ffc Mon Sep 17 00:00:00 2001 From: wenfang Date: Tue, 22 Oct 2019 11:11:38 +0800 Subject: [PATCH 3/3] Optimization code --- .../org/apache/spark/sql/xsql/manager/MysqlManager.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala index ffb662d..4518baa 100644 --- a/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala +++ b/sql/xsql/src/main/scala/org/apache/spark/sql/xsql/manager/MysqlManager.scala @@ -111,9 +111,8 @@ private[xsql] class MysqlManager(conf: SparkConf) extends DataSourceManager with dsName: String, dbName: String, tbName: String): Unit = { - val tablePartitionsMap = partitionsMap.get(dbName) - if (tablePartitionsMap != None) { - tablePartitionsMap.get.get(tbName).foreach { m => + partitionsMap.get(dbName).foreach { tablePartitionsMap => + tablePartitionsMap.get(tbName).foreach {m => specialProperties += ((s"${dsName}.${dbName}.${tbName}", m)) } }