
Commit f2caf31

kafka sink
1 parent 1cc109d commit f2caf31

File tree:

  src/main/resources/kafka/home-producer.properties
  src/main/scala/com/zhangblue/sink/SinkKafkaDemo.scala

2 files changed: +53 -16 lines changed
src/main/resources/kafka/home-producer.properties

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.producer.ProducerConfig for more details
+
+############################# Producer Basics #############################
+
+# list of brokers used for bootstrapping knowledge about the rest of the cluster
+# format: host1:port1,host2:port2 ...
+bootstrap.servers=192.168.247.103:9092
+
+# specify the compression codec for all data generated: none, gzip, snappy, lz4
+compression.type=none
+
+# name of the partitioner class for partitioning events; default partition spreads data randomly
+#partitioner.class=
+
+# the maximum amount of time the client will wait for the response of a request
+#request.timeout.ms=
+
+# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
+#max.block.ms=
+
+# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
+#linger.ms=
+
+# the maximum size of a request in bytes
+#max.request.size=
+
+# the default batch size in bytes when batching multiple records sent to a partition
+#batch.size=
+
+# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
+#buffer.memory=
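
Only bootstrap.servers and compression.type are set in this file; every tuning key ships commented out. Below is a minimal sketch of loading the file from the classpath and enabling two of those keys programmatically. It assumes the file is packaged as kafka/home-producer.properties on the classpath (the resource path the Scala code below uses); the linger.ms and batch.size values are illustrative, not part of this commit.

import java.util.Properties

object ProducerConfigSketch {
  def main(args: Array[String]): Unit = {
    // Load the packaged producer config from the classpath.
    val producerConfig = new Properties()
    producerConfig.load(
      getClass.getClassLoader.getResourceAsStream("kafka/home-producer.properties"))

    // The commented-out keys can also be set here in code;
    // these values are examples, not taken from the commit.
    producerConfig.setProperty("linger.ms", "5")      // wait up to 5 ms so sends can batch together
    producerConfig.setProperty("batch.size", "16384") // 16 KB batches per partition

    println(producerConfig) // inspect the effective configuration
  }
}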

src/main/scala/com/zhangblue/sink/SinkKafkaDemo.scala

Lines changed: 8 additions & 16 deletions
@@ -2,11 +2,9 @@ package com.zhangblue.sink
 
 import java.util.Properties
 
-import com.alibaba.fastjson.JSONObject
 import com.zhangblue.entity.TemperatureSensor
 import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer
 import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
 
 /**
@@ -21,11 +19,14 @@ object SinkKafkaDemo {
     // Requirement: for each sensor, find the record with the highest temperature observed so far
     //1. Environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    // Obtain the class loader
+    val classLoader = this.getClass.getClassLoader;
+
     //2. Read the Kafka source, group by sensor id, compute the max temperature per group, and output the result
     val topic = "my-topic"
     val valueDeserializationSchema: DeserializationSchema[String] = new SimpleStringSchema()
     val props: Properties = new Properties()
-    props.load(this.getClass.getClassLoader.getResourceAsStream("kafka/localhost-consumer.properties"))
+    props.load(classLoader.getResourceAsStream("kafka/home-consumer.properties"))
     val kafkaSource = env.addSource(new FlinkKafkaConsumer011[String](topic, valueDeserializationSchema, props))
 
     val srcDataStream: DataStream[TemperatureSensor] = kafkaSource.filter(_.trim.nonEmpty).map(fun = lineData => {
@@ -39,21 +40,12 @@ object SinkKafkaDemo {
     }).keyBy("id").maxBy("temperature")
 
 
-    val producerConfig:Properties = new Properties();
-    producerConfig.load(this.getClass.getClassLoader.getResourceAsStream("kafka/localhost-producer.properties"))
+    // Write the DataStream out to Kafka
+    val producerConfig: Properties = new Properties();
+    producerConfig.load(classLoader.getResourceAsStream("kafka/home-producer.properties"))
     srcDataStream.map(data => data.toString).addSink {
-      sinkFunction = new FlinkKafkaProducer[String]("target-topic", new SimpleStringSchema(), new Properties())
+      new FlinkKafkaProducer011[String]("flink-target-topic", new SimpleStringSchema(), producerConfig)
     }
-
-
-
-
-
-
-
-
-
-
     //3. Start
     env.execute(this.getClass.getSimpleName)
   }
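
The map body that parses each Kafka line into a TemperatureSensor falls between the hunks above and is not part of this diff. Here is a minimal sketch of what that step could look like, assuming TemperatureSensor is a plain case class over comma-separated input; both the field layout and the input format are assumptions, not shown in this commit.

object ParseSketch {
  // Hypothetical shape; the real com.zhangblue.entity.TemperatureSensor is not shown in this commit.
  case class TemperatureSensor(id: String, timestamp: Long, temperature: Double)

  // Assumed input format: "sensor_1,1547718199,35.8".
  def parseLine(lineData: String): TemperatureSensor = {
    val fields = lineData.split(",").map(_.trim)
    TemperatureSensor(fields(0), fields(1).toLong, fields(2).toDouble)
  }

  def main(args: Array[String]): Unit =
    println(parseLine("sensor_1,1547718199,35.8")) // TemperatureSensor(sensor_1,1547718199,35.8)
}

Downstream, keyBy("id").maxBy("temperature") keeps, per sensor id, the record with the highest temperature seen so far and re-emits it on every update; each update is then rendered with toString and published to flink-target-topic by the FlinkKafkaProducer011 sink.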
