Commit bd0856e

Author: Raazesh Sainudiin

Ivan's extendedTwitterUtils for Spark Streaming of Twitter4j-based streams, with both user-id filtering by follow and keyword filtering by track

1 parent a36bdbc

1 file changed: +260 −0 lines
// Databricks notebook source exported at Wed, 5 Oct 2016 06:38:07 UTC

// MAGIC %md
// MAGIC # Extended Twitter utils from Spark
// MAGIC
// MAGIC ### 2016, Ivan Sadikov and Raazesh Sainudiin
// MAGIC
// MAGIC We extend the Twitter utils from Spark so that streams can be filtered by user ids, via the `.follow` method of `twitter4j`, as well as by strings in the tweet, via its `.track` method.
// MAGIC
// MAGIC This is part of *Project MEP: Meme Evolution Programme* and is supported by the Databricks academic partners program.
// MAGIC
// MAGIC The analysis is available in the following Databricks notebook:
// MAGIC * [http://lamastex.org/lmse/mep/src/extendedTwitterUtils.html](http://lamastex.org/lmse/mep/src/extendedTwitterUtils.html)
// MAGIC
// MAGIC ```
// MAGIC Copyright 2016 Ivan Sadikov and Raazesh Sainudiin
// MAGIC
// MAGIC Licensed under the Apache License, Version 2.0 (the "License");
// MAGIC you may not use this file except in compliance with the License.
// MAGIC You may obtain a copy of the License at
// MAGIC
// MAGIC     http://www.apache.org/licenses/LICENSE-2.0
// MAGIC
// MAGIC Unless required by applicable law or agreed to in writing, software
// MAGIC distributed under the License is distributed on an "AS IS" BASIS,
// MAGIC WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// MAGIC See the License for the specific language governing permissions and
// MAGIC limitations under the License.
// MAGIC ```
// COMMAND ----------

import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
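
// COMMAND ----------

// MAGIC %md
// MAGIC Before defining the receiver, here is a minimal sketch (our illustration, not part of the original notebook) of the two `twitter4j.FilterQuery` calls it relies on: `track` takes a comma-separated list of phrases and `follow` takes user ids, and the streaming API delivers a status if *either* condition matches.

// COMMAND ----------

// Illustration only: the phrases and user id below are made up.
val exampleQuery = new FilterQuery()
exampleQuery.track(Seq("spark", "streaming").mkString(",")) // keyword filter, as used in onStart below
exampleQuery.follow(Seq(4173723312L): _*)                   // user-id filter, as used in onStart below
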
// COMMAND ----------

// MAGIC %md
// MAGIC ### Twitter receiver and stream

// COMMAND ----------

class ExtendedTwitterReceiver(
    twitterAuth: Authorization,
    filters: Seq[String],
    userFilters: Seq[Long],
    storageLevel: StorageLevel
  ) extends Receiver[Status](storageLevel) {

  @volatile private var twitterStream: TwitterStream = _
  @volatile private var stopped = false

  def onStart() {
    try {
      val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
      newTwitterStream.addListener(new StatusListener {
        def onStatus(status: Status): Unit = {
          store(status)
        }
        // Unimplemented
        def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
        def onTrackLimitationNotice(i: Int) {}
        def onScrubGeo(l: Long, l1: Long) {}
        def onStallWarning(stallWarning: StallWarning) {}
        def onException(e: Exception) {
          if (!stopped) {
            restart("Error receiving tweets", e)
          }
        }
      })

      // do filtering only when filters are available
      if (filters.nonEmpty || userFilters.nonEmpty) {
        val query = new FilterQuery()
        if (filters.nonEmpty) {
          query.track(filters.mkString(","))
        }

        if (userFilters.nonEmpty) {
          query.follow(userFilters: _*)
        }

        newTwitterStream.filter(query)
      } else {
        newTwitterStream.sample()
      }
      setTwitterStream(newTwitterStream)
      println("Twitter receiver started")
      stopped = false
    } catch {
      case e: Exception => restart("Error starting Twitter stream", e)
    }
  }

  def onStop() {
    stopped = true
    setTwitterStream(null)
    println("Twitter receiver stopped")
  }

  private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized {
    if (twitterStream != null) {
      twitterStream.shutdown()
    }
    twitterStream = newTwitterStream
  }
}
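
// COMMAND ----------

// MAGIC %md
// MAGIC Note the fallback in `onStart` above: with no `track` phrases and no `follow` ids the receiver calls `sample()`, i.e. it returns the public sample stream rather than a filtered one. A commented sketch (ours, not in the original notebook) of that degenerate call, usable once `ExtendedTwitterUtils` is defined below:

// COMMAND ----------

// A minimal sketch, assuming `ssc` and `auth` are in scope:
// passing no filters at all falls back to the public sample stream.
// val sampleStream = ExtendedTwitterUtils.createStream(ssc, auth)
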
// COMMAND ----------

class ExtendedTwitterInputDStream(
    ssc_ : StreamingContext,
    twitterAuth: Option[Authorization],
    filters: Seq[String],
    userFilters: Seq[Long],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[Status](ssc_) {

  private def createOAuthAuthorization(): Authorization = {
    new OAuthAuthorization(new ConfigurationBuilder().build())
  }

  private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())

  override def getReceiver(): Receiver[Status] = {
    new ExtendedTwitterReceiver(authorization, filters, userFilters, storageLevel)
  }
}

// COMMAND ----------

// MAGIC %md
// MAGIC ### Extended Twitter utils

// COMMAND ----------

import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}

object ExtendedTwitterUtils {
  def createStream(
      ssc: StreamingContext,
      twitterAuth: Option[Authorization],
      filters: Seq[String] = Nil,
      userFilters: Seq[Long] = Nil,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[Status] = {
    new ExtendedTwitterInputDStream(ssc, twitterAuth, filters, userFilters, storageLevel)
  }
}

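// COMMAND ----------

// MAGIC %md
// MAGIC A quick illustration (ours, not in the original notebook): because `filters` defaults to `Nil`, the extension lets you follow user ids alone, with no keyword filter at all.

// COMMAND ----------

// A minimal sketch, assuming `ssc` and an `auth: Option[Authorization]` are in scope;
// the user id is made up for illustration.
// val userOnlyStream = ExtendedTwitterUtils.createStream(ssc, auth, userFilters = Seq(12345L))
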
// COMMAND ----------

// MAGIC %md
// MAGIC Raaz's Twitter credentials

// COMMAND ----------

import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter.TwitterUtils

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

import com.google.gson.Gson

// COMMAND ----------

// put your own Twitter credentials here
val MyconsumerKey = "..."
val MyconsumerSecret = "..."
val Mytoken = "..."
val MytokenSecret = "..."

System.setProperty("twitter4j.oauth.consumerKey", MyconsumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", MyconsumerSecret)
System.setProperty("twitter4j.oauth.accessToken", Mytoken)
System.setProperty("twitter4j.oauth.accessTokenSecret", MytokenSecret)

val outputDirectoryRoot = "dbfs:/datasets/MEP/AkinTweet/sampleTweets@raazozoneByuserId" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength = batchInterval * 5

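// COMMAND ----------

// MAGIC %md
// MAGIC An equivalent alternative (a sketch we add, not in the original notebook): hand the credentials to `twitter4j` explicitly through a `ConfigurationBuilder` instead of JVM system properties, so they need not live in global state.

// COMMAND ----------

// A minimal sketch: build the OAuth authorization directly from the values above.
val cb = new ConfigurationBuilder()
cb.setOAuthConsumerKey(MyconsumerKey)
  .setOAuthConsumerSecret(MyconsumerSecret)
  .setOAuthAccessToken(Mytoken)
  .setOAuthAccessTokenSecret(MytokenSecret)
val explicitAuth = new OAuthAuthorization(cb.build())
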
// COMMAND ----------

// the Library has already been attached to this cluster (show live how to do this from scratch?)

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")
// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create a Twitter Stream for the input source.
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))

  val track = List("@raazozone") //, "Trump2016", "#MakeAmericaGreatAgain", "Donald Trump", "#lovetrumpshate"
  val follow = List(4173723312L)
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth, track, follow)
  // serialise each twitter4j Status into JSON with Gson
  val twitterStreamJson = twitterStream.map(x => {
    val gson = new Gson()
    val xJson = gson.toJson(x)
    xJson
  })

  val partitionsEachInterval = 1 // number of partitions in each RDD of tweets in the DStream

  twitterStreamJson.foreachRDD((rdd, time) => { // for each filtered RDD in the DStream
    val count = rdd.count()
    if (count > 0) {
      val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
      //outputRDD.saveAsTextFile(s"${outputDirectory}/tweets_" + time.milliseconds.toString) // save as textfile
      outputRDD.saveAsTextFile(outputDirectoryRoot + "/tweets_" + time.milliseconds.toString) // save as textfile in dbfs
      numTweetsCollected += count // update with the latest count
    }
  })
  newContextCreated = true
  ssc
}

// COMMAND ----------

val ssc = StreamingContext.getActiveOrCreate(streamFunc)

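// COMMAND ----------

// MAGIC %md
// MAGIC A small check we add here (not in the original notebook): `getActiveOrCreate` reuses an already-started context if one exists, so the `newContextCreated` flag set inside `streamFunc` tells you whether a fresh context was actually built.

// COMMAND ----------

// our addition: report whether streamFunc was invoked to build a new context
println("newContextCreated = " + newContextCreated)
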
// COMMAND ----------

ssc.start()
//ssc.awaitTerminationOrTimeout(timeoutJobLength) // you only need one of these to start

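// COMMAND ----------

// MAGIC %md
// MAGIC A cautionary sketch (our addition): `awaitTerminationOrTimeout` takes a timeout in *milliseconds*, so since `timeoutJobLength` above is in minutes it should be converted first, for example via `Minutes(...)`.

// COMMAND ----------

// A minimal sketch, assuming timeoutJobLength is in minutes; left commented out
// so the notebook does not block here.
// ssc.awaitTerminationOrTimeout(Minutes(timeoutJobLength).milliseconds)
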
// COMMAND ----------

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } // this will make sure all streaming jobs in the cluster are stopped - raaz

// COMMAND ----------

display(dbutils.fs.ls("dbfs:/datasets/MEP/AkinTweet/sampleTweets@raazozoneByuserId"))

// COMMAND ----------

val a = sqlContext.read.json("dbfs:/datasets/MEP/AkinTweet/sampleTweets@raazozoneByuserId/*")

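// COMMAND ----------

// MAGIC %md
// MAGIC Before the wide `select` below, it can help to inspect the schema Spark infers from the Gson-serialised statuses; this one-liner is our addition, not part of the original notebook.

// COMMAND ----------

a.printSchema() // inspect the nested fields (user, retweetedStatus, quotedStatus, ...) inferred from the JSON
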
// COMMAND ----------

display(a.select(
  $"id".as("TweetId"),
  $"retweetedStatus.id".as("IdOfTweetBeingRT"),
  $"inReplyToUserId",
  $"inReplyToStatusId",
  $"user.name".as("StatusAuthor"),
  $"retweetedStatus.user.name".as("NameOfAuthorWhoseTweetIsRT"),
  $"retweetedStatus.userMentionEntities.name".as("ScreenNameQuotedInRT"),
  $"user.id".as("UserId"),
  $"retweetedStatus.user.id".as("UserIdOfAuthorWhoseTweetIsRT"),
  $"retweetedStatus.userMentionEntities.id".as("IdOfScreenNameQuotedInRT"),
  $"quotedStatusId",
  $"text".as("currentTweet"),
  $"quotedStatus.text",
  $"quotedStatus.id",
  $"quotedStatus.user.id"))

// COMMAND ----------