@@ -8,211 +8,177 @@ const Adapter = require('socket.io-adapter');
88const Emitter = require ( 'events' ) . EventEmitter ;
99const Promise = require ( 'bluebird' ) ;
1010
11- /**
12- * Module exports.
13- */
14-
15- module . exports = adapter ;
16-
1711/**
1812 * Returns a redis Adapter class.
1913 *
20- * @param {String } optional, redis uri
21- * @return { RedisAdapter } adapter
14+ * @param {{pubClient: Promise.Redis, subClient: Promise.Redis, key: String, subEvent: String, uid2: integer} } opts
15+ * @returns { {new(*=): {onmessage: (function(String, String)), delAll: (function(String, Function)), add: (function(String, String, Function)), del: (function(String, String, Function)), broadcast: (function(Object, Object, Boolean))}} }
2216 * @api public
17+ * @export
2318 */
2419
25- function adapter ( uri , opts ) {
26- opts = opts || { } ;
27-
28- // handle options only
29- if ( 'object' == typeof uri ) {
30- opts = uri ;
31- uri = null ;
32- }
33-
34- // opts
20+ function adapter ( opts = { } ) {
3521 const pub = opts . pubClient ;
3622 const sub = opts . subClient ;
3723 const prefix = opts . key || 'socket.io' ;
3824 const subEvent = opts . subEvent || 'message' ;
39-
4025
4126 if ( ! pub ) throw new Error ( [ " No Pub specified" ] ) ;
4227 if ( ! sub ) throw new Error ( [ " No Sub specified" ] ) ;
4328
4429 // this server's key
4530 const uid = opts . uid2 ? uid2 ( opts . uid2 ) : uid2 ( 6 ) ;
46-
47- /**
48- * Adapter constructor.
49- *
50- * @param {String } namespace name
51- * @api public
52- */
53-
54- function Redis ( nsp ) {
55- Adapter . call ( this , nsp ) ;
56-
57- this . uid = uid ;
58- this . prefix = prefix ;
59- this . channel = prefix + '#' + nsp . name + '#' ;
60- this . channelMatches = ( messageChannel , subscribedChannel ) => messageChannel . startsWith ( subscribedChannel ) ;
61- this . pubClient = pub ;
62- this . subClient = sub ;
63-
64- sub . subscribe ( this . channel , ( err ) => {
65- if ( err ) this . emit ( 'error' , err ) ;
66- } ) ;
67- sub . on ( subEvent , this . onmessage . bind ( this ) ) ;
68- }
69-
70- /**
71- * Inherits from `Adapter`.
31+ /** Create Redis adapter
32+ * @constructor
33+ * @type {{new(*=): {onmessage: (function(String, String)), delAll: (function(String, Function)), add: (function(String, String, Function)), del: (function(String, String, Function)), broadcast: (function(Object, Object, Boolean))}} }
7234 */
35+ const Redis = class extends Adapter {
36+ constructor ( nsp ) {
37+ super ( nsp ) ;
38+ this . uid = uid ;
39+ this . prefix = prefix ;
40+ this . channel = prefix + '#' + nsp . name + '#' ;
41+ this . channelMatches = ( messageChannel , subscribedChannel ) => messageChannel . startsWith ( subscribedChannel ) ;
42+ this . pubClient = pub ;
43+ this . subClient = sub ;
44+
45+ sub . subscribe ( this . channel , ( err ) => {
46+ if ( err ) this . emit ( 'error' , err ) ;
47+ } ) ;
48+ sub . on ( subEvent , this . onmessage . bind ( this ) ) ;
49+ }
50+ /** Called with a subscription message
51+ * @param {String } channel
52+ * @param {String } msg
53+ * @api private
54+ */
55+ onmessage ( channel , msg ) {
56+ if ( ! this . channelMatches ( channel . toString ( ) , this . channel ) ) {
57+ return ;
58+ }
59+ const args = JSON . parse ( msg ) ;
7360
74- Redis . prototype . __proto__ = Adapter . prototype ;
61+ if ( uid == args . shift ( ) ) return ;
7562
76- /**
77- * Called with a subscription message
78- *
79- * @api private
80- */
63+ const packet = args [ 0 ] ;
8164
82- Redis . prototype . onmessage = function ( channel , msg ) {
83- if ( ! this . channelMatches ( channel . toString ( ) , this . channel ) ) {
84- return ;
85- }
86- const args = JSON . parse ( msg ) ;
87-
88- if ( uid == args . shift ( ) ) return ;
89-
90- const packet = args [ 0 ] ;
91-
92- if ( packet && packet . nsp === undefined ) {
93- packet . nsp = '/' ;
94- }
65+ if ( packet && packet . nsp === undefined ) {
66+ packet . nsp = '/' ;
67+ }
9568
96- if ( ! packet || packet . nsp != this . nsp . name ) {
69+ if ( ! packet || packet . nsp != this . nsp . name ) {
9770 return ;
98- }
99- args . push ( true ) ;
100-
101- this . broadcast . apply ( this , args ) ;
102- } ;
103-
104- /**
105- * Broadcasts a packet.
106- *
107- * @param {Object } packet to emit
108- * @param {Object } options
109- * @param {Boolean } whether the packet came from another node
110- * @api public
111- */
112-
113- Redis . prototype . broadcast = function ( packet , opts , remote ) {
114- const newPacket = Object . assign ( { } , packet ) ;
115- Adapter . prototype . broadcast . call ( this , packet , opts ) ;
116- newPacket . nsp = packet . nsp ;
117- newPacket . type = packet . type ;
118- if ( ! remote ) {
119- const chn = this . prefix + '#' + newPacket . nsp + '#' ;
120- const msg = JSON . stringify ( [ uid , newPacket , opts ] ) ;
121- if ( opts . rooms ) {
122- opts . rooms . map ( ( room ) => {
123- const chnRoom = chn + room + '#' ;
124- pub . publish ( chnRoom , msg ) ;
125- } ) ;
126- } else {
127- pub . publish ( chn , msg ) ;
12871 }
72+ args . push ( true ) ;
73+
74+ this . broadcast . apply ( this , args ) ;
12975 }
130- } ;
131-
132- /**
133- * Subscribe client to room messages.
134- *
135- * @param {String } client id
136- * @param {String } room
137- * @param {Function } callback (optional)
138- * @api public
139- */
14076
141- Redis . prototype . add = function ( id , room , fn ) {
142- Adapter . prototype . add . call ( this , id , room ) ;
143- const channel = this . prefix + '#' + this . nsp . name + '#' + room + '#' ;
144- sub . subscribe ( channel , ( err ) => {
145- if ( err ) {
146- this . emit ( 'error' , err ) ;
147- if ( fn ) fn ( err ) ;
148- return ;
77+ /** Broadcasts a packet.
78+ * @param {Object } packet to emit
79+ * @param {Object } opts
80+ * @param {Boolean } remote whether the packet came from another node
81+ * @api public
82+ */
83+ broadcast ( packet , opts , remote ) {
84+ const newPacket = Object . assign ( { } , packet ) ;
85+ Adapter . prototype . broadcast . call ( this , packet , opts ) ;
86+ newPacket . nsp = packet . nsp ;
87+ newPacket . type = packet . type ;
88+ if ( ! remote ) {
89+ const chn = this . prefix + '#' + newPacket . nsp + '#' ;
90+ const msg = JSON . stringify ( [ uid , newPacket , opts ] ) ;
91+ if ( opts . rooms ) {
92+ opts . rooms . map ( ( room ) => {
93+ const chnRoom = chn + room + '#' ;
94+ pub . publish ( chnRoom , msg ) ;
95+ } ) ;
96+ } else {
97+ pub . publish ( chn , msg ) ;
98+ }
14999 }
150- if ( fn ) fn ( null ) ;
151- } ) ;
152- } ;
153-
154- /**
155- * Unsubscribe client from room messages.
156- *
157- * @param {String } session id
158- * @param {String } room id
159- * @param {Function } callback (optional)
160- * @api public
161- */
162-
163- Redis . prototype . del = function ( id , room , fn ) {
164- const hasRoom = Object . keys ( this . rooms ) . includes ( room ) ; // this.rooms.hasOwnProperty(room);
165- Adapter . prototype . del . call ( this , id , room ) ;
100+ }
166101
167- if ( hasRoom && ! this . rooms [ room ] ) {
102+ /** Subscribe client to room messages.
103+ * @param {String } id client id
104+ * @param {String } room
105+ * @param {Function } fn callback (optional)
106+ * @api public
107+ */
108+ add ( id , room , fn ) {
109+ Adapter . prototype . add . call ( this , id , room ) ;
168110 const channel = this . prefix + '#' + this . nsp . name + '#' + room + '#' ;
169- sub . unsubscribe ( channel , ( err ) => {
111+ sub . subscribe ( channel , ( err ) => {
170112 if ( err ) {
171113 this . emit ( 'error' , err ) ;
172114 if ( fn ) fn ( err ) ;
173115 return ;
174116 }
175117 if ( fn ) fn ( null ) ;
176118 } ) ;
177- } else {
178- if ( fn ) process . nextTick ( fn . bind ( null , null ) ) ;
179119 }
180- } ;
181-
182- /**
183- * Unsubscribe client completely.
184- *
185- * @param {String } client id
186- * @param {Function } callback (optional)
187- * @api public
188- */
189-
190- Redis . prototype . delAll = function ( id , fn ) {
191- const rooms = this . sids [ id ] ;
192120
193- if ( ! rooms ) {
194- if ( fn ) process . nextTick ( fn . bind ( null , null ) ) ;
195- return ;
121+ /** Unsubscribe client from room messages.
122+ * @param {String } id session id
123+ * @param {String } room id
124+ * @param {Function } fn callback (optional)
125+ * @api public
126+ */
127+ del ( id , room , fn ) {
128+ const hasRoom = Object . keys ( this . rooms ) . includes ( room ) ; // this.rooms.hasOwnProperty(room);
129+ Adapter . prototype . del . call ( this , id , room ) ;
130+
131+ if ( hasRoom && ! this . rooms [ room ] ) {
132+ const channel = this . prefix + '#' + this . nsp . name + '#' + room + '#' ;
133+ sub . unsubscribe ( channel , ( err ) => {
134+ if ( err ) {
135+ this . emit ( 'error' , err ) ;
136+ if ( fn ) fn ( err ) ;
137+ return ;
138+ }
139+ if ( fn ) fn ( null ) ;
140+ } ) ;
141+ } else {
142+ if ( fn ) process . nextTick ( fn . bind ( null , null ) ) ;
143+ }
196144 }
197145
198- Promise . map ( Object . keys ( rooms ) , ( room ) => {
199- this . del ( id , room , ( ) => {
200- delete this . sids [ id ] ;
201- if ( fn ) fn ( null ) ;
202- } ) ;
203- } , { concurrency : Infinity } )
204- . catch ( ( err ) => {
205- this . emit ( 'error' , err ) ;
206- if ( fn ) fn ( err ) ;
207- } ) ;
146+ /** Unsubscribe client completely.
147+ * @param {String } id client id
148+ * @param {Function } fn callback (optional)
149+ * @api public
150+ */
208151
209- } ;
152+ delAll ( id , fn ) {
153+ const rooms = this . sids [ id ] ;
210154
155+ if ( ! rooms ) {
156+ if ( fn ) process . nextTick ( fn . bind ( null , null ) ) ;
157+ return ;
158+ }
159+ const room_keys = Object . keys ( rooms )
160+ Promise . map ( room_keys , ( room ) => {
161+ this . del ( id , room , ( ) => {
162+ delete this . sids [ id ] ;
163+ if ( fn ) fn ( null ) ;
164+ } ) ;
165+ } , { concurrency : room_keys . length } )
166+ . catch ( ( err ) => {
167+ this . emit ( 'error' , err ) ;
168+ if ( fn ) fn ( err ) ;
169+ } ) ;
170+
171+ }
172+ }
211173 Redis . uid = uid ;
212174 Redis . pubClient = pub ;
213175 Redis . subClient = sub ;
214176 Redis . prefix = prefix ;
215177
216178 return Redis ;
217-
218179}
180+ /**
181+ * Module exports.
182+ */
183+
184+ module . exports = adapter ;
0 commit comments