5 changes: 5 additions & 0 deletions examples/pom.xml
@@ -102,6 +102,11 @@
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-eventhubs_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
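Aside from wiring the examples module above, a downstream application would add the same artifact to its own build. A minimal sbt sketch, assuming the module is published under the coordinates declared in external/eventhubs/pom.xml below (the snapshot version shown stands in for whatever Spark release ships the module):

// Hedged sketch: depending on the new module from an application's sbt build.
// Coordinates come from external/eventhubs/pom.xml; the version is an assumption.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.5.0-SNAPSHOT" % "provided",
  "org.apache.spark" %% "spark-streaming-eventhubs" % "1.5.0-SNAPSHOT"
)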
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.eventhubs.EventHubsUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

/**
* Count messages from EventHubs partitions.
* Usage: EventCount <checkpointDirectory> <policyname> <policykey> <namespace>
* <name> <partitionCount> <consumerGroup> <outputDirectory>
* <checkpointDirectory> is the DFS location to store the context checkpoint
* <policyname> is the EventHubs policy name
* <policykey> is the EventHubs policy key
* <namespace> is the EventHubs namespace
* <name> is the name of the event hub
* <partitionCount> is the partition count of the event hub
* <consumerGroup> is the name of the consumer group
* <outputDirectory> is the DFS location to store the output
*
* Example:
* `%SPARK_HOME%\bin\spark-submit.cmd
* --class org.apache.spark.examples.streaming.EventCount
* --master spark://headnodehost:7077
* c:\sparktest\spark-streaming-eventhubs*.jar
* wasb://{container}@{account}.blob.core.windows.net/sparkcheckpoint
* root
* "pHUoYy8dvBHuXciNuD3NpVpDPL4mwycWhWQ9DVHASTM="
* myeh-ns
* myeh
* 4
* "$default"
* wasb://{container}@{account}.blob.core.windows.net/sparkoutput/ehcount`
*/
object EventCount {
def createContext(checkpointDir: String, ehParams: Map[String, String], outputDir: String): StreamingContext = {
println("Creating new StreamingContext")

// Set max number of cores to double the partition count
val partitionCount = ehParams("eventhubs.partition.count").toInt
val sparkConf = new SparkConf().setAppName("EventCount").set("spark.cores.max",
(partitionCount*2).toString)

// Set batch size
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint(checkpointDir)

// Create a unioned stream for all partitions
val stream = EventHubsUtils.createUnionStream(ssc, ehParams)

// Create a single stream for one partition
// val stream = EventHubsUtils.createStream(ssc, ehParams, "0")

// Set checkpoint interval
stream.checkpoint(Seconds(10))

// Count number of events in the past minute
// val counts = stream.countByWindow(Minutes(1), Seconds(5))

// Count number of events in the past batch
val counts = stream.count()

counts.saveAsTextFiles(outputDir)
counts.print()

ssc
}

def main(args: Array[String]) {
if (args.length < 8) {
System.err.println("Usage: EventCount <checkpointDirectory> <policyname> <policykey>"
+ "<namespace> <name> <partitionCount> <consumerGroup> <outputDirectory>")
System.exit(1)
}
val Array(checkpointDir, policy, key, namespace, name,
partitionCount, consumerGroup, outputDir) = args
val ehParams = Map[String, String](
"eventhubs.policyname" -> policy,
"eventhubs.policykey" -> key,
"eventhubs.namespace" -> namespace,
"eventhubs.name" -> name,
"eventhubs.partition.count" -> partitionCount,
"eventhubs.consumergroup" -> consumerGroup,
"eventhubs.checkpoint.dir" -> checkpointDir,
"eventhubs.checkpoint.interval" -> "10"
)

// Get StreamingContext from checkpoint directory, or create a new one
val ssc = StreamingContext.getOrCreate(checkpointDir,
() => {
createContext(checkpointDir, ehParams, outputDir)
})

ssc.start()
ssc.awaitTermination()
}


}
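Note for readers: the EventHubsUtils object that EventCount calls is not included in this excerpt of the diff. Judging only from the call sites above, its entry points presumably look roughly like the sketch below; the signatures, return types, and union plumbing are assumptions, not the PR's actual code.

// Hedged sketch of the EventHubsUtils surface implied by EventCount above.
// Only the two call sites are confirmed by this diff; everything else is assumed.
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

trait EventHubsUtilsSketch {
  // A receiver-backed stream for a single EventHubs partition. In the PR this is
  // presumably built on EventHubsClientWrapper via a Spark Receiver (not shown here).
  def createStream(ssc: StreamingContext,
                   eventhubsParams: Map[String, String],
                   partitionId: String): DStream[Array[Byte]]

  // One stream per partition, unioned into a single DStream covering the whole event hub.
  def createUnionStream(ssc: StreamingContext,
                        eventhubsParams: Map[String, String]): DStream[Array[Byte]] = {
    val partitionCount = eventhubsParams("eventhubs.partition.count").toInt
    ssc.union((0 until partitionCount).map(i => createStream(ssc, eventhubsParams, i.toString)))
  }
}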
98 changes: 98 additions & 0 deletions external/eventhubs/pom.xml
@@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-eventhubs_2.10</artifactId>
<properties>
<sbt.project.name>spark-streaming-eventhubs</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project External EventHubs</name>
<url>http://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.microsoft.eventhubs.client</groupId>
<artifactId>eventhubs-client</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<version>4.11</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
<scope>test</scope>
<version>0.10</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.eventhubs

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration

/**
* A DFS-based OffsetStore implementation
*/
@SerialVersionUID(1L)
class DfsBasedOffsetStore(
directory: String,
namespace: String,
name: String,
partition: String) extends OffsetStore {

var path: Path = null
var fs: FileSystem = null

override def open(): Unit = {
if(fs == null) {
path = new Path(directory + "/" + namespace + "/" + name + "/" + partition)
fs = path.getFileSystem(new Configuration())
}
}

override def write(offset: String): Unit = {
val stream = fs.create(path, true)
stream.writeUTF(offset)
stream.close()
}

override def read(): String = {
var offset:String = "-1"
if(fs.exists(path)) {
val stream = fs.open(path)
offset = stream.readUTF()
stream.close()
}
offset
}

override def close(): Unit = {
if(fs != null) {
fs.close()
fs = null
}
}
}
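The OffsetStore trait that DfsBasedOffsetStore extends is not part of this excerpt. Based solely on the methods overridden above, it is presumably a small serializable contract along these lines (a sketch under that assumption, not the PR's actual definition):

// Hedged sketch of the OffsetStore contract implied by DfsBasedOffsetStore above.
trait OffsetStoreSketch extends Serializable {
  def open(): Unit                 // prepare the backing store (e.g. resolve the DFS path)
  def write(offset: String): Unit  // persist the most recently processed EventHubs offset
  def read(): String               // return the stored offset, or "-1" if none has been written
  def close(): Unit                // release any resources held by the store
}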
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.eventhubs

import com.microsoft.eventhubs.client._

import scala.collection.Map

/**
* Wraps a raw ResilientEventHubReceiver to make it easier to mock in unit tests
*/
@SerialVersionUID(1L)
class EventHubsClientWrapper extends Serializable {
var receiver: ResilientEventHubReceiver = null

def createReceiver(eventhubsParams: Map[String, String],
partitionId: String,
offsetStore: OffsetStore): Unit = {
// Read the previously stored offset, if one exists
var filter: IEventHubFilter = null
val offset = offsetStore.read()
if(offset != "-1" && offset != null) {
filter = new EventHubOffsetFilter(offset)
}
else if (eventhubsParams.contains("eventhubs.filter.offset")) {
filter = new EventHubOffsetFilter(eventhubsParams("eventhubs.filter.offset"))
}
else if (eventhubsParams.contains("eventhubs.filter.enqueuetime")) {
filter = new EventHubEnqueueTimeFilter(
eventhubsParams("eventhubs.filter.enqueuetime").toLong)
}

// Create EventHubs connection string
val connectionString = new ConnectionStringBuilder(
eventhubsParams("eventhubs.policyname"),
eventhubsParams("eventhubs.policykey"),
eventhubsParams("eventhubs.namespace")
).getConnectionString

// Set consumer group name if provided by user
var consumerGroup: String = null
if(eventhubsParams.contains("eventhubs.consumergroup")) {
consumerGroup = eventhubsParams("eventhubs.consumergroup")
}
val name = eventhubsParams("eventhubs.name")

createReceiverProxy(connectionString, name, partitionId, consumerGroup, -1, filter)
}

private[eventhubs]
def createReceiverProxy(connectionString: String,
name: String,
partitionId: String,
consumerGroup: String,
defaultCredits: Int,
filter: IEventHubFilter): Unit = {
receiver = new ResilientEventHubReceiver(
  connectionString, name, partitionId, consumerGroup, defaultCredits, filter)
receiver.initialize()
}

def receive(): EventHubMessage = {
EventHubMessage.parseAmqpMessage(receiver.receive(5000))
}

def close(): Unit = {
if(receiver != null) {
receiver.close()
}
}
}
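The wrapper exists mainly for testability: the module's pom above pulls in mockito-all and scalatest, so a receiver test can stub the EventHubs connection rather than hit the live service. A rough sketch of that pattern follows; how the stub is handed to the receiver is an assumption, since the receiver class itself is not part of this excerpt.

// Hedged sketch: stubbing EventHubsClientWrapper with Mockito in a unit test.
// Only the wrapper's createReceiver/receive/close surface is confirmed by the diff above.
import com.microsoft.eventhubs.client.EventHubMessage
import org.apache.spark.streaming.eventhubs.EventHubsClientWrapper
import org.mockito.Mockito.{mock, when}

object EventHubsClientWrapperStubSketch {
  // Returns a wrapper whose receive() always yields the supplied message; a test would
  // hand this to the receiver under test instead of opening a real EventHubs connection.
  def stubbedClient(message: EventHubMessage): EventHubsClientWrapper = {
    val client = mock(classOf[EventHubsClientWrapper])
    when(client.receive()).thenReturn(message)
    client
  }
}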