
Commit 374a5cd

Author: centmeng (committed)

storm与redis (Storm and Redis)

1 parent edb8413 commit 374a5cd

4 files changed: +200 additions, -0 deletions
RedisOperations.java: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
package com.msj.storm.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;

import java.io.Serializable;
import java.util.Map;

/**
 * @author Vincent.M [email protected]
 * @date 2017/5/11 2:57 PM
 * @copyright ©2017 孟少杰 All Rights Reserved
 * @desc Redis and Storm: initialize Jedis
 */
public class RedisOperations implements Serializable {

    private static final long serialVersionUID = 1L;
    Jedis jedis = null;

    public RedisOperations(String redisIP, int port) {
        // Connect to Redis on the given host and port; a Redis cluster could also be used.
        jedis = new Jedis(redisIP, port);
    }

    public void insert(Map<String, Object> record, String id) {
        try {
            // Serialize the record to JSON and store it under the given key.
            jedis.set(id, new ObjectMapper().writeValueAsString(record));
        } catch (Exception e) {
            System.out.println("Record not persisted into datastore: " + e.getMessage());
        }
    }
}
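
A minimal sketch of reading one of these records back for verification, assuming a record has already been stored by insert() above; the host, port, and key below are illustrative placeholders, not values taken from the commit:

package com.msj.storm.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;

import java.util.Map;

public class RedisReadBack {
    public static void main(String[] args) throws Exception {
        // Hypothetical host/port; in this commit each key is a random UUID generated by StormRedisBolt.
        Jedis jedis = new Jedis("192.168.1.114", 6379);
        String json = jedis.get("some-uuid-key"); // returns null if the key does not exist
        if (json != null) {
            // Deserialize the JSON string back into a map and print two of its fields.
            Map<?, ?> record = new ObjectMapper().readValue(json, Map.class);
            System.out.println(record.get("firstName") + " " + record.get("lastName"));
        }
        jedis.disconnect();
    }
}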
SampleSpout.java: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
package com.msj.storm.redis;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class SampleSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector spoutOutputCollector;

    private static final Map<Integer, String> FIRSTNAMEMAP = new HashMap<Integer, String>();
    static {
        FIRSTNAMEMAP.put(0, "john");
        FIRSTNAMEMAP.put(1, "nick");
        FIRSTNAMEMAP.put(2, "mick");
        FIRSTNAMEMAP.put(3, "tom");
        FIRSTNAMEMAP.put(4, "jerry");
    }

    private static final Map<Integer, String> LASTNAME = new HashMap<Integer, String>();
    static {
        LASTNAME.put(0, "anderson");
        LASTNAME.put(1, "watson");
        LASTNAME.put(2, "ponting");
        LASTNAME.put(3, "dravid");
        LASTNAME.put(4, "lara");
    }

    private static final Map<Integer, String> COMPANYNAME = new HashMap<Integer, String>();
    static {
        COMPANYNAME.put(0, "abc");
        COMPANYNAME.put(1, "dfg");
        COMPANYNAME.put(2, "pqr");
        COMPANYNAME.put(3, "ecd");
        COMPANYNAME.put(4, "awe");
    }

    public void open(Map conf, TopologyContext context,
                     SpoutOutputCollector spoutOutputCollector) {
        // Open the spout and keep a reference to the output collector.
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        // The Storm cluster repeatedly calls this method to emit a continuous stream of tuples.
        final Random rand = new Random();
        // Generate a random number from 0 to 4 and emit the matching name/company triple.
        int randomNumber = rand.nextInt(5);
        spoutOutputCollector.emit(new Values(FIRSTNAMEMAP.get(randomNumber),
                LASTNAME.get(randomNumber), COMPANYNAME.get(randomNumber)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the output fields: firstName, lastName, companyName.
        declarer.declare(new Fields("firstName", "lastName", "companyName"));
    }
}
StormRedisBolt.java: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
package com.msj.storm.redis;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class StormRedisBolt implements IBasicBolt {

    private static final long serialVersionUID = 2L;
    private RedisOperations redisOperations = null;
    private String redisIP = null;
    private int port;

    public StormRedisBolt(String redisIP, int port) {
        this.redisIP = redisIP;
        this.port = port;
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        Map<String, Object> record = new HashMap<String, Object>();
        // Copy the fields declared by the spout ("firstName", "lastName", "companyName") into the record.
        record.put("firstName", input.getValueByField("firstName"));
        record.put("lastName", input.getValueByField("lastName"));
        record.put("companyName", input.getValueByField("companyName"));
        // Store the record in Redis under a random UUID key.
        redisOperations.insert(record, UUID.randomUUID().toString());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing, so no output fields are declared.
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void prepare(Map stormConf, TopologyContext context) {
        // Create the Redis connection when the bolt task starts.
        redisOperations = new RedisOperations(this.redisIP, this.port);
    }

    public void cleanup() {
    }
}
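
Each bolt task opens its own Jedis connection in prepare(). When several tasks run inside one worker, a shared JedisPool is a common alternative; below is a minimal sketch of a pooled variant, assuming a Jedis release where Jedis implements Closeable. The class name and pooling setup are illustrative, not part of the commit:

package com.msj.storm.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.io.Serializable;
import java.util.Map;

// Hypothetical pooled variant of RedisOperations.
public class PooledRedisOperations implements Serializable {

    private static final long serialVersionUID = 1L;
    private transient JedisPool pool;
    private final String redisIP;
    private final int port;

    public PooledRedisOperations(String redisIP, int port) {
        this.redisIP = redisIP;
        this.port = port;
    }

    public void insert(Map<String, Object> record, String id) {
        // Lazily create the pool on the worker, since the pool itself is not serializable.
        if (pool == null) {
            pool = new JedisPool(new JedisPoolConfig(), redisIP, port);
        }
        try (Jedis jedis = pool.getResource()) {
            jedis.set(id, new ObjectMapper().writeValueAsString(record));
        } catch (Exception e) {
            System.out.println("Record not persisted into datastore: " + e.getMessage());
        }
    }
}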
Topology.java: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
package com.msj.storm.redis;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;

/**
 * @author Vincent.M [email protected]
 * @date 2017/5/11 2:57 PM
 * @copyright ©2017 孟少杰 All Rights Reserved
 * @desc Redis and Storm
 */
public class Topology {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

        TopologyBuilder builder = new TopologyBuilder();

        // Neither of these lists is used by this topology.
        List<String> zks = new ArrayList<String>();
        zks.add("192.168.1.114");

        List<String> cFs = new ArrayList<String>();
        cFs.add("personal");
        cFs.add("company");

        // Set the spout class.
        builder.setSpout("spout", new SampleSpout(), 2);
        // Set the bolt class and subscribe it to the spout with shuffle grouping.
        builder.setBolt("bolt", new StormRedisBolt("192.168.1.114", 6379), 2).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(true);
        // Create an instance of LocalCluster for executing the topology in local mode.
        LocalCluster cluster = new LocalCluster();

        // StormRedisTopology is the name of the submitted topology.
        cluster.submitTopology("StormRedisTopology", conf, builder.createTopology());
        try {
            // Let the topology run for ten seconds before shutting it down.
            Thread.sleep(10000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        // Kill the StormRedisTopology.
        cluster.killTopology("StormRedisTopology");
        // Shut down the local test cluster.
        cluster.shutdown();
    }
}
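
For running on a real cluster instead of LocalCluster, the usual pattern is StormSubmitter. A minimal sketch, assuming the topology jar is built and the nimbus host is configured in storm.yaml; the class below is illustrative, not part of the commit:

package com.msj.storm.redis;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class ClusterTopology {
    public static void main(String[] args) throws Exception {
        // Same spout/bolt wiring as Topology above.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new SampleSpout(), 2);
        builder.setBolt("bolt", new StormRedisBolt("192.168.1.114", 6379), 2)
                .shuffleGrouping("spout");

        Config conf = new Config();
        conf.setNumWorkers(2); // number of worker processes requested on the cluster
        // Submit to the configured cluster instead of a local in-process cluster.
        StormSubmitter.submitTopology("StormRedisTopology", conf, builder.createTopology());
    }
}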

0 commit comments
