Skip to content

Commit 74b6037

Browse files
committed
add redis sink
1 parent 4a2010b commit 74b6037

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@
7272

7373
<netty-all.version>4.1.50.Final</netty-all.version>
7474

75+
76+
<bahir-redis.version>1.0</bahir-redis.version>
77+
7578
<scope>compile</scope>
7679

7780

@@ -516,6 +519,14 @@
516519
<scope>${scope}</scope>
517520
</dependency>
518521

522+
<!--flink redis connector-->
523+
<dependency>
524+
<groupId>org.apache.bahir</groupId>
525+
<artifactId>flink-connector-redis_${scala.version}</artifactId>
526+
<version>${bahir-redis.version}</version>
527+
<scope>${scope}</scope>
528+
</dependency>
529+
519530

520531
</dependencies>
521532

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.zhangblue.sink
2+
3+
import com.zhangblue.entity.TemperatureSensor
4+
import org.apache.flink.api.common.functions.MapFunction
5+
import org.apache.flink.streaming.api.scala._
6+
import org.apache.flink.streaming.connectors.redis.RedisSink
7+
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
8+
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
9+
10+
/**
11+
* @author di.zhang
12+
* @date 2020/7/14
13+
* @time 23:52
14+
**/
15+
object RedisSinkDemo {
16+
def main(args: Array[String]): Unit = {
17+
18+
val env = StreamExecutionEnvironment.getExecutionEnvironment
19+
20+
env.setParallelism(1)
21+
22+
val inputStream: DataStream[String] = env.readTextFile("src/main/resources/temperature-sensor.txt")
23+
24+
25+
val dataStream: DataStream[TemperatureSensor] = inputStream.map(new MyMapFunction())
26+
27+
//定义一个redis 的配置陪
28+
val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()
29+
30+
31+
dataStream.addSink(new RedisSink[TemperatureSensor](conf, new MyRedisMapper))
32+
33+
env.execute("redis sink demo")
34+
}
35+
36+
37+
private class MyMapFunction extends MapFunction[String, TemperatureSensor] {
38+
override def map(value: String): TemperatureSensor = {
39+
val splitData = value.split(",")
40+
TemperatureSensor(splitData(0), splitData(1).toLong, splitData(2), splitData(3).toDouble, splitData(4))
41+
}
42+
}
43+
44+
private class MyRedisMapper extends RedisMapper[TemperatureSensor] {
45+
//定义保存数据到redis的命令
46+
override def getCommandDescription: RedisCommandDescription = {
47+
new RedisCommandDescription(RedisCommand.HSET, "temperatureSensor_tmp")
48+
}
49+
50+
override def getKeyFromData(data: TemperatureSensor): String = {
51+
data.id
52+
}
53+
54+
override def getValueFromData(data: TemperatureSensor): String = {
55+
data.temperature.toString
56+
}
57+
}
58+
59+
}

src/main/scala/com/zhangblue/wc/WordCount.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ object WordCount {
99
//创建一个批处理的执行环境
1010
val env = ExecutionEnvironment.getExecutionEnvironment
1111
//从文件中读取数据
12-
val inputPath = "D:\\software\\workspace\\FlinkTutorial\\src\\main\\resources\\hello.txt"
12+
val inputPath = "src/main/resources/hello.txt"
1313
val inputDataSet = env.readTextFile(inputPath)
1414

1515
//分词之后做count

0 commit comments

Comments
 (0)