@@ -30,7 +30,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
3030 }
3131
3232 def map [KR , VR ](mapper : (K , V ) => (KR , VR )): KStreamS [KR , VR ] = {
33- val kvMapper = mapper.tupled andThen Tuple2ToKeyValue
33+ val kvMapper = mapper.tupled andThen tuple2ToKeyValue
3434 inner.map[KR , VR ]((k, v) => kvMapper(k,v))
3535 }
3636
@@ -39,7 +39,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
3939 }
4040
4141 def flatMap [KR , VR ](mapper : (K , V ) => Iterable [(KR , VR )]): KStreamS [KR , VR ] = {
42- val kvMapper = mapper.tupled andThen (iter => iter.map(Tuple2ToKeyValue ).asJava)
42+ val kvMapper = mapper.tupled andThen (iter => iter.map(tuple2ToKeyValue ).asJava)
4343 inner.flatMap[KR , VR ]((k,v) => kvMapper(k , v))
4444 }
4545
@@ -57,12 +57,13 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
5757 inner.branch(predicates.map(_.asPredicate): _* ).map(kstream => wrapKStream(kstream))
5858 }
5959
60- def through (topic : String )(implicit produced : Perhaps [Produced [K , V ]]): KStreamS [K , V ] =
60+ def through (topic : String )(implicit produced : Perhaps [Produced [K , V ]]): KStreamS [K , V ] =
6161 produced.fold[KStreamS [K , V ]] { inner.through(topic) } { ev => inner.through(topic, ev) }
6262
6363 def to (topic : String )(implicit produced : Perhaps [Produced [K , V ]]): Unit =
6464 produced.fold[Unit ] { inner.to(topic) } { implicit ev => inner.to(topic, ev) }
6565
66+ // scalastyle:off null
6667 def transform [K1 , V1 ](transformerSupplier : () => Transformer [K , V , (K1 , V1 )],
6768 stateStoreNames : String * ): KStreamS [K1 , V1 ] = {
6869
@@ -78,7 +79,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
7879
7980 override def init (context : ProcessorContext ): Unit = transformerS.init(context)
8081
81- @ deprecated (" Please use Punctuator functional interface at https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/Punctuator.html instead" , " 0.1.3" )
82+ @ deprecated (" Please use Punctuator functional interface at https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/Punctuator.html instead" , " 0.1.3" ) // scalastyle:ignore
8283 override def punctuate (timestamp : Long ): KeyValue [K1 , V1 ] = {
8384 transformerS.punctuate(timestamp) match {
8485 case (k1, v1) => KeyValue .pair[K1 , V1 ](k1, v1)
@@ -91,6 +92,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
9192 }
9293 inner.transform(transformerSupplierJ, stateStoreNames : _* )
9394 }
95+ // scalastyle:on null
9496
9597 def transformValues [VR ](valueTransformerSupplier : () => ValueTransformer [V , VR ],
9698 stateStoreNames : String * ): KStreamS [K , VR ] = {
@@ -121,34 +123,34 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
121123 * implicit val stringSerde: Serde[String] = Serdes.String()
122124 * implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
123125 * - .groupByKey
124- */
126+ */
125127 def groupByKey (implicit serialized : Perhaps [Serialized [K , V ]]): KGroupedStreamS [K , V ] =
126128 serialized.fold[KGroupedStreamS [K , V ]] { inner.groupByKey } { implicit ev => inner.groupByKey(ev) }
127129
128130 def groupBy [KR ](selector : (K , V ) => KR )(implicit serialized : Perhaps [Serialized [KR , V ]]): KGroupedStreamS [KR , V ] = {
129- serialized.fold[KGroupedStreamS [KR , V ]] {
130- inner.groupBy(selector.asKeyValueMapper)
131- } { implicit ev =>
132- inner.groupBy(selector.asKeyValueMapper, ev)
131+ serialized.fold[KGroupedStreamS [KR , V ]] {
132+ inner.groupBy(selector.asKeyValueMapper)
133+ } { implicit ev =>
134+ inner.groupBy(selector.asKeyValueMapper, ev)
133135 }
134136 }
135137
136138 def join [VO , VR ](otherStream : KStreamS [K , VO ],
137139 joiner : (V , VO ) => VR ,
138140 windows : JoinWindows )(implicit joined : Perhaps [Joined [K , V , VO ]]): KStreamS [K , VR ] = {
139141
140- joined.fold[KStreamS [K , VR ]] {
141- inner.join[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
142- inner.join[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, ev)
142+ joined.fold[KStreamS [K , VR ]] {
143+ inner.join[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
144+ inner.join[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, ev)
143145 }
144146 }
145147
146148 def join [VT , VR ](table : KTableS [K , VT ],
147149 joiner : (V , VT ) => VR )(implicit joined : Perhaps [Joined [K , V , VT ]]): KStreamS [K , VR ] = {
148150
149- joined.fold[KStreamS [K , VR ]] {
150- inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner) } { implicit ev =>
151- inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner, ev)
151+ joined.fold[KStreamS [K , VR ]] {
152+ inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner) } { implicit ev =>
153+ inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner, ev)
152154 }
153155 }
154156
@@ -163,18 +165,18 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
163165 joiner : (V , VO ) => VR ,
164166 windows : JoinWindows )(implicit joined : Perhaps [Joined [K , V , VO ]]): KStreamS [K , VR ] = {
165167
166- joined.fold[KStreamS [K , VR ]] {
167- inner.leftJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
168- inner.leftJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, ev)
168+ joined.fold[KStreamS [K , VR ]] {
169+ inner.leftJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
170+ inner.leftJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, ev)
169171 }
170172 }
171173
172174 def leftJoin [VT , VR ](table : KTableS [K , VT ],
173175 joiner : (V , VT ) => VR )(implicit joined : Perhaps [Joined [K , V , VT ]]): KStreamS [K , VR ] = {
174176
175- joined.fold[KStreamS [K , VR ]] {
176- inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner) } { implicit ev =>
177- inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner, ev)
177+ joined.fold[KStreamS [K , VR ]] {
178+ inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner) } { implicit ev =>
179+ inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner, ev)
178180 }
179181 }
180182
@@ -189,9 +191,9 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
189191 joiner : (V , VO ) => VR ,
190192 windows : JoinWindows )(implicit joined : Perhaps [Joined [K , V , VO ]]): KStreamS [K , VR ] = {
191193
192- joined.fold[KStreamS [K , VR ]] {
193- inner.outerJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
194- inner.outerJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, ev)
194+ joined.fold[KStreamS [K , VR ]] {
195+ inner.outerJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
196+ inner.outerJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, ev)
195197 }
196198 }
197199
0 commit comments