Commits (23)
51ca7bd  Improve building with maven docs (Mar 6, 2014)
cda381f  SPARK-1184: Update the distribution tar.gz to include spark-assembly jar (markgrover, Mar 6, 2014)
3eb009f  SPARK-1156: allow user to login into a cluster without slaves (CodingCat, Mar 6, 2014)
3d3acef  SPARK-1187, Added missing Python APIs (Mar 6, 2014)
40566e1  SPARK-942: Do not materialize partitions when DISK_ONLY storage level… (kellrott, Mar 6, 2014)
7edbea4  SPARK-1189: Add Security to Spark - Akka, Http, ConnectionManager, UI… (tgravescs, Mar 7, 2014)
328c73d  SPARK-1197. Change yarn-standalone to yarn-cluster and fix up running… (sryza, Mar 7, 2014)
9ae919c  Example for cassandra CQL read/write from spark (anitatailor, Mar 7, 2014)
33baf14  Small clean-up to flatmap tests (pwendell, Mar 7, 2014)
dabeb6f  SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1 (aarondav, Mar 7, 2014)
b7cd9e9  SPARK-1195: set map_input_file environment variable in PipedRDD (tgravescs, Mar 7, 2014)
6e730ed  Spark 1165 rdd.intersection in python and java (ScrapCodes, Mar 8, 2014)
a99fb37  SPARK-1193. Fix indentation in pom.xmls (sryza, Mar 8, 2014)
8ad486a  Allow sbt to use more than 1G of heap. (rxin, Mar 8, 2014)
0b7b7fd  [SPARK-1194] Fix the same-RDD rule for cache replacement (liancheng, Mar 8, 2014)
c2834ec  Update junitxml plugin to the latest version to avoid recompilation i… (rxin, Mar 8, 2014)
e59a3b6  SPARK-1190: Do not initialize log4j if slf4j log4j backend is not bei… (pwendell, Mar 9, 2014)
52834d7  SPARK-929: Fully deprecate usage of SPARK_MEM (aarondav, Mar 9, 2014)
f6f9d02  Add timeout for fetch file (guojc, Mar 9, 2014)
faf4cad  Fix markup errors introduced in #33 (SPARK-1189) (pwendell, Mar 9, 2014)
b9be160  SPARK-782 Clean up for ASM dependency. (pwendell, Mar 9, 2014)
5d98cfc  maintain arbitrary state data for each key (CrazyJvm, Mar 10, 2014)
32ad348  [SPARK-1186] : Enrich the Spark Shell to support additional arguments. (berngp, Mar 10, 2014)
Example for cassandra CQL read/write from spark
Cassandra read/write using CqlPagingInputFormat/CqlOutputFormat

Author: anitatailor <[email protected]>

Closes #87 from anitatailor/master and squashes the following commits:

3493f81 [anitatailor] Fixed scala style as per review
19480b7 [anitatailor] Example for cassandra CQL read/write from spark
anitatailor authored and pwendell committed Mar 7, 2014
commit 9ae919c02f7b7d069215e8dc6cafef0ec79c9d5f
@@ -0,0 +1,137 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples

import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.Map
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/*
  Create the following keyspace and tables in Cassandra before running this example.
  Start the CQL shell with ./bin/cqlsh and execute the following commands:

  CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
  use retail;
  CREATE TABLE salecount (prod_id text, sale_count int, PRIMARY KEY (prod_id));
  CREATE TABLE ordercf (user_id text,
    time timestamp,
    prod_id text,
    quantity int,
    PRIMARY KEY (user_id, time));
  INSERT INTO ordercf (user_id, time, prod_id, quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
  INSERT INTO ordercf (user_id, time, prod_id, quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
  INSERT INTO ordercf (user_id, time, prod_id, quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
  INSERT INTO ordercf (user_id, time, prod_id, quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
*/

/**
 * This example demonstrates how to read from and write to a Cassandra column family created
 * with CQL3, using Spark.
 * Parameters: <spark_master> <cassandra_node> <cassandra_port>
 * Usage: ./bin/run-example org.apache.spark.examples.CassandraCQLTest local[2] localhost 9160
 */
object CassandraCQLTest {

  def main(args: Array[String]) {
    val sc = new SparkContext(args(0),
      "CQLTestApp",
      System.getenv("SPARK_HOME"),
      SparkContext.jarOfClass(this.getClass))
    val cHost: String = args(1)
    val cPort: String = args(2)
    val KeySpace = "retail"
    val InputColumnFamily = "ordercf"
    val OutputColumnFamily = "salecount"

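    // The Hadoop Job is used only as a carrier for the input/output configuration;
    // no MapReduce job is ever submitted.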
    val job = new Job()
    job.setInputFormatClass(classOf[CqlPagingInputFormat])
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
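    // Fetch at most 3 CQL rows per page; a deliberately small value for this demo.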
    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")

    // Optionally restrict the input with a server-side WHERE clause:
    // CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'")

    // An UPDATE writes one or more columns to a record in a Cassandra column family.
    val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
    CqlConfigHelper.setOutputCql(job.getConfiguration(), query)

    job.setOutputFormatClass(classOf[CqlOutputFormat])
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

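    // Each input record is a pair of maps: the key map holds the CQL row's key
    // columns, the value map holds the remaining columns.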
    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
      classOf[CqlPagingInputFormat],
      classOf[java.util.Map[String, ByteBuffer]],
      classOf[java.util.Map[String, ByteBuffer]])

    println("Count: " + casRdd.count)
    val productSaleRDD = casRdd.map {
      case (key, value) => {
        (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
      }
    }
    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
    aggregatedRDD.collect().foreach {
      case (productId, saleCount) => println(productId + ":" + saleCount)
    }

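    // CqlOutputFormat expects a (key-column map, list of values bound to the "?"
    // placeholders of the output UPDATE) pair; the implicit JavaConversions turn
    // the Scala collections into their java.util counterparts.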
    val casoutputCF = aggregatedRDD.map {
      case (productId, saleCount) => {
        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
        val outColFamVal = new ListBuffer[ByteBuffer]
        outColFamVal += ByteBufferUtil.bytes(saleCount)
        val outVal: java.util.List[ByteBuffer] = outColFamVal
        (outKey, outVal)
      }
    }

    casoutputCF.saveAsNewAPIHadoopFile(
      KeySpace,
      classOf[java.util.Map[String, ByteBuffer]],
      classOf[java.util.List[ByteBuffer]],
      classOf[CqlOutputFormat],
      job.getConfiguration()
    )
  }
}
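
A quick way to sanity-check the write path is to read the salecount table back through the
same input format. The sketch below is not part of the commit; it assumes the sc, cHost and
cPort values from the example above and simply repeats its input configuration pattern.

  val verifyJob = new Job()
  verifyJob.setInputFormatClass(classOf[CqlPagingInputFormat])
  ConfigHelper.setInputInitialAddress(verifyJob.getConfiguration(), cHost)
  ConfigHelper.setInputRpcPort(verifyJob.getConfiguration(), cPort)
  ConfigHelper.setInputColumnFamily(verifyJob.getConfiguration(), "retail", "salecount")
  ConfigHelper.setInputPartitioner(verifyJob.getConfiguration(), "Murmur3Partitioner")

  val saleCountRdd = sc.newAPIHadoopRDD(verifyJob.getConfiguration(),
    classOf[CqlPagingInputFormat],
    classOf[java.util.Map[String, ByteBuffer]],
    classOf[java.util.Map[String, ByteBuffer]])

  // prod_id is the partition key of salecount, so it arrives in the key map;
  // sale_count is a regular column and arrives in the value map.
  saleCountRdd.collect().foreach {
    case (key, value) =>
      println(ByteBufferUtil.string(key.get("prod_id")) + " -> " +
        ByteBufferUtil.toInt(value.get("sale_count")))
  }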