Skip to content

Commit a893646

Browse files
Paul DevinePaul Devine
authored andcommitted
initial version playing with structure
1 parent a00afb7 commit a893646

File tree

6 files changed

+576
-147
lines changed

6 files changed

+576
-147
lines changed

.nvm

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
8.7.0

lib/bus.js

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,38 @@
1-
var util = require("util");
2-
var queue = require("node-resque").queue;
3-
var utils = require(__dirname + '/sections/utils');
4-
5-
var bus = function(options, jobs){
6-
queue.call(this, options, jobs);
7-
8-
var busDefaults = this.busDefaults();
9-
for(var i in busDefaults){
10-
if(this.options[i] === undefined){
11-
this.options[i] = busDefaults[i];
12-
}
13-
}
14-
};
1+
const util = require("util");
2+
const Queue = require('node-resque').Queue;
3+
const utils = require('./sections/utils');
4+
5+
class Bus extends Queue {
6+
constructor (options, jobs) {
7+
super()
8+
if (!jobs) { jobs = {} }
9+
10+
this.options = options;
1511

16-
util.inherits(bus, queue);
12+
var busDefaults = utils.defaults;
13+
for(var i in busDefaults){
14+
if(this.options[i] === undefined){
15+
this.options[i] = busDefaults[i];
16+
}
17+
}
1718

18-
bus.prototype.busDefaults = utils.defaults;
19+
this.jobs = jobs;
1920

20-
bus.prototype.subscriptions = require(__dirname + '/sections/subscriptions.js').subscriptions;
21-
bus.prototype.subscribe = require(__dirname + '/sections/subscriptions.js').subscribe;
22-
bus.prototype.unsubscribe = require(__dirname + '/sections/subscriptions.js').unsubscribe;
23-
bus.prototype.unsubscribeAll = require(__dirname + '/sections/subscriptions.js').unsubscribeAll;
21+
this.connection = new Connection(this.options.connection);
22+
this.connection.on('error', (error) => { this.emit('error', error) })
2423

25-
bus.prototype.publish = require(__dirname + '/sections/publish.js').publish;
26-
bus.prototype.publishAt = require(__dirname + '/sections/publish.js').publishAt;
27-
bus.prototype.publishIn = require(__dirname + '/sections/publish.js').publishIn;
24+
this.subscriptions = require(__dirname + '/sections/subscriptions.js').subscriptions;
25+
this.subscriptions = require(__dirname + '/sections/subscriptions.js').subscriptions;
26+
this.subscribe = require(__dirname + '/sections/subscriptions.js').subscribe;
27+
this.unsubscribe = require(__dirname + '/sections/subscriptions.js').unsubscribe;
28+
this.unsubscribeAll = require(__dirname + '/sections/subscriptions.js').unsubscribeAll;
2829

29-
bus.prototype.publishHeartbeat = require(__dirname + '/sections/publish.js').publishHeartbeat;
30+
this.publish = require(__dirname + '/sections/publish.js').publish;
31+
this.publishAt = require(__dirname + '/sections/publish.js').publishAt;
32+
this.publishIn = require(__dirname + '/sections/publish.js').publishIn;
33+
34+
this.publishHeartbeat = require(__dirname + '/sections/publish.js').publishHeartbeat;
35+
}
36+
}
3037

31-
exports.bus = bus;
38+
exports.Bus = Bus;

lib/rider.js

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,41 @@
1-
var util = require("util");
2-
var worker = require("node-resque").worker;
3-
var utils = require(__dirname + '/sections/utils.js');
4-
var bus = require(__dirname + '/bus.js').bus;
5-
6-
var rider = function(options, jobs){
7-
worker.call(this, options, jobs);
8-
9-
var busClassKey = this.busDefaults().busClassKey;
10-
this.jobs[busClassKey] = this.busJob();
11-
12-
this.bus = new bus(options, jobs);
13-
14-
var busDefaults = this.busDefaults();
15-
for(var i in busDefaults){
16-
if(this.options[i] === undefined){
17-
this.options[i] = busDefaults[i];
1+
const util = require("util");
2+
const Worker = require("node-resque").Worker;
3+
const utils = require(__dirname + '/sections/utils.js');
4+
const Bus = require(__dirname + '/bus.js').Bus;
5+
6+
class Rider extends Worker {
7+
constructor (options, jobs) {
8+
super()
9+
if (!jobs) { jobs = {} }
10+
11+
this.options = options;
12+
13+
let busClassKey = this.busDefaults().busClassKey;
14+
this.jobs[busClassKey] = this.busJob();
15+
16+
this.bus = new Bus(options, jobs);
17+
18+
let busDefaults = utils.defaults;
19+
for(var i in busDefaults){
20+
if(this.options[i] === undefined){
21+
this.options[i] = busDefaults[i];
22+
}
23+
}
24+
25+
if(this.options.toDrive === true){
26+
if(this.queues instanceof Array && this.queues.indexOf(this.options.incomingQueue) < 0){
27+
this.queues.push( this.options.incomingQueue );
28+
}
1829
}
19-
}
2030

21-
if(this.options.toDrive === true){
22-
if(this.queues instanceof Array && this.queues.indexOf(this.options.incomingQueue) < 0){
23-
this.queues.push( this.options.incomingQueue );
24-
}
31+
this.bus.on('error', (error, queue, job) => { self.emit(error); });
32+
this.busJob = require(__dirname + '/sections/driver.js').busJob;
2533
}
2634

27-
this.bus.connect(function(error) {
28-
if (error) {
29-
throw error;
30-
}
31-
});
32-
var self = this;
33-
this.bus.on('error', function(error){
34-
self.emit(error);
35-
});
36-
};
37-
38-
util.inherits(rider, worker);
35+
async connect() {
36+
await this.bus.connect();
37+
}
3938

40-
rider.prototype.busDefaults = utils.defaults;
41-
rider.prototype.busJob = require(__dirname + '/sections/driver.js').busJob;
39+
}
4240

43-
exports.rider = rider;
41+
exports.Rider = Rider;

lib/sections/subscriptions.js

Lines changed: 81 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,96 +1,96 @@
1-
var utils = require(__dirname + '/utils.js');
1+
const utils = require(__dirname + '/utils.js');
22

3-
var subscriptions = function(callback){
4-
var self = this;
5-
var subscriptions = {};
6-
var count = 0;
3+
class Subscriptions {
4+
subscriptions(callback){
5+
var self = this;
6+
var subscriptions = {};
7+
var count = 0;
78

8-
var now = new Date().getTime();
9-
self.connection.redis.smembers(fullSubscriptionSet(self), function(err, applications){
10-
if(err || applications.length === 0){
11-
callback(err, subscriptions, count);
12-
}else{
13-
var started = 0;
14-
applications.forEach(function(app){
15-
started++;
16-
self.connection.redis.hgetall(fullAppPrefix(self) + app , function(err, subscription){
17-
for(var i in subscription){
18-
if(subscriptions[app] === null || subscriptions[app] === undefined){ subscriptions[app] = {}; }
19-
subscriptions[app][i] = JSON.parse(subscription[i]);
20-
count++;
21-
}
22-
started--;
23-
if(started === 0){
24-
callback(err, subscriptions, count);
25-
}
9+
var now = new Date().getTime();
10+
self.connection.redis.smembers(fullSubscriptionSet(self), function(err, applications){
11+
if(err || applications.length === 0){
12+
callback(err, subscriptions, count);
13+
}else{
14+
var started = 0;
15+
applications.forEach(function(app){
16+
started++;
17+
self.connection.redis.hgetall(fullAppPrefix(self) + app , function(err, subscription){
18+
for(var i in subscription){
19+
if(subscriptions[app] === null || subscriptions[app] === undefined){ subscriptions[app] = {}; }
20+
subscriptions[app][i] = JSON.parse(subscription[i]);
21+
count++;
22+
}
23+
started--;
24+
if(started === 0){
25+
callback(err, subscriptions, count);
26+
}
27+
});
2628
});
27-
});
28-
}
29-
});
30-
};
31-
32-
var fullAppPrefix = function(self){
33-
return self.options.connection.namespace + self.options.appPrefix;
34-
};
29+
}
30+
});
31+
};
3532

36-
var fullSubscriptionSet = function(self){
37-
return self.options.connection.namespace + self.options.subscriptionSet;
38-
};
33+
var fullAppPrefix = function(self){
34+
return self.options.connection.namespace + self.options.appPrefix;
35+
};
3936

40-
var rubyizeMatcher = function(matcher){
41-
for(var i in matcher){
42-
matcher[i] = utils.toRubyRegExp(matcher[i]);
43-
}
44-
return matcher;
45-
};
37+
var fullSubscriptionSet = function(self){
38+
return self.options.connection.namespace + self.options.subscriptionSet;
39+
};
4640

47-
var subscribe = function(appKey, priority, job, matcher, callback){
48-
var self = this;
49-
appKey = utils.normalize(appKey);
50-
var key = utils.hashKey(appKey, priority, job);
51-
var combined_queue_name = appKey + "_" + priority;
52-
var subscription = {
53-
queue_name : combined_queue_name,
54-
key : key,
55-
class : job,
56-
matcher : rubyizeMatcher(matcher)
41+
var rubyizeMatcher = function(matcher){
42+
for(var i in matcher){
43+
matcher[i] = utils.toRubyRegExp(matcher[i]);
44+
}
45+
return matcher;
5746
};
5847

59-
self.connection.redis.hset(fullAppPrefix(self) + appKey, key, JSON.stringify(subscription), function(err){
60-
self.connection.redis.sadd(fullSubscriptionSet(self), appKey, function(err){
61-
if(typeof callback === 'function'){ callback(err, combined_queue_name); }
48+
var subscribe = function(appKey, priority, job, matcher, callback){
49+
var self = this;
50+
appKey = utils.normalize(appKey);
51+
var key = utils.hashKey(appKey, priority, job);
52+
var combined_queue_name = appKey + "_" + priority;
53+
var subscription = {
54+
queue_name : combined_queue_name,
55+
key : key,
56+
class : job,
57+
matcher : rubyizeMatcher(matcher)
58+
};
59+
60+
self.connection.redis.hset(fullAppPrefix(self) + appKey, key, JSON.stringify(subscription), function(err){
61+
self.connection.redis.sadd(fullSubscriptionSet(self), appKey, function(err){
62+
if(typeof callback === 'function'){ callback(err, combined_queue_name); }
63+
});
6264
});
63-
});
64-
};
65+
};
6566

66-
var unsubscribe = function(appKey, priority, job, callback){
67-
var self = this;
68-
appKey = utils.normalize(appKey);
69-
var key = utils.hashKey(appKey, priority, job);
70-
self.connection.redis.hdel(fullAppPrefix(self) + appKey, key, function(err){
71-
self.connection.redis.hkeys(fullAppPrefix(self) + appKey, function(err, keys){
72-
if(keys.length === 0){
73-
self.unsubscribeAll(appKey, function(){
67+
var unsubscribe = function(appKey, priority, job, callback){
68+
var self = this;
69+
appKey = utils.normalize(appKey);
70+
var key = utils.hashKey(appKey, priority, job);
71+
self.connection.redis.hdel(fullAppPrefix(self) + appKey, key, function(err){
72+
self.connection.redis.hkeys(fullAppPrefix(self) + appKey, function(err, keys){
73+
if(keys.length === 0){
74+
self.unsubscribeAll(appKey, function(){
75+
if(typeof callback === 'function'){ callback(); }
76+
});
77+
}else{
7478
if(typeof callback === 'function'){ callback(); }
75-
});
76-
}else{
77-
if(typeof callback === 'function'){ callback(); }
78-
}
79+
}
80+
});
7981
});
80-
});
81-
};
82+
};
8283

83-
var unsubscribeAll = function(appKey, callback){
84-
var self = this;
85-
appKey = utils.normalize(appKey);
86-
self.connection.redis.srem(fullSubscriptionSet(self), appKey, function(err){
87-
self.connection.redis.del(fullAppPrefix(self) + appKey, function(err){
88-
if(typeof callback === 'function'){ callback(); }
84+
var unsubscribeAll = function(appKey, callback){
85+
var self = this;
86+
appKey = utils.normalize(appKey);
87+
self.connection.redis.srem(fullSubscriptionSet(self), appKey, function(err){
88+
self.connection.redis.del(fullAppPrefix(self) + appKey, function(err){
89+
if(typeof callback === 'function'){ callback(); }
90+
});
8991
});
90-
});
91-
};
92+
};
93+
94+
}
95+
exports.Subscriptions = Subscriptions;
9296

93-
exports.subscriptions = subscriptions;
94-
exports.subscribe = subscribe;
95-
exports.unsubscribe = unsubscribe;
96-
exports.unsubscribeAll = unsubscribeAll;

0 commit comments

Comments
 (0)