File tree Expand file tree Collapse file tree 1 file changed +72
-0
lines changed Expand file tree Collapse file tree 1 file changed +72
-0
lines changed Original file line number Diff line number Diff line change 1+ # Flink Demo
2+
3+ ## 1. Word Count
4+ ### 1.1 批处理
5+ ``` scala
6+ package com .zhangblue .wc
7+
8+ import org .apache .flink .api .scala ._
9+
10+ // 批处理word count
11+ object WordCount {
12+
13+ def main (args : Array [String ]): Unit = {
14+ // 创建一个批处理的执行环境
15+ val env = ExecutionEnvironment .getExecutionEnvironment
16+ // 从文件中读取数据
17+ val inputPath = " D:\\ software\\ workspace\\ FlinkTutorial\\ src\\ main\\ resources\\ hello.txt"
18+ val inputDataSet = env.readTextFile(inputPath)
19+
20+ // 分词之后做count
21+ val wordCountDataSet = inputDataSet.flatMap(_.split(" " ))
22+ .map((_,1 ))
23+ .groupBy(0 )
24+ .sum(1 )
25+
26+ // 打印输出
27+ wordCountDataSet.print()
28+ }
29+ }
30+ ```
31+
32+ ### 1.2 流式处理
33+ ``` scala
34+ package com .zhangblue .wc
35+
36+ import org .apache .flink .api .java .utils .ParameterTool
37+ import org .apache .flink .streaming .api .scala ._
38+
39+ /**
40+ * 流式处理word count
41+ */
42+ object StreamWordCount {
43+
44+ def main (args : Array [String ]): Unit = {
45+
46+ // 通过参数进行传递. 参数传递方式:--host localhost --port 7777
47+ val parameter = ParameterTool .fromArgs(args)
48+ val host : String = parameter.get(" host" )
49+ val port : Int = parameter.getInt(" port" )
50+
51+
52+ // 创建一个流处理的执行环境
53+ val env = StreamExecutionEnvironment .getExecutionEnvironment
54+
55+ // 接收socket数据流
56+ val testDataStream = env.socketTextStream(host,port)
57+
58+ // 逐一读取数据
59+ val wordCountDataStream = testDataStream.flatMap(_.split(" " ))
60+ .filter(_.nonEmpty)
61+ .map((_,1 ))
62+ .keyBy(0 )
63+ .sum(1 )
64+
65+ // 打印输出
66+ wordCountDataStream.print()
67+
68+ // 执行任务
69+ env.execute(" Stream WordCount Example" )
70+ }
71+ }
72+ ```
You can’t perform that action at this time.
0 commit comments