File tree Expand file tree Collapse file tree 5 files changed +165
-0
lines changed Expand file tree Collapse file tree 5 files changed +165
-0
lines changed Original file line number Diff line number Diff line change 1+ # Created by .ignore support plugin (hsz.mobi)
2+ # ## Scala template
3+ * .class
4+ * .log
5+
6+ # ## Java template
7+ # Compiled class file
8+ * .class
9+
10+ # Log file
11+ * .log
12+
13+ # BlueJ files
14+ * .ctxt
15+
16+ # Mobile Tools for Java (J2ME)
17+ .mtj.tmp /
18+
19+ # Package Files #
20+ * .jar
21+ * .war
22+ * .nar
23+ * .ear
24+ * .zip
25+ * .tar.gz
26+ * .rar
27+
28+ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
29+ hs_err_pid *
30+ * .iml
31+ .idea /
32+ target /
Original file line number Diff line number Diff line change 1+ <?xml version =" 1.0" encoding =" UTF-8" ?>
2+ <project xmlns =" http://maven.apache.org/POM/4.0.0"
3+ xmlns : xsi =" http://www.w3.org/2001/XMLSchema-instance"
4+ xsi : schemaLocation =" http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
5+ <modelVersion >4.0.0</modelVersion >
6+
7+ <groupId >com.zhangblue</groupId >
8+ <artifactId >FlinkTutorial</artifactId >
9+ <version >1.0-SNAPSHOT</version >
10+
11+ <dependencies >
12+ <dependency >
13+ <groupId >org.apache.flink</groupId >
14+ <artifactId >flink-scala_2.11</artifactId >
15+ <version >1.7.2</version >
16+ </dependency >
17+
18+ <dependency >
19+ <groupId >org.apache.flink</groupId >
20+ <artifactId >flink-streaming-scala_2.11</artifactId >
21+ <version >1.7.2</version >
22+ </dependency >
23+ </dependencies >
24+
25+ <build >
26+ <plugins >
27+ <!-- 该插件用于将scala代码编译成class文件-->
28+ <plugin >
29+ <groupId >net.alchim31.maven</groupId >
30+ <artifactId >scala-maven-plugin</artifactId >
31+ <version >3.4.6</version >
32+ <executions >
33+ <execution >
34+ <!-- 声明绑定到maven的compile阶段-->
35+ <goals >
36+ <goal >testCompile</goal >
37+ </goals >
38+ </execution >
39+ </executions >
40+ </plugin >
41+ <plugin >
42+ <groupId >org.apache.maven.plugins</groupId >
43+ <artifactId >maven-assembly-plugin</artifactId >
44+ <version >3.0.0</version >
45+ <configuration >
46+ <descriptorRefs >
47+ <descriptorRef >jar-with-dependencies</descriptorRef >
48+ </descriptorRefs >
49+ </configuration >
50+ <executions >
51+ <execution >
52+ <id >make-assembly</id >
53+ <phase >package</phase >
54+ <goals >
55+ <goal >single</goal >
56+ </goals >
57+ </execution >
58+ </executions >
59+ </plugin >
60+ </plugins >
61+ </build >
62+ </project >
Original file line number Diff line number Diff line change 1+ hello flink
2+ hello word
3+ hello scala
4+ how are you
5+ fine thank you
6+ and you
Original file line number Diff line number Diff line change 1+ package com .zhangblue .wc
2+
3+ import org .apache .flink .api .java .utils .ParameterTool
4+ import org .apache .flink .streaming .api .scala ._
5+
6+ /**
7+ * 流式处理word count
8+ */
9+ object StreamWordCount {
10+
11+ def main (args : Array [String ]): Unit = {
12+
13+ // 通过参数进行传递. 参数传递方式:--host localhost --port 7777
14+ val parameter = ParameterTool .fromArgs(args)
15+ val host : String = parameter.get(" host" )
16+ val port : Int = parameter.getInt(" port" )
17+
18+
19+ // 创建一个流处理的执行环境
20+ val env = StreamExecutionEnvironment .getExecutionEnvironment
21+
22+ // 接收socket数据流
23+ val testDataStream = env.socketTextStream(host,port)
24+
25+ // 逐一读取数据
26+ val wordCountDataStream = testDataStream.flatMap(_.split(" " ))
27+ .filter(_.nonEmpty)
28+ .map((_,1 ))
29+ .keyBy(0 )
30+ .sum(1 )
31+
32+ // 打印输出
33+ wordCountDataStream.print()
34+
35+ // 执行任务
36+ env.execute(" Stream WordCount Example" )
37+ }
38+ }
Original file line number Diff line number Diff line change 1+ package com .zhangblue .wc
2+
3+ import org .apache .flink .api .scala ._
4+
5+ // 批处理word count
6+ object WordCount {
7+
8+ def main (args : Array [String ]): Unit = {
9+ // 创建一个批处理的执行环境
10+ val env = ExecutionEnvironment .getExecutionEnvironment
11+ // 从文件中读取数据
12+ val inputPath = " D:\\ software\\ workspace\\ FlinkTutorial\\ src\\ main\\ resources\\ hello.txt"
13+ val inputDataSet = env.readTextFile(inputPath)
14+
15+ // 分词之后做count
16+ val wordCountDataSet = inputDataSet.flatMap(_.split(" " ))
17+ .map((_,1 ))
18+ .groupBy(0 )
19+ .sum(1 )
20+
21+
22+ // 打印输出
23+ wordCountDataSet.print()
24+ }
25+
26+
27+ }
You can’t perform that action at this time.
0 commit comments