Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Date, List => JList}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -126,7 +125,7 @@ private[spark] class MesosClusterScheduler(
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new ReentrantLock()
private val stateLock = new Object()
private val finishedDrivers =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary to use ReentrantLock here since we only use synchronized

new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
private var frameworkId: String = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,7 @@ private class ServerConnection extends LauncherConnection {
protected void handle(Message msg) throws IOException {
try {
if (msg instanceof Hello) {
synchronized (timeout) {
timeout.cancel();
}
timeout.cancel();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout.cancel already uses a lock internally.

timeout = null;
Hello hello = (Hello) msg;
ChildProcAppHandle handle = pending.remove(hello.secret);
Expand Down
2 changes: 1 addition & 1 deletion launcher/src/main/java/org/apache/spark/launcher/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private static class MainClassOptionParser extends SparkSubmitOptionParser {

@Override
protected boolean handle(String opt, String value) {
if (opt == CLASS) {
if (CLASS.equals(opt)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not use == in Java

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

className = value;
}
return false;
Expand Down
4 changes: 2 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,9 @@ private[tree] object LearningNode {
var levelsToGo = indexToLevel(nodeIndex)
while (levelsToGo > 0) {
if ((nodeIndex & (1 << levelsToGo - 1)) == 0) {
tmpNode = tmpNode.leftChild.asInstanceOf[LearningNode]
tmpNode = tmpNode.leftChild.get
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leftChild and rightChild are Option[LearningNode]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jkbradley was this code never run before?

} else {
tmpNode = tmpNode.rightChild.asInstanceOf[LearningNode]
tmpNode = tmpNode.rightChild.get
}
levelsToGo -= 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,12 @@ final class DataFrameWriter private[sql](df: DataFrame) {
if (!tableExists) {
val schema = JdbcUtils.schemaString(df, url)
val sql = s"CREATE TABLE $table ($schema)"
conn.createStatement.executeUpdate(sql)
val statement = conn.createStatement
try {
statement.executeUpdate(sql)
} finally {
statement.close()
}
}
} finally {
conn.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,30 +122,35 @@ private[sql] object JDBCRDD extends Logging {
val dialect = JdbcDialects.get(url)
val conn: Connection = getConnector(properties.getProperty("driver"), url, properties)()
try {
val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
val statement = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try {
val rsmd = rs.getMetaData
val ncols = rsmd.getColumnCount
val fields = new Array[StructField](ncols)
var i = 0
while (i < ncols) {
val columnName = rsmd.getColumnLabel(i + 1)
val dataType = rsmd.getColumnType(i + 1)
val typeName = rsmd.getColumnTypeName(i + 1)
val fieldSize = rsmd.getPrecision(i + 1)
val fieldScale = rsmd.getScale(i + 1)
val isSigned = rsmd.isSigned(i + 1)
val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
val metadata = new MetadataBuilder().putString("name", columnName)
val columnType =
dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
getCatalystType(dataType, fieldSize, fieldScale, isSigned))
fields(i) = StructField(columnName, columnType, nullable, metadata.build())
i = i + 1
val rs = statement.executeQuery()
try {
val rsmd = rs.getMetaData
val ncols = rsmd.getColumnCount
val fields = new Array[StructField](ncols)
var i = 0
while (i < ncols) {
val columnName = rsmd.getColumnLabel(i + 1)
val dataType = rsmd.getColumnType(i + 1)
val typeName = rsmd.getColumnTypeName(i + 1)
val fieldSize = rsmd.getPrecision(i + 1)
val fieldScale = rsmd.getScale(i + 1)
val isSigned = rsmd.isSigned(i + 1)
val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
val metadata = new MetadataBuilder().putString("name", columnName)
val columnType =
dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
getCatalystType(dataType, fieldSize, fieldScale, isSigned))
fields(i) = StructField(columnName, columnType, nullable, metadata.build())
i = i + 1
}
return new StructType(fields)
} finally {
rs.close()
}
return new StructType(fields)
} finally {
rs.close()
statement.close()
}
} finally {
conn.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,26 @@ object JdbcUtils extends Logging {
// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
// SQL database systems using JDBC meta data calls, considering "table" could also include
// the database name. Query used to find table exists can be overriden by the dialects.
Try(conn.prepareStatement(dialect.getTableExistsQuery(table)).executeQuery()).isSuccess
Try {
val statement = conn.prepareStatement(dialect.getTableExistsQuery(table))
try {
statement.executeQuery()
} finally {
statement.close()
}
}.isSuccess
}

/**
* Drops a table from the JDBC database.
*/
def dropTable(conn: Connection, table: String): Unit = {
conn.createStatement.executeUpdate(s"DROP TABLE $table")
val statement = conn.createStatement
try {
statement.executeUpdate(s"DROP TABLE $table")
} finally {
statement.close()
}
}

/**
Expand Down