Skip to content

Commit 27e6e7b

Browse files
committed
@wjw_change: 为了加快在List类型的"p:a:s"里存取session,
添加一个存放所有session的集合类型的Set-"p:s:s",其成员值是session的id. 在往List里添加,删除指定的session时,先判断Set里是否包含此session.
1 parent 4807250 commit 27e6e7b

File tree

6 files changed

+226
-29
lines changed

6 files changed

+226
-29
lines changed

pushlet.jar

1.9 KB
Binary file not shown.

src/nl/justobjects/pushlet/core/Session.java

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33

44
package nl.justobjects.pushlet.core;
55

6+
import java.io.UnsupportedEncodingException;
7+
import java.util.HashMap;
8+
import java.util.Iterator;
9+
import java.util.Map;
10+
611
import nl.justobjects.pushlet.redis.RedisManager;
712
import nl.justobjects.pushlet.util.Log;
813
import nl.justobjects.pushlet.util.PushletException;
9-
import nl.justobjects.pushlet.util.Sys;
1014

1115
/**
1216
* Represents client pushlet session state.
@@ -68,8 +72,9 @@ public static Session create(String anId) throws PushletException {
6872
session.myHkey = PUSHLET_SESSION_PREFIX + session.id;
6973
if (session.isPersistence()) {
7074
session.readStatus();
75+
} else {
76+
session.saveStatus();
7177
}
72-
session.saveStatus();
7378

7479
return session;
7580
}
@@ -191,8 +196,14 @@ public void stop() {
191196
redis.hset(myHkey, "timeToLive", String.valueOf(timeToLive));
192197

193198
if (this.temporary) {
199+
//->先把ID从键为("p:s:s"的Set以及"p:a:s"的List)里删除
200+
if (redis.sismember(SessionManager.PUSHLET_SET_SESSION, id)) {
201+
redis.lrem(SessionManager.PUSHLET_ALL_SESSION, 0, id);
202+
redis.srem(SessionManager.PUSHLET_SET_SESSION, id);
203+
}
204+
//<-先把ID从键为("p:s:s"的Set以及"p:a:s"的List)里删除
205+
194206
redis.del(myHkey);
195-
redis.lrem(SessionManager.PUSHLET_ALL_SESSION, 0, id);
196207
}
197208

198209
subscriber.stop();
@@ -231,42 +242,66 @@ public boolean isPersistence() {
231242
}
232243

233244
public void saveStatus() {
234-
//->先把ID放到键为"p:a:s"的List里
235-
redis.lrem(SessionManager.PUSHLET_ALL_SESSION, 0, id);
236-
redis.lpush(SessionManager.PUSHLET_ALL_SESSION, id);
237-
//<-先把ID放到键为"p:a:s"的List里
245+
Map<String, String> keyValues = new HashMap<String, String>(8);
238246

239247
if (userAgent != null) {
240-
redis.hset(myHkey, "userAgent", userAgent);
248+
keyValues.put("userAgent", userAgent);
241249
}
242-
redis.hset(myHkey, "createDate", String.valueOf(createDate));
243-
redis.hset(myHkey, "temporary", String.valueOf(temporary));
244-
redis.hset(myHkey, "timeToLive", String.valueOf(timeToLive));
250+
keyValues.put("createDate", String.valueOf(createDate));
251+
keyValues.put("temporary", String.valueOf(temporary));
252+
keyValues.put("timeToLive", String.valueOf(timeToLive));
245253
if (address != null) {
246-
redis.hset(myHkey, "address", address);
254+
keyValues.put("address", address);
247255
}
248256
if (format != null) {
249-
redis.hset(myHkey, "format", format);
257+
keyValues.put("format", format);
258+
}
259+
260+
redis.hmset(myHkey, keyValues);
261+
262+
//->再把ID放到键为("p:s:s"的Set以及"p:a:s"的List)里
263+
if (redis.sismember(SessionManager.PUSHLET_SET_SESSION, id) == false) {
264+
redis.sadd(SessionManager.PUSHLET_SET_SESSION, id);
265+
redis.lpush(SessionManager.PUSHLET_ALL_SESSION, id);
250266
}
267+
//<-再把ID放到键为("p:s:s"的Set以及"p:a:s"的List)里
268+
269+
keyValues.clear();
251270
}
252271

253272
public void readStatus() {
273+
java.util.Map<byte[], byte[]> bhash = redis.hgetAll(myHkey);
274+
java.util.Map<String, String> keyValues = new HashMap<String, String>(bhash.size());
275+
Map.Entry<byte[], byte[]> entry;
276+
Iterator<Map.Entry<byte[], byte[]>> iterator = bhash.entrySet().iterator();
277+
while (iterator.hasNext()) {
278+
entry = iterator.next();
279+
try {
280+
keyValues.put(new String(entry.getKey(), RedisManager.REDIS_CHARSET), new String(entry.getValue(), RedisManager.REDIS_CHARSET));
281+
} catch (UnsupportedEncodingException e) {
282+
e.printStackTrace();
283+
}
284+
}
285+
bhash.clear();
286+
254287
String tmpStr;
255-
userAgent = redis.hget(myHkey, "userAgent");
256-
tmpStr = redis.hget(myHkey, "createDate");
288+
userAgent = keyValues.get("userAgent");
289+
tmpStr = keyValues.get("createDate");
257290
if (tmpStr != null) {
258291
createDate = Long.parseLong(tmpStr);
259292
}
260-
tmpStr = redis.hget(myHkey, "temporary");
293+
tmpStr = keyValues.get("temporary");
261294
if (tmpStr != null) {
262295
temporary = Boolean.parseBoolean(tmpStr);
263296
}
264-
tmpStr = redis.hget(myHkey, "timeToLive");
297+
tmpStr = keyValues.get("timeToLive");
265298
if (tmpStr != null) {
266299
timeToLive = Long.parseLong(tmpStr);
267300
}
268-
address = redis.hget(myHkey, "address");
269-
format = redis.hget(myHkey, "format");
301+
address = keyValues.get("address");
302+
format = keyValues.get("format");
303+
304+
keyValues.clear();
270305
}
271306

272307
}

src/nl/justobjects/pushlet/core/SessionManager.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class SessionManager implements ConfigDefs {
2828
static RedisManager redis = RedisManager.getInstance();
2929

3030
static final String PUSHLET_ALL_SESSION = "p:a:s";
31+
static final String PUSHLET_SET_SESSION = "p:s:s";
3132

3233
/**
3334
* Singleton pattern: single instance.
@@ -96,11 +97,11 @@ public void apply(Object visitor, Method method, Object[] args) {
9697
Log.warn("apply: method invoke: ", e);
9798
}
9899
}
99-
100+
100101
//@wjw_add 当服务器发出E_ABORT消息时,只发给当前节点的Session
101-
if(args.length==2 && args[1] instanceof Event) {
102+
if (args.length == 2 && args[1] instanceof Event) {
102103
Event event = (Event) args[1];
103-
if(event.getEventType().equals(Protocol.E_ABORT)) {
104+
if (event.getEventType().equals(Protocol.E_ABORT)) {
104105
return;
105106
}
106107
}
@@ -116,7 +117,7 @@ public void apply(Object visitor, Method method, Object[] args) {
116117
if (allSessions.size() == 0) {
117118
break;
118119
}
119-
120+
120121
start = start + allSessions.size();
121122
for (byte[] bb : allSessions) {
122123
try {
@@ -153,7 +154,7 @@ public void apply(Object visitor, Method method, Object[] args) {
153154
// Log.warn("apply: method invoke: ", e);
154155
// }
155156
// }
156-
157+
157158
}
158159

159160
/**

src/nl/justobjects/pushlet/core/Subscriber.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ public static Subscriber create(Session aSession) throws PushletException {
7979

8080
if (subscriber.isPersistence()) {
8181
subscriber.readStatus();
82+
} else {
83+
subscriber.saveStatus();
8284
}
83-
subscriber.saveStatus();
8485

8586
return subscriber;
8687
}
@@ -336,7 +337,7 @@ public Subscription match(Event event) {
336337
return null;
337338
}
338339

339-
Subscription subscription = (Subscription) redis.fromXML(strSubscription);
340+
Subscription subscription = (Subscription) redis.fromXML(strSubscription); //TODO@wjw_note 可以优化的地方
340341

341342
return subscription;
342343

src/nl/justobjects/pushlet/redis/RedisManager.java

Lines changed: 163 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import internal.org.apache.commons.pool.impl.GenericObjectPool;
66

77
import java.io.IOException;
8+
import java.util.HashMap;
9+
import java.util.Iterator;
10+
import java.util.Map;
811

912
import nl.justobjects.pushlet.core.Config;
1013
import nl.justobjects.pushlet.core.ConfigDefs;
@@ -116,6 +119,7 @@ public Object fromXML(String xml) {
116119
return _xstream.fromXML(xml);
117120
}
118121

122+
//基本操作
119123
public java.util.Set<byte[]> keys(String pattern) {
120124
if (_pool != null) {
121125
Jedis jedis = null;
@@ -376,12 +380,12 @@ public Long hset(String hkey, String field, String value) {
376380
}
377381
}
378382

379-
public Long hdel(String key, String field) {
383+
public Long hdel(String hkey, String field) {
380384
if (_pool != null) {
381385
Jedis jedis = null;
382386
try {
383387
jedis = _pool.getResource();
384-
return jedis.hdel(key.getBytes(REDIS_CHARSET), field.getBytes(REDIS_CHARSET));
388+
return jedis.hdel(hkey.getBytes(REDIS_CHARSET), field.getBytes(REDIS_CHARSET));
385389
} catch (IOException e) {
386390
throw new JedisConnectionException(e);
387391
} finally {
@@ -396,7 +400,7 @@ public Long hdel(String key, String field) {
396400
ShardedJedis jedis = null;
397401
try {
398402
jedis = _shardedPool.getResource();
399-
return jedis.hdel(key.getBytes(REDIS_CHARSET), field.getBytes(REDIS_CHARSET));
403+
return jedis.hdel(hkey.getBytes(REDIS_CHARSET), field.getBytes(REDIS_CHARSET));
400404
} catch (IOException e) {
401405
throw new JedisConnectionException(e);
402406
} finally {
@@ -444,6 +448,59 @@ public java.util.Map<byte[], byte[]> hgetAll(String hkey) {
444448
}
445449
}
446450

451+
public String hmset(String hkey, java.util.Map<String, String> hash) {
452+
if (_pool != null) {
453+
Jedis jedis = null;
454+
try {
455+
jedis = _pool.getResource();
456+
457+
Map<byte[], byte[]> bhash = new HashMap<byte[], byte[]>(hash.size());
458+
Map.Entry<String, String> entry;
459+
Iterator<Map.Entry<String, String>> iterator = hash.entrySet().iterator();
460+
while (iterator.hasNext()) {
461+
entry = iterator.next();
462+
bhash.put(entry.getKey().getBytes(REDIS_CHARSET), entry.getValue().getBytes(REDIS_CHARSET));
463+
}
464+
465+
return jedis.hmset(hkey.getBytes(REDIS_CHARSET), bhash);
466+
} catch (IOException e) {
467+
throw new JedisConnectionException(e);
468+
} finally {
469+
if (jedis != null) {
470+
try {
471+
_pool.returnResource(jedis);
472+
} catch (Throwable thex) {
473+
}
474+
}
475+
}
476+
} else {
477+
ShardedJedis jedis = null;
478+
try {
479+
jedis = _shardedPool.getResource();
480+
481+
Map<byte[], byte[]> bhash = new HashMap<byte[], byte[]>(hash.size());
482+
Map.Entry<String, String> entry;
483+
Iterator<Map.Entry<String, String>> iterator = hash.entrySet().iterator();
484+
while (iterator.hasNext()) {
485+
entry = iterator.next();
486+
bhash.put(entry.getKey().getBytes(REDIS_CHARSET), entry.getValue().getBytes(REDIS_CHARSET));
487+
}
488+
489+
return jedis.hmset(hkey.getBytes(REDIS_CHARSET), bhash);
490+
} catch (IOException e) {
491+
throw new JedisConnectionException(e);
492+
} finally {
493+
if (jedis != null) {
494+
try {
495+
_shardedPool.returnResource(jedis);
496+
} catch (Throwable thex) {
497+
}
498+
}
499+
}
500+
}
501+
502+
}
503+
447504
public Long hlen(String hkey) {
448505
if (_pool != null) {
449506
Jedis jedis = null;
@@ -806,4 +863,107 @@ public Long lrem(String lkey, int count, String value) {
806863
}
807864
}
808865
}
866+
867+
//Set操作
868+
public Boolean sismember(String skey, String member) {
869+
if (_pool != null) {
870+
Jedis jedis = null;
871+
try {
872+
jedis = _pool.getResource();
873+
return jedis.sismember(skey.getBytes(REDIS_CHARSET), member.getBytes(REDIS_CHARSET));
874+
} catch (IOException e) {
875+
throw new JedisConnectionException(e);
876+
} finally {
877+
if (jedis != null) {
878+
try {
879+
_pool.returnResource(jedis);
880+
} catch (Throwable thex) {
881+
}
882+
}
883+
}
884+
} else {
885+
ShardedJedis jedis = null;
886+
try {
887+
jedis = _shardedPool.getResource();
888+
return jedis.sismember(skey.getBytes(REDIS_CHARSET), member.getBytes(REDIS_CHARSET));
889+
} catch (IOException e) {
890+
throw new JedisConnectionException(e);
891+
} finally {
892+
if (jedis != null) {
893+
try {
894+
_shardedPool.returnResource(jedis);
895+
} catch (Throwable thex) {
896+
}
897+
}
898+
}
899+
}
900+
}
901+
902+
public Long sadd(String skey, String member) {
903+
if (_pool != null) {
904+
Jedis jedis = null;
905+
try {
906+
jedis = _pool.getResource();
907+
return jedis.sadd(skey.getBytes(REDIS_CHARSET), member.getBytes(REDIS_CHARSET));
908+
} catch (IOException e) {
909+
throw new JedisConnectionException(e);
910+
} finally {
911+
if (jedis != null) {
912+
try {
913+
_pool.returnResource(jedis);
914+
} catch (Throwable thex) {
915+
}
916+
}
917+
}
918+
} else {
919+
ShardedJedis jedis = null;
920+
try {
921+
jedis = _shardedPool.getResource();
922+
return jedis.sadd(skey.getBytes(REDIS_CHARSET), member.getBytes(REDIS_CHARSET));
923+
} catch (IOException e) {
924+
throw new JedisConnectionException(e);
925+
} finally {
926+
if (jedis != null) {
927+
try {
928+
_shardedPool.returnResource(jedis);
929+
} catch (Throwable thex) {
930+
}
931+
}
932+
}
933+
}
934+
}
935+
936+
public Long srem(String skey, String member) {
937+
if (_pool != null) {
938+
Jedis jedis = null;
939+
try {
940+
jedis = _pool.getResource();
941+
return jedis.srem(skey.getBytes(REDIS_CHARSET), member.getBytes(REDIS_CHARSET));
942+
} catch (IOException e) {
943+
throw new JedisConnectionException(e);
944+
} finally {
945+
if (jedis != null) {
946+
try {
947+
_pool.returnResource(jedis);
948+
} catch (Throwable thex) {
949+
}
950+
}
951+
}
952+
} else {
953+
ShardedJedis jedis = null;
954+
try {
955+
jedis = _shardedPool.getResource();
956+
return jedis.srem(skey.getBytes(REDIS_CHARSET), member.getBytes(REDIS_CHARSET));
957+
} catch (IOException e) {
958+
throw new JedisConnectionException(e);
959+
} finally {
960+
if (jedis != null) {
961+
try {
962+
_shardedPool.returnResource(jedis);
963+
} catch (Throwable thex) {
964+
}
965+
}
966+
}
967+
}
968+
}
809969
}

web-app/WEB-INF/lib/pushlet.jar

1.9 KB
Binary file not shown.

0 commit comments

Comments
 (0)