@@ -22,7 +22,7 @@ import org.apache.spark.SparkContext._

/**
* Executes a roll up-style query against Apache logs.
- *
+ *
* Usage: LogQuery [logFile]
*/
object LogQuery {
@@ -41,22 +41,22 @@ object DenseGaussianMixture {
private def run(inputFile: String, k: Int, convergenceTol: Double, maxIterations: Int) {
val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example")
val ctx = new SparkContext(conf)

val data = ctx.textFile(inputFile).map { line =>
Vectors.dense(line.trim.split(' ').map(_.toDouble))
}.cache()

val clusters = new GaussianMixture()
.setK(k)
.setConvergenceTol(convergenceTol)
.setMaxIterations(maxIterations)
.run(data)

for (i <- 0 until clusters.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(clusters.weights(i), clusters.gaussians(i).mu, clusters.gaussians(i).sigma))
}

println("Cluster labels (first <= 100):")
val clusterLabels = clusters.predict(data)
clusterLabels.take(100).foreach { x =>
@@ -40,7 +40,7 @@ object MQTTPublisher {
StreamingExamples.setStreamingLogLevels()

val Seq(brokerUrl, topic) = args.toSeq

var client: MqttClient = null

try {
@@ -59,10 +59,10 @@ object MQTTPublisher {
println(s"Published data. topic: ${msgtopic.getName()}; Message: $message")
} catch {
case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
- Thread.sleep(10)
+ Thread.sleep(10)
println("Queue is full, wait for to consume data from the message queue")
}
}
}
}
} catch {
case e: MqttException => println("Exception Caught: " + e)
} finally {
@@ -107,7 +107,7 @@ object MQTTWordCount {
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.print()
ssc.start()
ssc.awaitTermination()
@@ -152,9 +152,9 @@ class FlumeReceiver(
val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())
val channelPipelineFactory = new CompressionChannelPipelineFactory()

new NettyServer(
- responder,
+ responder,
new InetSocketAddress(host, port),
channelFactory,
channelPipelineFactory,
@@ -188,12 +188,12 @@ class FlumeReceiver(

override def preferredLocation: Option[String] = Option(host)

- /** A Netty Pipeline factory that will decompress incoming data from
+ /** A Netty Pipeline factory that will decompress incoming data from
* and the Netty client and compress data going back to the client.
*
* The compression on the return is required because Flume requires
- * a successful response to indicate it can remove the event/batch
- * from the configured channel
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel
*/
private[streaming]
class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
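The factory described in the comment above pairs a decompression handler for data arriving from the Flume client with a compression handler for the response going back. A minimal sketch of that idea with Netty 3.x zlib handlers (the class and handler names here are illustrative, not the exact Spark implementation):

```scala
import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

// Sketch: inflate the bytes arriving from the Flume client and deflate the response,
// so a client that sends compressed batches can also decode the success ack.
class ZlibPipelineFactorySketch extends ChannelPipelineFactory {
  override def getPipeline(): ChannelPipeline = {
    val pipeline = Channels.pipeline()
    pipeline.addFirst("deflater", new ZlibEncoder(6)) // compression level 6 for outbound data
    pipeline.addFirst("inflater", new ZlibDecoder())  // decompress inbound data
    pipeline
  }
}
```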
@@ -110,7 +110,7 @@ private[streaming] class FlumePollingReceiver(
}

/**
- * A wrapper around the transceiver and the Avro IPC API.
+ * A wrapper around the transceiver and the Avro IPC API.
* @param transceiver The transceiver to use for communication with Flume
* @param client The client that the callbacks are received on.
*/
@@ -138,7 +138,7 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
val status = client.appendBatch(inputEvents.toList)
status should be (avro.Status.OK)
}

eventually(timeout(10 seconds), interval(100 milliseconds)) {
val outputEvents = outputBuffer.flatten.map { _.event }
outputEvents.foreach {
@@ -113,7 +113,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
r.flatMap { tm: TopicMetadata =>
tm.partitionsMetadata.map { pm: PartitionMetadata =>
TopicAndPartition(tm.topic, pm.partitionId)
- }
+ }
}
}
}
@@ -315,7 +315,7 @@ object KafkaUtils {
* Points to note:
* - No receivers: This stream does not use any receiver. It directly queries Kafka
* - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
- * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
* Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
* You can access the offsets used in each batch from the generated RDDs (see
* [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
@@ -363,7 +363,7 @@ object KafkaUtils {
* Points to note:
* - No receivers: This stream does not use any receiver. It directly queries Kafka
* - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
- * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
* Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
* You can access the offsets used in each batch from the generated RDDs (see
* [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
@@ -427,7 +427,7 @@ object KafkaUtils {
* Points to note:
* - No receivers: This stream does not use any receiver. It directly queries Kafka
* - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
- * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
* Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
* You can access the offsets used in each batch from the generated RDDs (see
* [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
@@ -489,7 +489,7 @@ object KafkaUtils {
* Points to note:
* - No receivers: This stream does not use any receiver. It directly queries Kafka
* - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
- * by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ * by the stream itself. For interoperability with Kafka monitoring tools that depend on
* Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
* You can access the offsets used in each batch from the generated RDDs (see
* [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
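Since the direct stream tracks its own offsets instead of using ZooKeeper, each generated RDD exposes the consumed ranges through HasOffsetRanges. A minimal sketch of reading them back, assuming a placeholder broker address and topic name:

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object DirectStreamOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("DirectStreamOffsets"), Seconds(2))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // placeholder broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my-topic")) // placeholder topic

    stream.foreachRDD { rdd =>
      // Only the RDDs produced directly by the direct stream carry offset ranges.
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // These could be written to ZooKeeper here if external monitoring tools need them.
      ranges.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```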
@@ -119,7 +119,7 @@ object KinesisWordCountASL extends Logging {
val batchInterval = Milliseconds(2000)

// Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
- // on sequence number of records that have been received. Same as batchInterval for this
+ // on sequence number of records that have been received. Same as batchInterval for this
// example.
val kinesisCheckpointInterval = batchInterval

@@ -145,7 +145,7 @@

// Map each word to a (word, 1) tuple so we can reduce by key to count the words
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Print the first 10 wordCounts
wordCounts.print()

@@ -210,14 +210,14 @@ object KinesisWordProducerASL {

val randomWords = List("spark", "you", "are", "my", "father")
val totals = scala.collection.mutable.Map[String, Int]()

// Create the low-level Kinesis Client from the AWS Java SDK.
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpoint)

println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
s" $recordsPerSecond records per second and $wordsPerRecord words per record")

// Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
for (i <- 1 to 10) {
// Generate recordsPerSec records to put onto the stream
@@ -255,8 +255,8 @@ object KinesisWordProducerASL {
}
}

- /**
- * Utility functions for Spark Streaming examples.
+ /**
+ * Utility functions for Spark Streaming examples.
* This has been lifted from the examples/ project to remove the circular dependency.
*/
private[streaming] object StreamingExamples extends Logging {
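For context on the producer above, putting a single record onto a stream with the low-level AWS SDK client looks roughly like the sketch below; the endpoint, stream name, and partition key are placeholder values:

```scala
import java.nio.ByteBuffer

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

object PutOneRecordSketch {
  def main(args: Array[String]): Unit = {
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com") // placeholder endpoint

    val data = "spark you are my father"
    val request = new PutRecordRequest()
      .withStreamName("myKinesisStream")        // placeholder stream name
      .withPartitionKey(data.hashCode.toString) // any key; it only controls shard placement
      .withData(ByteBuffer.wrap(data.getBytes()))

    // putRecord returns the sequence number Kinesis assigned to the record.
    val result = kinesisClient.putRecord(request)
    println(s"Put record with sequence number ${result.getSequenceNumber}")
  }
}
```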
@@ -23,20 +23,20 @@ import org.apache.spark.util.{Clock, ManualClock, SystemClock}
/**
* This is a helper class for managing checkpoint clocks.
*
- * @param checkpointInterval
+ * @param checkpointInterval
* @param currentClock. Default to current SystemClock if none is passed in (mocking purposes)
*/
private[kinesis] class KinesisCheckpointState(
- checkpointInterval: Duration,
+ checkpointInterval: Duration,
currentClock: Clock = new SystemClock())
extends Logging {

/* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
val checkpointClock = new ManualClock()
checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds)

/**
- * Check if it's time to checkpoint based on the current time and the derived time
+ * Check if it's time to checkpoint based on the current time and the derived time
* for the next checkpoint
*
* @return true if it's time to checkpoint
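Roughly, the checkpoint state primes a manual clock one interval ahead of the system clock, compares against it on each batch, and advances it after a checkpoint. A sketch under that reading (method names are illustrative, and the clock utilities are Spark-internal, so this is not the actual class):

```scala
package org.apache.spark.streaming.kinesis

import org.apache.spark.streaming.Duration
import org.apache.spark.util.{ManualClock, SystemClock}

// Illustrative sketch of interval-based checkpoint bookkeeping; it lives in a Spark package
// only because the Clock utilities are private[spark].
class CheckpointStateSketch(checkpointInterval: Duration) {
  private val systemClock = new SystemClock()
  private val checkpointClock = new ManualClock()
  checkpointClock.setTime(systemClock.getTimeMillis() + checkpointInterval.milliseconds)

  /** True once wall-clock time has passed the time planned for the next checkpoint. */
  def shouldCheckpoint(): Boolean =
    systemClock.getTimeMillis() > checkpointClock.getTimeMillis()

  /** Push the next checkpoint one interval further into the future. */
  def advanceCheckpoint(): Unit =
    checkpointClock.advance(checkpointInterval.milliseconds)
}
```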
@@ -44,12 +44,12 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
* https://github.com/awslabs/amazon-kinesis-client
* This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
* http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers to run within a
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a
* Spark Executor.
*
* @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
* by the Kinesis Client Library. If you change the App name or Stream name,
- * the KCL will throw errors. This usually requires deleting the backing
+ * the KCL will throw errors. This usually requires deleting the backing
* DynamoDB table with the same name this Kinesis application.
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
@@ -87,7 +87,7 @@ private[kinesis] class KinesisReceiver(
*/

/**
- * workerId is used by the KCL should be based on the ip address of the actual Spark Worker
+ * workerId is used by the KCL should be based on the ip address of the actual Spark Worker
* where this code runs (not the driver's IP address.)
*/
private var workerId: String = null
@@ -121,7 +121,7 @@ private[kinesis] class KinesisReceiver(

/*
* RecordProcessorFactory creates impls of IRecordProcessor.
- * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
+ * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
* IRecordProcessor.processRecords() method.
* We're using our custom KinesisRecordProcessor in this case.
*/
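The scaladoc above ties the KCL application name to its backing DynamoDB table and derives the workerId from the worker's own address. Wiring those parameters into the KCL could look like the sketch below (KCL 1.x API; the application name, stream name, and endpoint are placeholders, and the record-processor factory is omitted):

```scala
import java.net.InetAddress
import java.util.UUID

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}

object KclConfigSketch {
  def build(): KinesisClientLibConfiguration = {
    // The application name is also the name of the DynamoDB table the KCL checkpoints into;
    // changing the app or stream name usually means deleting that table first.
    val appName = "MyKinesisApp"       // placeholder
    val streamName = "myKinesisStream" // placeholder

    // The workerId should identify the executor actually running the receiver, not the driver.
    val workerId = InetAddress.getLocalHost.getHostAddress + ":" + UUID.randomUUID()

    new KinesisClientLibConfiguration(
      appName, streamName, new DefaultAWSCredentialsProviderChain(), workerId)
      .withKinesisEndpoint("https://kinesis.us-east-1.amazonaws.com") // placeholder endpoint
      .withInitialPositionInStream(InitialPositionInStream.LATEST)
  }
}
```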
@@ -35,9 +35,9 @@ import com.amazonaws.services.kinesis.model.Record
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
* This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
- * shard in the Kinesis stream upon startup. This is normally done in separate threads,
- * but the KCLs within the KinesisReceivers will balance themselves out if you create
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ * shard in the Kinesis stream upon startup. This is normally done in separate threads,
+ * but the KCLs within the KinesisReceivers will balance themselves out if you create
* multiple Receivers.
*
* @param receiver Kinesis receiver
@@ -69,34 +69,34 @@ private[kinesis] class KinesisRecordProcessor(
* and Spark Streaming's Receiver.store().
*
* @param batch list of records from the Kinesis stream shard
- * @param checkpointer used to update Kinesis when this batch has been processed/stored
+ * @param checkpointer used to update Kinesis when this batch has been processed/stored
* in the DStream
*/
override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
if (!receiver.isStopped()) {
try {
/*
- * Notes:
+ * Notes:
* 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
* Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
* internally-configured Spark serializer (kryo, etc).
* 2) This is not desirable, so we instead store a raw Array[Byte] and decouple
* ourselves from Spark's internal serialization strategy.
* 3) For performance, the BlockGenerator is asynchronously queuing elements within its
* memory before creating blocks. This prevents the small block scenario, but requires
- * that you register callbacks to know when a block has been generated and stored
+ * that you register callbacks to know when a block has been generated and stored
* (WAL is sufficient for storage) before can checkpoint back to the source.
*/
batch.foreach(record => receiver.store(record.getData().array()))

logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")

/*
- * Checkpoint the sequence number of the last record successfully processed/stored
+ * Checkpoint the sequence number of the last record successfully processed/stored
* in the batch.
* In this implementation, we're checkpointing after the given checkpointIntervalMillis.
- * Note that this logic requires that processRecords() be called AND that it's time to
- * checkpoint. I point this out because there is no background thread running the
+ * Note that this logic requires that processRecords() be called AND that it's time to
+ * checkpoint. I point this out because there is no background thread running the
* checkpointer. Checkpointing is tested and trigger only when a new batch comes in.
* If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below).
* However, if the worker dies unexpectedly, a checkpoint may not happen.
@@ -130,16 +130,16 @@ private[kinesis] class KinesisRecordProcessor(
}
} else {
/* RecordProcessor has been stopped. */
logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId" +
logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId" +
s" and shardId $shardId. No more records will be processed.")
}
}

/**
* Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
- * 1) the stream is resharding by splitting or merging adjacent shards
+ * 1) the stream is resharding by splitting or merging adjacent shards
* (ShutdownReason.TERMINATE)
- * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
+ * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
* (ShutdownReason.ZOMBIE)
*
* @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
@@ -153,7 +153,7 @@ private[kinesis] class KinesisRecordProcessor(
* Checkpoint to indicate that all records from the shard have been drained and processed.
* It's now OK to read from the new shards that resulted from a resharding event.
*/
- case ShutdownReason.TERMINATE =>
+ case ShutdownReason.TERMINATE =>
KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)

/*
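The retryRandom call above retries the checkpoint a few times with a randomized backoff. A simplified, hypothetical version of such a helper, for illustration only:

```scala
import scala.util.Random
import scala.util.control.NonFatal

// Simplified sketch in the spirit of KinesisRecordProcessor.retryRandom: evaluate the
// by-name expression, and on failure sleep a random amount (up to maxBackOffMillis)
// before trying again, for at most numRetriesLeft attempts in total.
object RetrySketch {
  def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = {
    try {
      expression
    } catch {
      case NonFatal(e) if numRetriesLeft > 1 =>
        Thread.sleep(Random.nextInt(maxBackOffMillis).toLong)
        retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
    }
  }
}

// Usage mirroring the shutdown path above (checkpointer is the KCL checkpointer):
//   RetrySketch.retryRandom(checkpointer.checkpoint(), 4, 100)
```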
graphx/src/test/scala/org/apache/spark/graphx/EdgeSuite.scala (5 additions, 5 deletions)
@@ -23,15 +23,15 @@ class EdgeSuite extends SparkFunSuite {
test ("compare") {
// decending order
val testEdges: Array[Edge[Int]] = Array(
- Edge(0x7FEDCBA987654321L, -0x7FEDCBA987654321L, 1),
- Edge(0x2345L, 0x1234L, 1),
- Edge(0x1234L, 0x5678L, 1),
- Edge(0x1234L, 0x2345L, 1),
+ Edge(0x7FEDCBA987654321L, -0x7FEDCBA987654321L, 1),
+ Edge(0x2345L, 0x1234L, 1),
+ Edge(0x1234L, 0x5678L, 1),
+ Edge(0x1234L, 0x2345L, 1),
Edge(-0x7FEDCBA987654321L, 0x7FEDCBA987654321L, 1)
)
// to ascending order
val sortedEdges = testEdges.sorted(Edge.lexicographicOrdering[Int])

for (i <- 0 until testEdges.length) {
assert(sortedEdges(i) == testEdges(testEdges.length - i - 1))
}
@@ -191,7 +191,7 @@ private[streaming] class BlockGenerator(
logError(message, t)
listener.onError(message, t)
}

private def pushBlock(block: Block) {
listener.onPushBlock(block.id, block.buffer)
logInfo("Pushed block " + block.id)
@@ -62,7 +62,7 @@ private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockI
private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
extends ReceivedBlockHandler with Logging {

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>