
Commit b8fd44f

committed
add transformation api
1 parent ca3466a commit b8fd44f

File tree

11 files changed: +465 -3 lines changed
src/main/resources/kafka/home-consumer.properties

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.consumer.ConsumerConfig for more details

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=192.168.247.103:2181/kafka011
bootstrap.servers=192.168.247.103:9092

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

# consumer group id
group.id=test-consumer-group

# consumer timeout
#consumer.timeout.ms=5000

auto.offset.reset=latest
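
The demos in this commit load these settings from the classpath and hand them to the Kafka consumer; a minimal sketch of that wiring, mirroring the demo code below with nothing new assumed:

import java.util.Properties

val props = new Properties()
// Resolve kafka/home-consumer.properties from the classpath, as the demos below do
props.load(getClass.getClassLoader.getResourceAsStream("kafka/home-consumer.properties"))
// auto.offset.reset=latest: a consumer group with no committed offset starts reading
// from the tail of the topic rather than replaying it from the beginning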
src/main/resources/temperature-sensor.txt

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
sensor_1,1547718299,Redis,35.8,北京
sensor_6,1547718201,Kafka,15.4,广州
sensor_7,1547718202,Flume,6.7,武汉
sensor_8,1547718205,Spark,38.1,深圳
sensor_1,1547718399,Tomcat,32.8,乌鲁木齐
sensor_1,1547718499,Hadoop,33.8,石家庄

src/main/scala/com/zhangblue/api/SourceTest.scala

Lines changed: 1 addition & 3 deletions
@@ -1,12 +1,10 @@
 package com.zhangblue.api

-import java.util.Properties
+
 import java.util.concurrent.TimeUnit

-import org.apache.flink.api.common.serialization.SimpleStringSchema
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.{DataStream, _}
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

 import scala.util.Random
src/main/scala/com/zhangblue/entity/TemperatureSensor.scala

Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
package com.zhangblue.entity

/**
 * Temperature sensor record.
 *
 * @param id          sensor id
 * @param timestamp   timestamp
 * @param name        passenger name
 * @param temperature temperature reading
 * @param location    sensor location
 */
case class TemperatureSensor(id: String, timestamp: Long, name: String, temperature: Double, location: String)
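
Each line of temperature-sensor.txt follows the schema id,timestamp,name,temperature,location, and every demo in this commit parses it into this case class; a minimal sketch of the mapping for one sample line:

// Parse one sample CSV line into the case class (a sketch of the mapping used below)
val arr = "sensor_1,1547718299,Redis,35.8,北京".split(",").map(_.trim)
val sensor = TemperatureSensor(arr(0), arr(1).toLong, arr(2), arr(3).toDouble, arr(4))
// sensor == TemperatureSensor("sensor_1", 1547718299L, "Redis", 35.8, "北京")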
src/main/scala/com/zhangblue/source/kafka/KafkaAsSourceDemo.scala

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
package com.zhangblue.source.kafka

import java.util.Properties

import com.zhangblue.entity.TemperatureSensor
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

/**
 * Uses Kafka as the source of the DataStream.
 */
object KafkaAsSourceDemo {
  def main(args: Array[String]): Unit = {
    //1. Set up the environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Define Kafka as the source, transform, and print the result
    val topic = "my-flink-topic"
    val valueDeserializationSchema: DeserializationSchema[String] = new SimpleStringSchema()
    val props: Properties = new Properties()
    props.load(this.getClass.getClassLoader.getResourceAsStream("kafka/home-consumer.properties"))
    val kafkaSource = env.addSource(new FlinkKafkaConsumer011[String](topic, valueDeserializationSchema, props))

    kafkaSource.filter(_.trim.nonEmpty).map { lineData =>
      val arr = lineData.split(",")
      val id = arr(0).trim
      val timestamp = arr(1).trim.toLong
      val name = arr(2).trim
      val temperature = arr(3).trim.toDouble
      val location = arr(4).trim
      TemperatureSensor(id, timestamp, name, temperature, location)
    }.print("source from kafka : ")

    //3. Start the job
    env.execute("source from kafka demo")
  }
}
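
The same five-field parsing lambda is repeated verbatim in every demo of this commit; one way to factor it out would be a small shared helper (hypothetical object, not part of this commit):

import com.zhangblue.entity.TemperatureSensor

// Hypothetical shared parser; each demo's map could delegate to it
object SensorParser {
  def parse(line: String): TemperatureSensor = {
    val arr = line.split(",").map(_.trim)
    TemperatureSensor(arr(0), arr(1).toLong, arr(2), arr(3).toDouble, arr(4))
  }
}

// Usage sketch: kafkaSource.filter(_.trim.nonEmpty).map(SensorParser.parse _)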
src/main/scala/com/zhangblue/source/self/SelfAsSourceDemo.scala

Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
package com.zhangblue.source.self

import java.util.concurrent.TimeUnit

import com.zhangblue.entity.TemperatureSensor
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.io.Source
import scala.util.Random

/**
 * Custom source.
 *
 * Reads src/main/resources/temperature-sensor.txt, picks a random line from the file
 * every second, wraps it in the custom case class, and emits it to the stream as the source.
 */
object SelfAsSourceDemo {
  def main(args: Array[String]): Unit = {
    //1. Set up the environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Subclass the SourceFunction interface
    //2.1 Option 1: anonymous inner class (see the sketch after this file)
    //2.2 Option 2: member class, used here
    val sleepTime = 1
    val logFilePath = "src/main/resources/temperature-sensor.txt"
    val mySrc = new MySource(sleepTime, logFilePath)
    //3. Read from the custom source, transform, and print the result
    env.addSource(mySrc).print("custom source result : ")
    //4. Start the job
    env.execute("SelfAsSourceDemo")
  }

  /**
   * Custom source implementation.
   *
   * @param sleepTimeSec sleep interval in seconds
   * @param logFilePath  path of the log file
   */
  private class MySource(sleepTimeSec: Int, logFilePath: String) extends SourceFunction[TemperatureSensor] {

    /**
     * Flag: true -> keep emitting data, false -> stop emitting data.
     * Marked @volatile because cancel() is called from a different thread than run().
     */
    @volatile private var flg = true

    override def run(ctx: SourceFunction.SourceContext[TemperatureSensor]): Unit = {
      //1. Read the file and wrap each line in a TemperatureSensor instance
      val lst: List[TemperatureSensor] = Source
        .fromFile(logFilePath).getLines().toList.map { lineData =>
          val arr = lineData.split(",")
          val id = arr(0).trim
          val timestamp = arr(1).trim.toLong
          val name = arr(2).trim
          val temperature = arr(3).trim.toDouble
          val location = arr(4).trim
          TemperatureSensor(id, timestamp, name, temperature, location)
        }
      //2. Until cancelled, loop to emit one record to the stream every second
      while (flg) {
        val randomIndex = Random.nextInt(lst.size)
        val randomInfo = lst(randomIndex)
        ctx.collect(randomInfo)
        TimeUnit.SECONDS.sleep(sleepTimeSec)
      }
    }

    override def cancel(): Unit = flg = false
  }
}
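
The numbered comment in main names an anonymous-class option that the demo does not show; a minimal sketch of that variant (same cancel/emit contract as MySource, emission body elided):

// Option 1 from the comment in main: SourceFunction as an anonymous class (a sketch)
val anonSource = new SourceFunction[TemperatureSensor] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[TemperatureSensor]): Unit = {
    while (running) {
      // ctx.collect(...) with a randomly chosen record, exactly as in MySource.run
      TimeUnit.SECONDS.sleep(1)
    }
  }

  override def cancel(): Unit = running = false
}
// env.addSource(anonSource) wires it in the same way as MySource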
src/main/scala/com/zhangblue/transformation/ConnectedStreamDemo.scala

Lines changed: 85 additions & 0 deletions

@@ -0,0 +1,85 @@
package com.zhangblue.transformation

import java.util.Properties

import com.zhangblue.entity.TemperatureSensor
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector

/**
 * connect demo
 */
object ConnectedStreamDemo {
  def main(args: Array[String]): Unit = {
    //Requirement: building on the earlier side-output example, bring the main output
    //stream and the side output stream back together and process them as one
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val topic = "my-flink-topic"
    val valueDeserializationSchema: DeserializationSchema[String] = new SimpleStringSchema()
    val props: Properties = new Properties()
    props.load(this.getClass.getClassLoader.getResourceAsStream("kafka/home-consumer.properties"))
    val kafkaSource = env.addSource(new FlinkKafkaConsumer011[String](topic, valueDeserializationSchema, props))

    //Build the source stream
    val srcDataStream: DataStream[TemperatureSensor] = kafkaSource.filter(_.trim.nonEmpty).map { lineData =>
      val arr = lineData.split(",")
      val id = arr(0).trim
      val timestamp = arr(1).trim.toLong
      val name = arr(2).trim
      val temperature = arr(3).trim.toDouble
      val location = arr(4).trim
      TemperatureSensor(id, timestamp, name, temperature, location)
    }

    //Route the DataStream through a ProcessFunction that uses a side output
    val outputTag: OutputTag[TemperatureSensor] = OutputTag("temperature_exception")
    val resultDataStream: DataStream[TemperatureSensor] = srcDataStream.process[TemperatureSensor](new MyProcessFunction(outputTag))

    //Main output stream
    val mainDataStream: DataStream[(String, String)] = resultDataStream.map(data => (data.id, data.name))
    //Side output stream
    val sideOutputStream: DataStream[(String, String, Double, Long)] = resultDataStream.getSideOutput(outputTag).map(data => (data.id, data.name, data.temperature, data.timestamp))

    //Combine the two streams into a ConnectedStreams
    val connDataStream: ConnectedStreams[(String, String), (String, String, Double, Long)] = mainDataStream.connect(sideOutputStream)

    //Process the ConnectedStreams in one place
    val finalDataStream: DataStream[String] = connDataStream.map(
      mStreamData => s"sensor id = ${mStreamData._1} , passenger = ${mStreamData._2} , your temperature is normal",
      oStreamData => s"sensor id = ${oStreamData._1} , passenger = ${oStreamData._2} , your temperature is abnormal: ${oStreamData._3}, time = ${oStreamData._4}"
    )

    finalDataStream.print()

    env.execute(this.getClass.getSimpleName)
  }

  /**
   * Custom ProcessFunction subclass.
   *
   * @param outputTag tag attached to records routed to the side output
   */
  private class MyProcessFunction(outputTag: OutputTag[TemperatureSensor]) extends ProcessFunction[TemperatureSensor, TemperatureSensor] {
    /**
     * Invoked once for every element of the DataStream.
     *
     * @param value the current element
     * @param ctx   context, used to write to the side output
     * @param out   collector, used to write to the main output
     */
    override def processElement(value: TemperatureSensor, ctx: ProcessFunction[TemperatureSensor, TemperatureSensor]#Context, out: Collector[TemperatureSensor]): Unit = {
      if (value.temperature < 37) {
        //Normal temperature: emit to the main output
        out.collect(value)
      } else {
        //Abnormal temperature: emit to the side output
        ctx.output[TemperatureSensor](outputTag, value)
      }
    }
  }
}
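
connect accepts two streams of different element types and gives each side its own map function, which is why the two tuple shapes above can coexist; union, by contrast, requires identical element types and yields one plain DataStream. A minimal sketch of the contrast (hypothetical example streams, not part of this commit):

// union only compiles when both inputs share one element type (a sketch)
val a: DataStream[String] = env.fromElements("x", "y")
val b: DataStream[String] = env.fromElements("z")
val merged: DataStream[String] = a.union(b) // one stream, one type, one code path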
src/main/scala/com/zhangblue/transformation/ReduceDemo.scala

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
package com.zhangblue.transformation

import java.util.Properties

import com.zhangblue.entity.TemperatureSensor
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

/**
 * reduce operator
 */
object ReduceDemo {

  def main(args: Array[String]): Unit = {
    //Requirement: for each sensor, track the latest timestamp and the highest temperature
    //1. Set up the environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Read the Kafka source, key by sensor id, compute each sensor's latest timestamp and highest temperature
    val topic = "my-flink-topic"
    val valueDeserializationSchema: DeserializationSchema[String] = new SimpleStringSchema()
    val props: Properties = new Properties()
    props.load(this.getClass.getClassLoader.getResourceAsStream("kafka/home-consumer.properties"))
    val kafkaSource = env.addSource(new FlinkKafkaConsumer011[String](topic, valueDeserializationSchema, props))

    kafkaSource.filter(_.trim.nonEmpty).map { lineData =>
      val arr = lineData.split(",")
      val id = arr(0).trim
      val timestamp = arr(1).trim.toLong
      val name = arr(2).trim
      val temperature = arr(3).trim.toDouble
      val location = arr(4).trim
      TemperatureSensor(id, timestamp, name, temperature, location)
    }.keyBy(data => data.id).reduce(new MyReduce).print("final result :")
    //3. Start the job
    env.execute(this.getClass.getSimpleName)
  }

  //Custom reduce function: keeps the max timestamp and max temperature per key
  class MyReduce extends ReduceFunction[TemperatureSensor] {
    override def reduce(value1: TemperatureSensor, value2: TemperatureSensor): TemperatureSensor = {
      TemperatureSensor(value1.id, value1.timestamp.max(value2.timestamp), value1.name, value1.temperature.max(value2.temperature), value1.location)
    }
  }
}
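
MyReduce can equally be written inline as a function literal; a behavior-identical sketch, with keyedStream standing in for the keyBy(data => data.id) result above:

// Inline equivalent of MyReduce (a sketch; keyedStream is hypothetical shorthand)
keyedStream.reduce { (v1, v2) =>
  TemperatureSensor(v1.id, v1.timestamp.max(v2.timestamp),
    v1.name, v1.temperature.max(v2.temperature), v1.location)
}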
src/main/scala/com/zhangblue/transformation/RollingAggregationDemo.scala

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
package com.zhangblue.transformation

import java.util.Properties

import com.zhangblue.entity.TemperatureSensor
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

/**
 * Rolling-aggregation operator demo.
 */
object RollingAggregationDemo {
  def main(args: Array[String]): Unit = {
    //Requirement: emit, per sensor, the record with the highest temperature observed so far
    //1. Set up the environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Read the Kafka source, key by sensor id, track each group's maximum temperature, and print the result
    val topic = "my-flink-topic"
    val valueDeserializationSchema: DeserializationSchema[String] = new SimpleStringSchema()
    val props: Properties = new Properties()
    props.load(this.getClass.getClassLoader.getResourceAsStream("kafka/home-consumer.properties"))
    val kafkaSource = env.addSource(new FlinkKafkaConsumer011[String](topic, valueDeserializationSchema, props))

    kafkaSource.filter(_.trim.nonEmpty).map { lineData =>
      val arr = lineData.split(",")
      val id = arr(0).trim
      val timestamp = arr(1).trim.toLong
      val name = arr(2).trim
      val temperature = arr(3).trim.toDouble
      val location = arr(4).trim
      TemperatureSensor(id, timestamp, name, temperature, location)
    }.keyBy("id").maxBy("temperature").print("sensor record with the highest temperature:")
    //3. Start the job
    env.execute(this.getClass.getSimpleName)
  }
}
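
maxBy("temperature") emits the whole record that carries the running maximum, which is what this demo wants; max("temperature") would update only the temperature field and keep the remaining fields from the first record seen for the key. A sketch of the distinction, with keyedStream standing in for the keyBy("id") result above:

// maxBy: the complete winning record, as used in this demo
keyedStream.maxBy("temperature")
// max: only the aggregated field is meaningful; other fields stay from the first record
keyedStream.max("temperature")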
