|
1 | | -// Databricks notebook source exported at Mon, 12 Dec 2016 15:08:42 UTC |
| 1 | +// Databricks notebook source exported at Mon, 12 Dec 2016 15:10:36 UTC |
2 | 2 | // MAGIC %md |
3 | 3 | // MAGIC # Tweet Anatomy & Transmission Tree |
4 | 4 | // MAGIC |
@@ -499,253 +499,7 @@ display(TTTDF.select($"CurrentTweetDate",$"CurrentTwID",$"CurrentTweet",$"TweetT |
499 | 499 |
|
500 | 500 | // COMMAND ---------- |
501 | 501 |
|
502 | | -// MAGIC %md |
503 | | -// MAGIC ## Tweet Transmission Tree Implementation: |
504 | | -// MAGIC #### Dataset: The United States 2016 3rd Presidential Debate Streamed Tweets |
505 | | - |
506 | | -// COMMAND ---------- |
507 | | - |
508 | | -// MAGIC %md |
509 | | -// MAGIC ** Step 1: To retrieve the dataset from AWS S3, the authors credentials are verified for mounting.** |
510 | | -// MAGIC |
511 | | -// MAGIC Note: This step can be skipped if your tweet data is in another directory other than AWS S3 such as dbfs or hdfs or databricks local driver. |
512 | | -// MAGIC |
513 | | -// MAGIC The S3 credentials can also be called and runned from another notebook (for example, if the notebook is named "AWS_Credentials") |
514 | | - |
515 | | -// COMMAND ---------- |
516 | | - |
517 | | -// MAGIC %md |
518 | | -// MAGIC ##### View the directory content |
519 | | - |
520 | | -// COMMAND ---------- |
521 | | - |
522 | | -display(StreamedTweetsPQ.take(5)) |
523 | | - |
524 | | -// COMMAND ---------- |
525 | | - |
526 | | -val StreamedTweetsDF = sqlContext.read.json(StreamedTweetsPQ.map({case Row(val1: String) => val1})) |
527 | | - |
528 | | -// COMMAND ---------- |
529 | | - |
530 | | -StreamedTweetsDF.take(5) |
531 | | - |
532 | | -// COMMAND ---------- |
533 | | - |
534 | | -// display(StreamedTweetsDF) |
535 | | - |
536 | | -// COMMAND ---------- |
537 | | - |
538 | | -// MAGIC %md |
539 | | -// MAGIC **Step 3: Print the Schema of the JSON file to view the structure of the streamed tweets** |
540 | | - |
541 | | -// COMMAND ---------- |
542 | | - |
543 | | -StreamedTweetsDF.printSchema() |
544 | | - |
545 | | -// COMMAND ---------- |
546 | | - |
547 | | -// MAGIC %md |
548 | | -// MAGIC **Step 4: Select and Create additional Columns for Tweets Classification** |
549 | | -// MAGIC |
550 | | -// MAGIC Alias is created for all the selected columns for simplicity |
551 | | - |
552 | | -// COMMAND ---------- |
553 | | - |
554 | | -val StreamedTweetsClassDF = StreamedTweetsDF.select( |
555 | | - unix_timestamp($"createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CurrentTweetDate"), //The datae of the current tweet in Unix Time Stamp format |
556 | | - $"id".as("CurrentTwID"), //Unique ID of the current tweet |
557 | | - unix_timestamp($"retweetedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInRT"), //The time the original tweet being retweeted was created |
558 | | - $"retweetedStatus.id".as("OriginalTwIDinRT"), //Unique ID of the tweet being retweeted |
559 | | - unix_timestamp($"quotedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInQT"), //The time the original tweet being quoted was created |
560 | | - $"quotedStatus.id".as("OriginalTwIDinQT"), //Unique ID of the tweet being quoted |
561 | | - $"inReplyToStatusId".as("OriginalTwIDinReply"), //Unique ID of the tweet being replied to |
562 | | - $"user.id".as("CPostUserId"), //Unique ID of the user who posted the last status |
563 | | - unix_timestamp($"user.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("userCreatedAtDate"), //The date the user created his/her account |
564 | | - $"retweetedStatus.user.id".as("OPostUserIdinRT"), //Unique ID of the user whose tweet was retweeted |
565 | | - $"quotedStatus.user.id".as("OPostUserIdinQT"), //Unique ID of the user whose tweet was quoted |
566 | | - $"inReplyToUserId".as("OPostUserIdinReply"), //Unique ID of the user whose tweet was replied to |
567 | | - $"user.name".as("CPostUserName"), //The full name of the user who posted the last status |
568 | | - $"retweetedStatus.user.name".as("OPostUserNameinRT"), //The full name of the user whose tweet was retweeted |
569 | | - $"quotedStatus.user.name".as("OPostUserNameinQT"), //The full name of the user whose tweet was quoted |
570 | | - $"user.screenName".as("CPostUserSN"), //The screen name of the user who posted the last status |
571 | | - $"retweetedStatus.user.screenName".as("OPostUserSNinRT"), //The screen name of the user whose tweet was retweeted |
572 | | - $"quotedStatus.user.screenName".as("OPostUserSNinQT"), //The screen name of the user whose tweet was quoted |
573 | | - $"inReplyToScreenName".as("OPostUserSNinReply"), //The screen name of the user whose tweet was replied to |
574 | | - $"user.favouritesCount", // Number of likes by the user of last status |
575 | | - $"user.followersCount", // Number of people the user of last status is following |
576 | | - $"user.friendsCount", // Number of friends or other users following the user of last status |
577 | | - $"user.isVerified", // True if the user of the last status account is enabled |
578 | | - $"user.isGeoEnabled", // True if the user of the last status Geographical location is enabled |
579 | | - $"text".as("CurrentTweet"), // Tweet or status posted by the user |
580 | | - $"retweetedStatus.userMentionEntities.id".as("UMentionRTiD"), // Unique IDs' of User(s) mentioned in a retweet status |
581 | | - $"retweetedStatus.userMentionEntities.screenName".as("UMentionRTsN"), // Screen names of User(s) mentioned in a retweet status |
582 | | - $"quotedStatus.userMentionEntities.id".as("UMentionQTiD"), // Unique IDs' of User(s) mentioned in a quoted status |
583 | | - $"quotedStatus.userMentionEntities.screenName".as("UMentionQTsN"), // Screen names of User(s) mentioned in a quoted status |
584 | | - $"userMentionEntities.id".as("UMentionASiD"), // Unique IDs' of User(s) mentioned in all statuses |
585 | | - $"userMentionEntities.screenName".as("UMentionASsN") // Screen names of User(s) mentioned in all statuses |
586 | | - ).withColumn("TweetType", // The conditional statements used to classify the streamed tweets into distinct types |
587 | | - when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1, |
588 | | - "Original Tweet") |
589 | | - .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1, |
590 | | - "Reply Tweet") |
591 | | - .when($"OriginalTwIDinRT".isNotNull &&$"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1, |
592 | | - "ReTweet") |
593 | | - .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1, |
594 | | - "Quoted Tweet") |
595 | | - .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1, |
596 | | - "Retweet of Quoted Tweet") |
597 | | - .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1, |
598 | | - "Retweet of Reply Tweet") |
599 | | - .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1, |
600 | | - "Reply of Quoted Tweet") |
601 | | - .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1, |
602 | | - "Retweet of Quoted Rely Tweet") |
603 | | - .otherwise("Unclassified")) |
604 | | -.withColumn("MentionType", // The conditional statements used to classify the streamed tweets into unique type of mentions |
605 | | - when($"UMentionRTid".isNotNull && $"UMentionQTid".isNotNull, "RetweetAndQuotedMention") |
606 | | - .when($"UMentionRTid".isNotNull && $"UMentionQTid".isNull, "RetweetMention") |
607 | | - .when($"UMentionRTid".isNull && $"UMentionQTid".isNotNull, "QuotedMention") |
608 | | - .when($"UMentionRTid".isNull && $"UMentionQTid".isNull, "AuthoredMention") |
609 | | - .otherwise("NoMention")) |
610 | | -.withColumn("Weight", lit(1L)) |
611 | | - |
612 | | -// COMMAND ---------- |
613 | | - |
614 | | -StreamedTweetsClassDF.printSchema() |
615 | | - |
616 | | -// COMMAND ---------- |
617 | | - |
618 | | -// MAGIC %md |
619 | | -// MAGIC Count the number of tweets in the TTT Table |
620 | | - |
621 | | -// COMMAND ---------- |
622 | | - |
623 | | -StreamedTweetsClassDF.count() |
624 | | - |
625 | | -// COMMAND ---------- |
626 | | - |
627 | | -// MAGIC %md |
628 | | -// MAGIC ### TTT Function |
629 | | - |
630 | | -// COMMAND ---------- |
631 | | - |
632 | | -// MAGIC %md |
633 | | -// MAGIC ** Step 2-4: For elegant and simplified use, set of functions are created for the TTT. ** |
634 | | -// MAGIC |
635 | | -// MAGIC Step 2 to 4 can be merged and implemented as set of functions. The same commands can be saved in a notebook and called from the User's databricks work space |
636 | | - |
637 | | -// COMMAND ---------- |
638 | | - |
639 | | -//% run "Users/[email protected]/src2run/TTTDFfunctions" |
640 | | - |
641 | | -import org.apache.spark.sql.types.{StructType, StructField, StringType}; |
642 | | -import org.apache.spark.sql.functions._ |
643 | | -import org.apache.spark.sql.types._ |
644 | | -import org.apache.spark.sql.ColumnName |
645 | | -import org.apache.spark.sql.DataFrame |
646 | | - |
647 | | -def fromParquetFile2DF(InputDFAsParquetFilePatternString: String): DataFrame = { |
648 | | - sqlContext. |
649 | | - read.parquet(InputDFAsParquetFilePatternString) |
650 | | -} |
651 | | - |
652 | | -def tweetsJsonStringDF2TweetsDF(tweetsAsJsonStringInputDF: DataFrame): DataFrame = { |
653 | | - sqlContext |
654 | | - .read |
655 | | - .json(tweetsAsJsonStringInputDF.map({case Row(val1: String) => val1})) |
656 | | - } |
657 | | - |
658 | | - |
659 | | -def tweetsDF2TTTDF(tweetsInputDF: DataFrame): DataFrame = { |
660 | | - tweetsInputDF.select( |
661 | | - unix_timestamp($"createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CurrentTweetDate"), |
662 | | - $"id".as("CurrentTwID"), |
663 | | - unix_timestamp($"retweetedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInRT"), |
664 | | - $"retweetedStatus.id".as("OriginalTwIDinRT"), |
665 | | - unix_timestamp($"quotedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInQT"), |
666 | | - $"quotedStatus.id".as("OriginalTwIDinQT"), |
667 | | - $"inReplyToStatusId".as("OriginalTwIDinReply"), |
668 | | - $"user.id".as("CPostUserId"), |
669 | | - unix_timestamp($"user.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("userCreatedAtDate"), |
670 | | - $"retweetedStatus.user.id".as("OPostUserIdinRT"), |
671 | | - $"quotedStatus.user.id".as("OPostUserIdinQT"), |
672 | | - $"inReplyToUserId".as("OPostUserIdinReply"), |
673 | | - $"user.name".as("CPostUserName"), |
674 | | - $"retweetedStatus.user.name".as("OPostUserNameinRT"), |
675 | | - $"quotedStatus.user.name".as("OPostUserNameinQT"), |
676 | | - $"user.screenName".as("CPostUserSN"), |
677 | | - $"retweetedStatus.user.screenName".as("OPostUserSNinRT"), |
678 | | - $"quotedStatus.user.screenName".as("OPostUserSNinQT"), |
679 | | - $"inReplyToScreenName".as("OPostUserSNinReply"), |
680 | | - $"user.favouritesCount", |
681 | | - $"user.followersCount", |
682 | | - $"user.friendsCount", |
683 | | - $"user.isVerified", |
684 | | - $"user.isGeoEnabled", |
685 | | - $"text".as("CurrentTweet"), |
686 | | - $"retweetedStatus.userMentionEntities.id".as("UMentionRTiD"), |
687 | | - $"retweetedStatus.userMentionEntities.screenName".as("UMentionRTsN"), |
688 | | - $"quotedStatus.userMentionEntities.id".as("UMentionQTiD"), |
689 | | - $"quotedStatus.userMentionEntities.screenName".as("UMentionQTsN"), |
690 | | - $"userMentionEntities.id".as("UMentionASiD"), |
691 | | - $"userMentionEntities.screenName".as("UMentionASsN") |
692 | | - ).withColumn("TweetType", |
693 | | - when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1, |
694 | | - "Original Tweet") |
695 | | - .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1, |
696 | | - "Reply Tweet") |
697 | | - .when($"OriginalTwIDinRT".isNotNull &&$"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1, |
698 | | - "ReTweet") |
699 | | - .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1, |
700 | | - "Quoted Tweet") |
701 | | - .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1, |
702 | | - "Retweet of Quoted Tweet") |
703 | | - .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1, |
704 | | - "Retweet of Reply Tweet") |
705 | | - .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1, |
706 | | - "Reply of Quoted Tweet") |
707 | | - .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1, |
708 | | - "Retweet of Quoted Rely Tweet") |
709 | | - .otherwise("Unclassified")) |
710 | | -.withColumn("MentionType", |
711 | | - when($"UMentionRTid".isNotNull && $"UMentionQTid".isNotNull, "RetweetAndQuotedMention") |
712 | | - .when($"UMentionRTid".isNotNull && $"UMentionQTid".isNull, "RetweetMention") |
713 | | - .when($"UMentionRTid".isNull && $"UMentionQTid".isNotNull, "QuotedMention") |
714 | | - .when($"UMentionRTid".isNull && $"UMentionQTid".isNull, "AuthoredMention") |
715 | | - .otherwise("NoMention")) |
716 | | -.withColumn("Weight", lit(1L)) |
717 | | -} |
718 | | - |
719 | | -println("""USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName"))) |
720 | | -
|
721 | | - """) |
722 | | - |
723 | | -// COMMAND ---------- |
724 | | - |
725 | | -// MAGIC %md |
726 | | -// MAGIC The created functions can be used to read the streamed tweets |
727 | | - |
728 | | -// COMMAND ---------- |
729 | | - |
730 | | -val StreamedTweetsfDF = tweetsJsonStringDF2TweetsDF(fromParquetFile2DF(s"/mnt/s3ReadOnly/datasets/MEP/AllGroupsStreaming/2016/10/19/*/*/*")).cache() |
731 | | - |
732 | | -// COMMAND ---------- |
733 | | - |
734 | | -// MAGIC %md |
735 | | -// MAGIC Print the Schema of the JSON file to view the structure of the streamed tweets |
736 | | - |
737 | | -// COMMAND ---------- |
738 | | - |
739 | | -StreamedTweetsfDF.printSchema() |
740 | | - |
741 | | -// COMMAND ---------- |
742 | | - |
743 | | -// MAGIC %md |
744 | | -// MAGIC TTT table can be created from the ingested dataset by calling the function: |
745 | | - |
746 | | -// COMMAND ---------- |
747 | 502 |
|
748 | | -val StreamedTweetsfClassDF = tweetsDF2TTTDF(StreamedTweetsDF).cache() |
749 | 503 |
|
750 | 504 | // COMMAND ---------- |
751 | 505 |
|
|
0 commit comments