From efa2a46a486ed11a99e084cec71a318750460245 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Thu, 8 Mar 2012 05:35:20 -0800 Subject: [PATCH 1/8] clearing out cluster coordinator stuff --- config/cluster.conf.py | 27 ------ config/gateway.conf.py | 1 - gtutorial/cluster.py | 195 ------------------------------------- gtutorial/cluster_.py | 211 ----------------------------------------- gtutorial/gateway.py | 5 +- 5 files changed, 1 insertion(+), 438 deletions(-) delete mode 100644 config/cluster.conf.py delete mode 100644 gtutorial/cluster.py delete mode 100644 gtutorial/cluster_.py diff --git a/config/cluster.conf.py b/config/cluster.conf.py deleted file mode 100644 index 74d591c..0000000 --- a/config/cluster.conf.py +++ /dev/null @@ -1,27 +0,0 @@ -def service(): - import sys, logging, os - from ginkgo.core import Service - from gtutorial.cluster import ClusterCoordinator - - logger = logging.getLogger(__name__) - - class ClusterTest(Service): - def __init__(self, identity, leader=None): - self.cluster = ClusterCoordinator(identity, leader) - - self.add_service(self.cluster) - - def do_start(self): - def show_cluster(add=None, remove=None): - logger.info(self.cluster.set) - self.cluster.set.attach(show_cluster) - logger.info(self.cluster.set) - self.spawn(self.wait_for_promotion) - - def wait_for_promotion(self): - self.cluster.wait_for_promotion() - logger.info("Promoted to leader") - - return ClusterTest( - os.environ.get("IDENTITY", "127.0.0.1"), - os.environ.get("LEADER")) diff --git a/config/gateway.conf.py b/config/gateway.conf.py index 3ae2062..a1e5857 100644 --- a/config/gateway.conf.py +++ b/config/gateway.conf.py @@ -1,7 +1,6 @@ import os identity = os.environ.get('IDENTITY', '127.0.0.1') -leader = os.environ.get('LEADER') cluster = os.environ.get('CLUSTER', '127.0.0.1').split(',') def service(): diff --git a/gtutorial/cluster.py b/gtutorial/cluster.py deleted file mode 100644 index b6bd42b..0000000 --- a/gtutorial/cluster.py +++ /dev/null @@ -1,195 +0,0 @@ -import logging -import time -import json - -import gevent -from gevent import Timeout -import gevent.server -from gevent.event import Event -from gevent_zeromq import zmq - -from ginkgo.core import Service, autospawn, NOT_READY -from ginkgo.config import Setting -from ginkgo import util - -from .util import ObservableSet - -CLIENT_TIMEOUT_SECONDS = 10 -SERVER_KEEPALIVE_SECONDS = 5 - -logger = logging.getLogger(__name__) - -class ClusterError(Exception): pass -class NewLeader(Exception): pass - -class ClusterCoordinator(Service): - port = Setting('cluster_port', default=4440) - - def __init__(self, identity, leader=None, cluster=None): - leader = leader or identity - self.server = PeerServer(self, identity) - self.client = PeerClient(self, leader, identity) - self.set = cluster or ObservableSet() - self.promoted = Event() - - self.add_service(self.server) - if leader != identity: - self.add_service(self.client) - self.is_leader = False - else: - self.is_leader = True - - def wait_for_promotion(self): - self.promoted.wait() - - @property - def leader(self): - return self.client.leader - - @property - def identity(self): - return self.client.identity - -class PeerServer(Service): - def __init__(self, coordinator, identity): - self.c = coordinator - self.identity = identity - self.clients = {} - self.server = gevent.server.StreamServer((identity, self.c.port), - handle=self.handle, spawn=self.spawn) - - self.add_service(self.server) - - def do_start(self): - if self.c.is_leader: - self.c.set.add(self.identity) - - def handle(self, socket, address): - """ - If not a leader, a node will simply return a single item list pointing - to the leader. Otherwise, it will add the host of the connected client - to the cluster roster, broadcast to all nodes the new roster, and wait - for keepalives. If no keepalive within timeout or the client drops, it - drops it from the roster and broadcasts to all remaining nodes. - """ - if not self.c.is_leader: - socket.send(json.dumps({'leader': self.c.client.leader, - 'port': self.c.port})) - socket.close() - logger.debug("Redirected to %s:%s" % (self.c.client.leader, self.c.port)) - else: - socket.send(self._cluster_message()) - sockfile = socket.makefile() - name = sockfile.readline() - if not name: - return - if name == '\n': - name = address[0] - else: - name = name.strip() - logger.debug('New connection from %s' % name) - self._update(add={'host': name, 'socket': socket}) - # TODO: Use TCP keepalives - timeout = self._client_timeout(socket) - for line in util.line_protocol(sockfile, strip=False): - timeout.kill() - timeout = self._client_timeout(socket) - socket.send('\n') - #logger.debug("Keepalive from %s:%s" % address) - #logger.debug("Client disconnected from %s:%s" % address) - self._update(remove=name) - - def _client_timeout(self, socket): - def shutdown(socket): - try: - socket.shutdown(0) - except IOError: - pass - return self.spawn_later(CLIENT_TIMEOUT_SECONDS, - lambda: shutdown(socket)) - - def _cluster_message(self): - return '%s\n' % json.dumps({'cluster': list(self.c.set)}) - - def _update(self, add=None, remove=None): - """ Used by leader to manage and broadcast roster """ - if add is not None: - self.c.set.add(add['host']) - self.clients[add['host']] = add['socket'] - #logger.debug("Added to cluster: %s" % add['host']) - if remove is not None: - self.c.set.remove(remove) - del self.clients[remove] - #logger.debug("Removed from cluster: %s" % remove) - for client in self.clients: - self.clients[client].send(self._cluster_message()) - - -class PeerClient(Service): - def __init__(self, coordinator, leader, identity): - self.c = coordinator - self.leader = leader - self.identity = identity - - def do_start(self): - self.spawn(self.connect) - return NOT_READY - - def connect(self): - while self.leader != self.identity: - address = (self.leader, self.c.port) - logger.debug("Connecting to leader at %s:%s" % address) - try: - socket = util.connect_and_retry(address, max_retries=5) - except IOError: - raise ClusterError("Unable to connect to leader %s:%s" % - address) - self.handle(socket) - - def handle(self, socket): - self.set_ready() - #logger.debug("Connected to leader") - client_address = self.identity or socket.getsockname()[0] - socket.send('%s\n' % client_address) - # TODO: Use TCP keepalives - keepalive = self._server_keepalive(socket) - try: - for line in util.line_protocol(socket, strip=False): - if line == '\n': - # Keepalive ack from leader - keepalive.kill() - keepalive = self._server_keepalive(socket) - else: - cluster = json.loads(line) - if 'leader' in cluster: - # Means you have the wrong leader, redirect - self.leader = cluster['leader'] - logger.info("Redirected to %s:%s..." % - (self.leader, self.c.port)) - raise NewLeader() - elif client_address in cluster['cluster']: - # Only report cluster once I'm a member - self.c.set.replace(set(cluster['cluster'])) - self.c.set.remove(self.leader) - self._leader_election() - except NewLeader: - #self.manager.trigger_callback() - if self.leader == client_address: - self.c.is_leader = True - self.c.promoted.set() - self.stop() # doesn't work - else: - return - - def _server_keepalive(self, socket): - return self.spawn_later(SERVER_KEEPALIVE_SECONDS, - lambda: socket.send('\n')) - - def _leader_election(self): - candidates = list(self.c.set) - candidates.sort() - self.leader = candidates[0] - logger.info("New leader %s:%s..." % (self.leader, self.c.port)) - # TODO: if i end up thinking i'm the leader when i'm not - # then i will not rejoin the cluster - raise NewLeader() diff --git a/gtutorial/cluster_.py b/gtutorial/cluster_.py deleted file mode 100644 index 633a1aa..0000000 --- a/gtutorial/cluster_.py +++ /dev/null @@ -1,211 +0,0 @@ -import logging -import time - -import gevent -from gevent import Timeout -from gevent.event import Event -from gevent_zeromq import zmq - -from ginkgo.core import Service, autospawn -from ginkgo.config import Setting - -from .util import ObservableSet - -logger = logging.getLogger(__name__) - -class ClusterCoordinator(Service): - updates_port = Setting('cluster_updates_port', default=4440) - heartbeat_port = Setting('cluster_heartbeat_port', default=4441) - greeter_port = Setting('cluster_greeter_port', default=4442) - heartbeat_interval = Setting('cluster_heartbeat_interval_secs', default=2) - - def __init__(self, identity, leader=None, cluster=None, zmq_=None): - self._zmq = zmq_ or zmq.Context() - self._cluster = cluster or ObservableSet() - self._leader = leader or identity - self._identity = identity - self._promoted = Event() - - self._server = PeerServer(self) - self._client = PeerClient(self) - - self._greeter = self._zmq.socket(zmq.REP) - - self.add_service(self._server) - - if self.is_leader: - self._promoted.set() - else: - self.add_service(self._client) - - @property - def is_leader(self): - return self._identity == self._leader - - def wait_for_promotion(self): - self._promoted.wait() - - def do_start(self): - self._cluster.add(self._identity) - self._greeter.bind("tcp://{}:{}".format(self._identity, self.greeter_port)) - self._greet() - - @autospawn - def _greet(self): - while True: - self._greeter.recv() # HELLO - if self.is_leader: - self._greeter.send_multipart(['WELCOME', '']) - else: - response = self.scout(self._leader, 1) - if len(response): - logger.debug("A follower is redirected to {}.".format(self._leader)) - self._greeter.send_multipart(['REDIRECT', self._leader]) - else: - logger.debug("A follower triggers new leader election.") - self._client._next_leader() - self._greeter.send_multipart(['RETRY', '']) - - def scout(self, leader, timeout=2): - scout = self._zmq.socket(zmq.REQ) - scout.connect("tcp://{}:{}".format(leader, self.greeter_port)) - scout.send('HELLO') - response = [] - with Timeout(timeout, False): - response = scout.recv_multipart() - scout.close() - return response - -class PeerClient(Service): - def __init__(self, coordinator): - self.c = coordinator - self._following = Event() - self._listener = None - self._heartbeater = None - - def do_start(self): - self._follow_leader() - self._send_heartbeats() - self._listen_for_updates() - self._poll_leader() - - def _follow_leader(self): - self.c._leader = self._confirm_leader(self.c._leader) - - if self._listener is not None: - self._listener.close() - self._listener = self.c._zmq.socket(zmq.SUB) - self._listener.setsockopt(zmq.SUBSCRIBE, '') - self._listener.connect("tcp://{}:{}".format(self.c._leader, - self.c.updates_port)) - - if self._heartbeater is not None: - self._heartbeater.close() - self._heartbeater = self.c._zmq.socket(zmq.PUSH) - self._heartbeater.connect("tcp://{}:{}".format(self.c._leader, - self.c.heartbeat_port)) - - self._following.set() - - def _confirm_leader(self, leader): - response = self.c.scout(leader) - if len(response): - reply, redirect_address = response - if reply == 'WELCOME': - logger.debug("Leader {} confirmed with warm welcome.".format(leader)) - return leader - elif reply == 'REDIRECT': - logger.debug("Leader {} is not actual leader, redirecting...".format(leader)) - return self._confirm_leader(redirect_address) - elif reply == 'RETRY': - logger.debug("Leader {} is confused, try again...".format(leader)) - return self._confirm_leader(leader) - raise Exception("Unable to confirm leader") - - def _next_leader(self): - self._following.clear() - self.c._cluster.remove(self.c._leader) - candidates = sorted(list(self.c._cluster)) - self.c._leader = candidates[0] - logger.debug("A new leader is decided: {}".format(candidates[0])) - if self.c.is_leader: - self.c._promoted.set() - else: - self._follow_leader() - - @autospawn - def _send_heartbeats(self): - while True: - self._following.wait() - self._heartbeater.send(self.c._identity) - gevent.sleep(self.c.heartbeat_interval) - - @autospawn - def _listen_for_updates(self): - while True: - self._following.wait() - cluster = self._listener.recv_multipart() - logger.debug("Got cluster update") - self.c._cluster.replace(set(cluster)) - - @autospawn - def _poll_leader(self): - while True: - gevent.sleep(self.c.heartbeat_interval) - response = self.c.scout(self.c._leader) - if not response: - self._next_leader() - - -class PeerServer(Service): - def __init__(self, coordinator): - self.c = coordinator - self._updates = self.c._zmq.socket(zmq.PUB) - self._heartbeats = self.c._zmq.socket(zmq.PULL) - self._latest_heartbeats = {} - - def updater(add=None, remove=None): - if add: self._broadcast_cluster() - self.c._cluster.attach(updater) - - - def do_start(self): - self._updates.bind("tcp://{}:{}".format(self.c._identity, self.c.updates_port)) - self._heartbeats.bind("tcp://{}:{}".format(self.c._identity, self.c.heartbeat_port)) - self._send_heartbeats() - self._receive_heartbeats() - self._timeout_peers() - - def _broadcast_cluster(self): - self._updates.send_multipart([self.c._identity] + list(self.c._cluster)) - - @autospawn - def _send_heartbeats(self): - while True: - self.c.wait_for_promotion() - logger.debug("Broadcasting cluster.") - self._broadcast_cluster() - gevent.sleep(self.c.heartbeat_interval) - - @autospawn - def _receive_heartbeats(self): - while True: - self.c.wait_for_promotion() - follower = self._heartbeats.recv() - self.c._cluster.add(follower) # ignored if already added - self._latest_heartbeats[follower] = time.time() - - @autospawn - def _timeout_peers(self): - while True: - to_remove = [] - for follower in self._latest_heartbeats: - time_since_last = time.time() - self._latest_heartbeats[follower] - if time_since_last > self.c.heartbeat_interval * 2: - to_remove.append(follower) - for follower in to_remove: - self.c._cluster.remove(follower) - del self._latest_heartbeats[follower] - gevent.sleep(self.c.heartbeat_interval) - - diff --git a/gtutorial/gateway.py b/gtutorial/gateway.py index f9ebd61..343ec9e 100644 --- a/gtutorial/gateway.py +++ b/gtutorial/gateway.py @@ -9,20 +9,17 @@ from .messaging.hub import MessageHub from .coordination import Announcer from .coordination import Leadership -from .cluster import ClusterCoordinator from .util import ObservableSet logger = logging.getLogger(__name__) class NumberGateway(Service): identity = Setting('identity', default='127.0.0.1') - leader = Setting('leader', default=None) cluster_ = Setting('cluster', default=['127.0.0.1']) def __init__(self): self.client = NumberClient(('127.0.0.1', 7776)) - self.cluster = ClusterCoordinator(self.identity, self.leader) - #self.cluster = Leadership(self.identity, ObservableSet(self.cluster_)) + self.cluster = Leadership(self.identity, ObservableSet(self.cluster_)) self.hub = MessageHub(self.cluster.set, self.identity) self.announcer = Announcer(self.hub, self.cluster) From e08c88f220993c462d17221d3e54898b5ef9d2ae Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Thu, 8 Mar 2012 05:37:06 -0800 Subject: [PATCH 2/8] dropping announcer --- gtutorial/coordination.py | 23 ----------------------- gtutorial/gateway.py | 3 --- 2 files changed, 26 deletions(-) diff --git a/gtutorial/coordination.py b/gtutorial/coordination.py index 4ba4816..d517fae 100644 --- a/gtutorial/coordination.py +++ b/gtutorial/coordination.py @@ -59,26 +59,3 @@ def _listen_for_heartbeats(self): self._next_leader() -class Announcer(Service): - def __init__(self, hub, cluster): - self.hub = hub - self.cluster = cluster - - def do_start(self): - self._announce() - - @autospawn - def _announce(self): - while True: - if self.cluster.identity in self.cluster.set: - cluster_snapshot = sorted(list(self.cluster.set)) - identity_index = cluster_snapshot.index(self.cluster.identity) - announcer_index = int(time.time()) % len(cluster_snapshot) - if announcer_index is identity_index: - if self.cluster.is_leader: - announcement = "{}*".format(self.cluster.identity) - else: - announcement = self.cluster.identity - self.hub.publish("/announce", announcement) - gevent.sleep(1) - diff --git a/gtutorial/gateway.py b/gtutorial/gateway.py index 343ec9e..853c20a 100644 --- a/gtutorial/gateway.py +++ b/gtutorial/gateway.py @@ -7,7 +7,6 @@ from .numbers import NumberClient from .messaging.hub import MessageHub -from .coordination import Announcer from .coordination import Leadership from .util import ObservableSet @@ -21,11 +20,9 @@ def __init__(self): self.client = NumberClient(('127.0.0.1', 7776)) self.cluster = Leadership(self.identity, ObservableSet(self.cluster_)) self.hub = MessageHub(self.cluster.set, self.identity) - self.announcer = Announcer(self.hub, self.cluster) self.add_service(self.cluster) self.add_service(self.hub) - self.add_service(self.announcer) self.add_service(self.client) def do_start(self): From 78d72b28b1ff0efd3ec49d7ab4e80399369b8495 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Thu, 8 Mar 2012 05:56:26 -0800 Subject: [PATCH 3/8] dropped coordination --- config/messagehub.conf.py | 5 ---- gtutorial/coordination.py | 61 -------------------------------------- gtutorial/gateway.py | 10 ++----- gtutorial/messaging/hub.py | 2 +- 4 files changed, 4 insertions(+), 74 deletions(-) delete mode 100644 gtutorial/coordination.py diff --git a/config/messagehub.conf.py b/config/messagehub.conf.py index 38aa417..e750cf1 100644 --- a/config/messagehub.conf.py +++ b/config/messagehub.conf.py @@ -2,9 +2,4 @@ def service(): from gtutorial.messaging.hub import MessageHub from gtutorial.util import ObservableSet - import sys, logging - logging.basicConfig( - format="%(asctime)s %(levelname) 7s %(module)s: %(message)s", - stream=sys.stdout, - level=logging.DEBUG) return MessageHub(ObservableSet(['127.0.0.1'])) diff --git a/gtutorial/coordination.py b/gtutorial/coordination.py deleted file mode 100644 index d517fae..0000000 --- a/gtutorial/coordination.py +++ /dev/null @@ -1,61 +0,0 @@ -import time -import gevent -from gevent import Timeout -from gevent.event import Event -from gevent_zeromq import zmq - -from ginkgo.core import Service, autospawn -from ginkgo.config import Setting - -class Leadership(Service): - port = Setting('leader_port', default=12345) - heartbeat_interval = Setting('leader_heartbeat_interval_secs', default=3) - - def __init__(self, identity, cluster, zmq_=None): - zmq_ = zmq_ or zmq.Context() - self.identity = identity - self.leader = None - self.set = cluster - self._candidates = sorted(list(cluster)) - self._promoted = Event() - self._broadcaster = zmq_.socket(zmq.PUB) - self._listener = zmq_.socket(zmq.SUB) - self._listener.setsockopt(zmq.SUBSCRIBE, '') - - @property - def is_leader(self): - return self.identity == self.leader - - def wait_for_promotion(self): - self._promoted.wait() - - def do_start(self): - self._broadcaster.bind("tcp://{}:{}".format(self.identity, self.port)) - self._broadcast_when_promoted() - self._listen_for_heartbeats() - self._next_leader() - - def _next_leader(self): - self.leader = self._candidates.pop(0) - if self.is_leader: - self._promoted.set() - else: - self._listener.connect("tcp://{}:{}".format(self.leader, self.port)) - - @autospawn - def _broadcast_when_promoted(self): - self.wait_for_promotion() - while self.is_leader: - self._broadcaster.send(self.identity) - gevent.sleep(self.heartbeat_interval) - - @autospawn - def _listen_for_heartbeats(self): - while not self.is_leader: - leader = None - with Timeout(self.heartbeat_interval * 2, False) as timeout: - leader = self._listener.recv() - if leader is None: - self._next_leader() - - diff --git a/gtutorial/gateway.py b/gtutorial/gateway.py index 853c20a..4b4ad0d 100644 --- a/gtutorial/gateway.py +++ b/gtutorial/gateway.py @@ -7,21 +7,18 @@ from .numbers import NumberClient from .messaging.hub import MessageHub -from .coordination import Leadership from .util import ObservableSet logger = logging.getLogger(__name__) class NumberGateway(Service): identity = Setting('identity', default='127.0.0.1') - cluster_ = Setting('cluster', default=['127.0.0.1']) + cluster = Setting('cluster', default=['127.0.0.1']) def __init__(self): self.client = NumberClient(('127.0.0.1', 7776)) - self.cluster = Leadership(self.identity, ObservableSet(self.cluster_)) - self.hub = MessageHub(self.cluster.set, self.identity) + self.hub = MessageHub(ObservableSet(self.cluster), self.identity) - self.add_service(self.cluster) self.add_service(self.hub) self.add_service(self.client) @@ -31,6 +28,5 @@ def do_start(self): @autospawn def _bridge(self): for number in self.client: - if self.cluster.is_leader: - self.hub.publish('/numbers', number) + self.hub.publish('/numbers', number) gevent.sleep(0) diff --git a/gtutorial/messaging/hub.py b/gtutorial/messaging/hub.py index a728369..c84b849 100644 --- a/gtutorial/messaging/hub.py +++ b/gtutorial/messaging/hub.py @@ -8,7 +8,7 @@ class MessageHub(Service): def __init__(self, cluster=None, bind_interface=None, zmq=None): - self.bind_interface = bind_interface + self.bind_interface = bind_interface or '0.0.0.0' self.backend = MessageBackend(cluster, bind_interface, zmq) self.add_service(self.backend) From d42a387b2698feebe456365a9a42846167afc248 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Thu, 8 Mar 2012 06:02:23 -0800 Subject: [PATCH 4/8] reverting gateway back to pre-distributed message hub --- gtutorial/gateway.py | 10 +++++----- gtutorial/messaging/hub.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/gtutorial/gateway.py b/gtutorial/gateway.py index 4b4ad0d..7882477 100644 --- a/gtutorial/gateway.py +++ b/gtutorial/gateway.py @@ -7,19 +7,19 @@ from .numbers import NumberClient from .messaging.hub import MessageHub -from .util import ObservableSet +from .messaging.websocket import WebSocketStreamer logger = logging.getLogger(__name__) class NumberGateway(Service): - identity = Setting('identity', default='127.0.0.1') - cluster = Setting('cluster', default=['127.0.0.1']) - def __init__(self): self.client = NumberClient(('127.0.0.1', 7776)) - self.hub = MessageHub(ObservableSet(self.cluster), self.identity) + self.hub = MessageHub() + self.ws = WebSocketStreamer(self.hub) + self.add_service(self.hub) + self.add_service(self.ws) self.add_service(self.client) def do_start(self): diff --git a/gtutorial/messaging/hub.py b/gtutorial/messaging/hub.py index c84b849..d926521 100644 --- a/gtutorial/messaging/hub.py +++ b/gtutorial/messaging/hub.py @@ -7,10 +7,10 @@ from .backend import MessageBackend class MessageHub(Service): - def __init__(self, cluster=None, bind_interface=None, zmq=None): + def __init__(self, cluster=None, bind_interface=None): self.bind_interface = bind_interface or '0.0.0.0' - self.backend = MessageBackend(cluster, bind_interface, zmq) + self.backend = MessageBackend(cluster, bind_interface) self.add_service(self.backend) self.add_service(HttpStreamer(self)) From 51ea350cb80aeff581d186a9e080fdfe33ca75e4 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Thu, 8 Mar 2012 06:14:52 -0800 Subject: [PATCH 5/8] reverting to old, clusterless messagehub --- config/messagehub.conf.py | 3 +-- gtutorial/messaging/http.py | 4 ++-- gtutorial/messaging/hub.py | 25 +++++++++++++++++-------- gtutorial/messaging/websocket.py | 2 +- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/config/messagehub.conf.py b/config/messagehub.conf.py index e750cf1..6c65730 100644 --- a/config/messagehub.conf.py +++ b/config/messagehub.conf.py @@ -1,5 +1,4 @@ def service(): from gtutorial.messaging.hub import MessageHub - from gtutorial.util import ObservableSet - return MessageHub(ObservableSet(['127.0.0.1'])) + return MessageHub() diff --git a/gtutorial/messaging/http.py b/gtutorial/messaging/http.py index 1f5e712..a66ac09 100644 --- a/gtutorial/messaging/http.py +++ b/gtutorial/messaging/http.py @@ -23,7 +23,7 @@ def __init__(self, hub): self.add_service( gevent.pywsgi.WSGIServer( - listener=(self.hub.bind_interface, self.port), + listener=('0.0.0.0', self.port), application=self.handle, spawn=self.spawn, log=None)) @@ -84,7 +84,7 @@ def __init__(self, hub): self.add_service( gevent.pywsgi.WSGIServer( - listener=(self.hub.bind_interface, self.port), + listener=('0.0.0.0', self.port), application=self.handle, spawn=self.spawn, log=None)) diff --git a/gtutorial/messaging/hub.py b/gtutorial/messaging/hub.py index d926521..9d28500 100644 --- a/gtutorial/messaging/hub.py +++ b/gtutorial/messaging/hub.py @@ -4,22 +4,31 @@ from .http import HttpStreamer from .http import HttpTailViewer from .websocket import WebSocketStreamer -from .backend import MessageBackend -class MessageHub(Service): - def __init__(self, cluster=None, bind_interface=None): - self.bind_interface = bind_interface or '0.0.0.0' +class Subscription(gevent.queue.Queue): + def __init__(self, channel): + super(Subscription, self).__init__(maxsize=64) + self.channel = channel + + def cancel(self): + self.channel = None - self.backend = MessageBackend(cluster, bind_interface) - self.add_service(self.backend) +class MessageHub(Service): + def __init__(self): + self.subscriptions = {} self.add_service(HttpStreamer(self)) self.add_service(HttpTailViewer(self)) self.add_service(WebSocketStreamer(self)) def publish(self, channel, message): - self.backend.publish(channel, message) + for subscription in self.subscriptions.get(channel, []): + subscription.put(message) def subscribe(self, channel): - return self.backend.subscribe(channel) + if channel not in self.subscriptions: + self.subscriptions[channel] = [] + subscription = Subscription(channel) + self.subscriptions[channel].append(subscription) + return subscription diff --git a/gtutorial/messaging/websocket.py b/gtutorial/messaging/websocket.py index 4212907..1af4d3b 100644 --- a/gtutorial/messaging/websocket.py +++ b/gtutorial/messaging/websocket.py @@ -15,7 +15,7 @@ def __init__(self, hub): self.hub = hub self.add_service( - WebSocketServer((self.hub.bind_interface, self.port), self.handle)) + WebSocketServer(('0.0.0.0', self.port), self.handle)) def handle(self, websocket, environ): channel = environ.get('PATH_INFO') From 2de40ac02ac8c20a0f10ed22702d2e482138259a Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Fri, 9 Mar 2012 01:47:33 -0800 Subject: [PATCH 6/8] checkout script for iterator over chapter branches --- checkout.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 checkout.py diff --git a/checkout.py b/checkout.py new file mode 100644 index 0000000..d5e3051 --- /dev/null +++ b/checkout.py @@ -0,0 +1,24 @@ +#!/usr/bin/python + +import commands + +BRANCHES = [ + 'chapter1', + 'chapter2a', + 'chapter2b', + 'chapter3a', + #'chapter3b', + 'chapter4a', + 'chapter4b', + 'chapter4c', + #'chapter5a', + #'chapter5b', + 'chapter6', + ] + +def checkout(branch): + return commands.getoutput('git checkout %s' % branch) + +for branch in BRANCHES: + output = checkout(branch) + raw_input('%-32s [continue] ' % output) From a47beff0a76312a283d9dfff7db4992ca64ef176 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Fri, 9 Mar 2012 08:38:30 -0800 Subject: [PATCH 7/8] removing websocket from gateway (in hub now) --- gtutorial/gateway.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/gtutorial/gateway.py b/gtutorial/gateway.py index 7882477..2d09590 100644 --- a/gtutorial/gateway.py +++ b/gtutorial/gateway.py @@ -7,7 +7,6 @@ from .numbers import NumberClient from .messaging.hub import MessageHub -from .messaging.websocket import WebSocketStreamer logger = logging.getLogger(__name__) @@ -15,11 +14,8 @@ class NumberGateway(Service): def __init__(self): self.client = NumberClient(('127.0.0.1', 7776)) self.hub = MessageHub() - self.ws = WebSocketStreamer(self.hub) - self.add_service(self.hub) - self.add_service(self.ws) self.add_service(self.client) def do_start(self): From 5b02d077d6ec150d8778314b42462926175ab6d8 Mon Sep 17 00:00:00 2001 From: Jeff Lindsay Date: Fri, 9 Mar 2012 08:41:11 -0800 Subject: [PATCH 8/8] whitespace for readability --- gtutorial/messaging/backend.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gtutorial/messaging/backend.py b/gtutorial/messaging/backend.py index a0f878f..520e27d 100644 --- a/gtutorial/messaging/backend.py +++ b/gtutorial/messaging/backend.py @@ -8,6 +8,7 @@ from ..util import ObservableSet + class Subscription(gevent.queue.Queue): def __init__(self, receiver, channel): super(Subscription, self).__init__(maxsize=64) @@ -19,6 +20,7 @@ def cancel(self): self.receiver.unsubscribe(self.channel, self) self.channel = None + class MessageBackend(Service): port = Setting('backend_port', default=2222) @@ -38,6 +40,7 @@ def publish(self, channel, message): def subscribe(self, channel): return Subscription(self.receiver, channel) + class PeerTransmitter(Service): def __init__(self, backend): self.cluster = backend.cluster @@ -58,6 +61,7 @@ def connect(self, host): def broadcast(self, channel, message): self.socket.send_multipart([str(channel).lower(), msgpack.packb(message)]) + class PeerReceiver(Service): def __init__(self, backend, bind_interface=None): self.bind_address = (bind_interface or '0.0.0.0', backend.port)