11package com .zhangblue .api
22
33import java .util .Properties
4+ import java .util .concurrent .TimeUnit
45
56import org .apache .flink .api .common .serialization .SimpleStringSchema
7+ import org .apache .flink .streaming .api .functions .source .SourceFunction
68import org .apache .flink .streaming .api .scala .{DataStream , _ }
79import org .apache .flink .streaming .connectors .kafka .FlinkKafkaConsumer011
810
11+ import scala .util .Random
12+
913case class SensorReading (id : String , timestamp : Long , temperature : Double )
1014
1115object SourceTest {
@@ -24,24 +28,57 @@ object SourceTest {
2428 ))
2529
2630 // 从文件中读取数据
27- val stream2 : DataStream [String ] = evn.readTextFile(" D:\\ software\\ workspace\\ FlinkTutorial\\ src\\ main\\ resources\\ sensor" )
31+ // val stream2: DataStream[String] = evn.readTextFile("D:\\software\\workspace\\FlinkTutorial\\src\\main\\resources\\sensor")
2832
2933
3034 // 从socket文本流
31- val stream3 : DataStream [String ] = evn.socketTextStream(" localhost" , 7777 )
35+ // val stream3: DataStream[String] = evn.socketTextStream("localhost", 7777)
3236
3337
3438 // 从kafka读取数据
35- val properties = new Properties ()
36- properties.setProperty(" bootstrap.servers" , " localhost:9092" )
37- properties.setProperty(" group.id" , " consumer-group" )
38- properties.setProperty(" key.deserializer" , " org.apache.kafka.common.serialization.StringDeserializer" )
39- properties.setProperty(" value.deserializer" , " org.apache.kafka.common.serialization.StringDeserializer" )
40- properties.setProperty(" auto.offset.reset" , " latest" )
41-
42- val stream4 = evn.addSource(new FlinkKafkaConsumer011 [String ](" topic1" , new SimpleStringSchema (), properties))
43- stream2.print(" stream2" )
39+ // val properties = new Properties()
40+ // properties.setProperty("bootstrap.servers", "localhost:9092")
41+ // properties.setProperty("group.id", "consumer-group")
42+ // properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
43+ // properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
44+ // properties.setProperty("auto.offset.reset", "latest")
45+ //
46+ // val stream4 = evn.addSource(new FlinkKafkaConsumer011[String]("my-topic", new SimpleStringSchema(), properties))
47+
48+
49+ // 自定义source
50+ val stream5 = evn.addSource(new MySource ())
51+
52+ stream5.print(" stream5" )
4453 evn.execute(" source demo" )
4554
4655 }
4756}

/**
 * Custom SourceFunction that continuously emits simulated sensor readings.
 *
 * Ten sensors start at a baseline of 60 with Gaussian spread (sigma = 20);
 * every second each reading drifts by a small Gaussian step (random walk)
 * and the whole batch is emitted with a shared timestamp.
 */
class MySource() extends SourceFunction[SensorReading] {
  // Cancellation flag. Flink calls cancel() from a different thread than the
  // one executing run(), so the flag must be @volatile or the emit loop may
  // never observe the change and fail to terminate.
  @volatile var running: Boolean = true

  /** Signals the emit loop in run() to stop. */
  override def cancel(): Unit = running = false

  /** Emits reading batches once per second until cancel() clears the flag. */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Random generator for both the initial temperatures and the per-step drift.
    val random = Random
    // Initial temperatures for 10 sensors: baseline 60 plus Gaussian noise.
    var currentTemps = (1 to 10).map(i => ("sensor_" + i, 60 + random.nextGaussian() * 20))

    // Emit until cancelled.
    while (running) {
      // Random walk: apply a small Gaussian fluctuation to each sensor's current value.
      currentTemps = currentTemps.map { case (id, temp) => (id, temp + random.nextGaussian()) }
      // One timestamp for the whole batch so all readings of a cycle align.
      val currentTime = System.currentTimeMillis()
      // Wrap each pair in the SensorReading case class and hand it to Flink.
      currentTemps.foreach { case (id, temp) =>
        sourceContext.collect(SensorReading(id, currentTime, temp))
      }
      // Throttle to one batch per second.
      TimeUnit.SECONDS.sleep(1L)
    }
  }
}
0 commit comments