Skip to content
This repository was archived by the owner on Jan 20, 2022. It is now read-only.
Prev Previous commit
Next Next commit
Add tests for named options for scalding platform.
  • Loading branch information
pankajroark committed Jul 2, 2016
commit 5ab3053f02ca800d5bb46f4f22458e0089f836ef
Original file line number Diff line number Diff line change
Expand Up @@ -16,62 +16,138 @@

package com.twitter.summingbird.scalding

import com.twitter.algebird.{ MapAlgebra, Monoid, Group, Interval, Last }
import com.twitter.algebird.monad._
import com.twitter.summingbird.{ Producer, TimeExtractor, TestGraphs }
import com.twitter.summingbird.batch._
import com.twitter.summingbird.batch.state.HDFSState
import com.twitter.summingbird.option.JobId
import com.twitter.summingbird.SummingbirdRuntimeStats

import java.util.TimeZone
import java.io.File

import com.twitter.scalding.{ Source => ScaldingSource, Test => TestMode, _ }
import com.twitter.scalding.typed.TypedSink

import org.scalacheck._
import org.scalacheck.Prop._
import org.scalacheck.Properties

import org.apache.hadoop.conf.Configuration

import scala.collection.JavaConverters._
import scala.collection.mutable.{ ArrayBuffer, Buffer, HashMap => MutableHashMap, Map => MutableMap, SynchronizedBuffer, SynchronizedMap }
import scala.util.{ Try => ScalaTry }

import cascading.scheme.local.{ TextDelimited => CLTextDelimited }
import cascading.tuple.{ Tuple, Fields, TupleEntry }
import cascading.flow.Flow
import cascading.stats.FlowStats
import cascading.tap.Tap
import cascading.scheme.NullScheme
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector

import cascading.flow.FlowDef
import cascading.pipe.Pipe
import cascading.property.ConfigDef
import cascading.property.ConfigDef.Setter
import cascading.tuple.Fields
import com.twitter.scalding.{Test => TestMode, _}
import com.twitter.summingbird._
import com.twitter.summingbird.batch.option.Reducers
import org.scalatest.WordSpec

/**
 * Tests for application of named options in Summingbird's Scalding planner.
 *
 * Each test builds a small job, plans it with a set of named [[Options]],
 * converts the planned result to a cascading `Pipe`, and checks the reducer
 * count found in that pipe's step configuration.
 */
class NamedOptionsSpec extends WordSpec {
  import MapAlgebra.sparseEquiv

  /** Config key that `verify` reads back from the planned pipe. */
  private val ReducerKey = "mapred.reduce.tasks"
  private val FlatMapNodeName1 = "FM1"
  private val FlatMapNodeName2 = "FM2"
  private val SummerNodeName1 = "SM1"
  private val SummerNodeName2 = "SM2"

  /** A sink that hands back the incoming pipe factory unchanged. */
  private val IdentitySink = new Sink[Int] {
    override def write(incoming: PipeFactory[Int]): PipeFactory[Int] = incoming
  }

  /** Uses the first tuple element, widened to `Long`, as the event time. */
  implicit def timeExtractor[T <: (Int, _)]: TimeExtractor[T] =
    new TimeExtractor[T] {
      override def apply(t: T): Long = t._1.toLong
    }

  /** Draws one arbitrary value of `T`; throws if the generator yields no sample. */
  def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get

  /**
   * Collects the REPLACE-mode step configuration of `pipe` and of every pipe
   * reachable from it through `getPrevious`.
   *
   * The given pipe is visited before its predecessors, so when the same key is
   * set on several pipes the value from the pipe visited last is the one kept.
   *
   * @param pipe the pipe to start walking from
   * @return every key/value pair passed to the setter during the walk
   */
  def pipeConfig(pipe: Pipe): Map[String, String] = {
    // A captured local keeps the result out of the anonymous Setter, avoiding
    // reflective (structural-type) access to a member of the anonymous class.
    var collected = Map.empty[String, String]
    val collector = new Setter {
      override def set(key: String, value: String): String = {
        collected += (key -> value)
        ""
      }
      // Only `set` is exercised here, because the config is applied in REPLACE mode only.
      override def get(key: String): String = ???
      override def update(key: String, value: String): String = ???
    }

    def recurse(p: Pipe): Unit = {
      val cfg = p.getStepConfigDef
      if (!cfg.isEmpty) {
        cfg.apply(ConfigDef.Mode.REPLACE, collector)
      }
      p.getPrevious.foreach(recurse)
    }

    recurse(pipe)
    collected
  }

  /**
   * Plans the job produced by `jobGen` with the given named options and asserts
   * that the reducer count recorded on the resulting pipe equals
   * `expectedReducers`.
   *
   * NOTE(review): the type parameter `T` is not used by any parameter or by the
   * body; it is kept only so the signature stays unchanged for existing callers.
   *
   * @param options          options keyed by node name, handed to the platform
   * @param expectedReducers reducer count expected under [[ReducerKey]]
   * @param jobGen           builds the job from an empty source and a test store
   */
  def verify[T](
    options: Map[String, Options],
    expectedReducers: Int)(
      jobGen: (Producer[Scalding, (Int, Int)], scalding.Store[Int, Int]) => TailProducer[Scalding, Any]) = {

    val src = Scalding.sourceFromMappable { dr => IterableSource(List.empty[(Int, Int)]) }
    val store = TestStore[Int, Int]("store", TestUtil.simpleBatcher, Map.empty[Int, Int], Long.MaxValue)
    val job = jobGen(src, store)
    val interval = TestUtil.toTimeInterval(1L, Long.MaxValue)

    val scaldingPlatform = Scalding("named options test", options)
    val mode: Mode = TestMode(t => (store.sourceToBuffer).get(t))

    // Surface the planner's failure value instead of the bare
    // NoSuchElementException that `.right.get` would throw on a Left.
    val flowToPipe = scaldingPlatform.plan(job).apply((interval, mode)) match {
      case Right((_, planned)) => planned
      case Left(failures) => fail(s"Planning failed: $failures")
    }

    val fd = new FlowDef
    val typedPipe = flowToPipe.apply((fd, mode))

    // Wraps any value into a single-field cascading tuple.
    def tupleSetter[A]: TupleSetter[A] = new TupleSetter[A] {
      override def apply(arg: A) = {
        val tup = cascading.tuple.Tuple.size(1)
        tup.set(0, arg)
        tup
      }
      override def arity = 1
    }

    val pipe = typedPipe.toPipe(new Fields("0"))(fd, mode, tupleSetter)

    // Walk the pipe once and reuse the result for lookup and diagnostics.
    val config = pipeConfig(pipe)
    val numReducers = config
      .getOrElse(ReducerKey, fail(s"$ReducerKey not set on planned pipe; collected config: $config"))
      .toInt
    assert(numReducers === expectedReducers)
  }

  "The ScaldingPlatform" should {
    "use named option for the correct node even if same option defined for previous node" in {
      val fmReducers = 50
      val smReducers = 100

      val options = Map(
        FlatMapNodeName1 -> Options().set(Reducers(fmReducers)),
        SummerNodeName1 -> Options().set(Reducers(smReducers)))

      verify(options, smReducers) { (source, store) =>
        source
          .flatMap(Some(_)).name(FlatMapNodeName1)
          .sumByKey(store).name(SummerNodeName1)
      }
    }

    "use named option from the closest node when two names defined one after the other" in {
      val smReducers1 = 50
      val smReducers2 = 100

      val options = Map(
        SummerNodeName1 -> Options().set(Reducers(smReducers1)),
        SummerNodeName2 -> Options().set(Reducers(smReducers2)))

      verify(options, smReducers1) { (source, store) =>
        source
          .flatMap(Some(_))
          .sumByKey(store).name(SummerNodeName1).name(SummerNodeName2)
      }
    }

    "use named option from the closest upstream node if option not defined on current node" in {
      val fmReducers1 = 50
      val fmReducers2 = 100

      // Two distinct names: a repeated key would silently drop the first entry
      // and leave the test unable to tell the two reducer counts apart.
      val options = Map(
        FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)),
        FlatMapNodeName2 -> Options().set(Reducers(fmReducers2)))

      verify(options, fmReducers2) { (source, store) =>
        source
          .flatMap(Some(_)).name(FlatMapNodeName1)
          .sumByKey(store).name(SummerNodeName1)
          .map { case (k, (optV, v)) => k }.name(FlatMapNodeName2)
          .write(IdentitySink)
      }
    }
  }
}