
Commit 60c26ee

[split] Cache specific cluster implementation

Adding a cache specific cluster implementation which:
- monitors underlying cache pool source changes
- ensures ready() only returns once all expected cache servers have been fetched from the underlying cache pool source
- ensures snap() always returns the complete set of cache servers, even during pool migration/expansion
- for a ZK based cache pool, uses the serverset parent node data as the cache pool config data
RB_ID=83259
1 parent db29f89 commit 60c26ee
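
A minimal usage sketch of the new cluster API, assuming the CacheNode and CachePoolCluster definitions added in the diff below; the host names and ports are placeholders:

import com.twitter.finagle.builder.Cluster
import com.twitter.finagle.memcached.{CacheNode, CachePoolCluster}

object CachePoolClusterExample {
  def main(args: Array[String]) {
    // Build a static pool of two (hypothetical) cache servers.
    val pool = CachePoolCluster.newStaticCluster(Seq(
      CacheNode("cache1.example.com", 11211, 1),
      CacheNode("cache2.example.com", 11211, 1)))

    // snap returns the complete current membership plus a spool of future changes.
    val (current, changes) = pool.snap
    println("current cache nodes: " + current.mkString(", "))

    // Key-ring rehashing is only needed when the spool yields a change.
    changes foreach { spool =>
      spool foreach {
        case Cluster.Add(node) => println("added " + node)
        case Cluster.Rem(node) => println("removed " + node)
      }
    }
  }
}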

File tree

4 files changed (+554, -31 lines)


finagle-memcached/pom.xml

Lines changed: 1 addition & 3 deletions
@@ -26,7 +26,6 @@
       <groupId>com.twitter</groupId>
       <artifactId>finagle-serversets</artifactId>
       <version>5.3.10</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
@@ -42,8 +41,7 @@
     <dependency>
       <groupId>com.twitter.common_internal</groupId>
       <artifactId>zookeeper-utils</artifactId>
-      <version>0.0.15</version>
-      <scope>test</scope>
+      <version>0.0.21</version>
     </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
Lines changed: 335 additions & 0 deletions
@@ -0,0 +1,335 @@
+package com.twitter.finagle.memcached
+
+import _root_.java.io.ByteArrayInputStream
+import _root_.java.net.InetSocketAddress
+import com.google.gson.GsonBuilder
+import com.twitter.common.io.{Codec, JsonCodec}
+import com.twitter.common.quantity.{Amount, Time}
+import com.twitter.common.zookeeper.ZooKeeperClient
+import com.twitter.common_internal.zookeeper.TwitterServerSet
+import com.twitter.common_internal.zookeeper.TwitterServerSet.Service
+import com.twitter.concurrent.{Broker, Spool}
+import com.twitter.concurrent.Spool.*::
+import com.twitter.conversions.time._
+import com.twitter.finagle.builder.Cluster
+import com.twitter.finagle.service.Backoff
+import com.twitter.finagle.zookeeper.ZookeeperServerSetCluster
+import com.twitter.util._
+import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.apache.zookeeper.{WatchedEvent, Watcher}
+import scala.collection.mutable
+
+// Type definition representing a cache node
+case class CacheNode(host: String, port: Int, weight: Int)
+
+/**
+ * Cache specific cluster implementation.
+ * - A cache pool is a Cluster of cache nodes.
+ * - A cache pool requires an underlying pool manager as the source of the cache nodes.
+ * - The underlying pool manager encapsulates the logic of monitoring cache node changes and
+ *   deciding when to update the cache pool cluster.
+ */
+object CachePoolCluster {
+  val timer = new JavaTimer(isDaemon = true)
+
+  /**
+   * Cache pool based on a static list.
+   * @param cacheNodeSeq static sequence of cache nodes used to construct the cluster
+   */
+  def newStaticCluster(cacheNodeSeq: Seq[CacheNode]) = new StaticCachePoolCluster(cacheNodeSeq)
+
+  /**
+   * Zookeeper based cache pool cluster.
+   * The cluster monitors the underlying serverset changes and reports the detected underlying
+   * pool size. The cluster snapshot is only updated during cache-team managed operations, and
+   * the Future spool is updated with the corresponding changes.
+   *
+   * @param sdService the SD service token representing the cache pool
+   * @param backupPool optional backup static pool to use in case of ZK failure
+   */
+  def newZkCluster(sdService: Service, backupPool: Option[Set[CacheNode]] = None) =
+    new ZookeeperCachePoolCluster(sdService, TwitterServerSet.createClient(sdService), backupPool)
+}
+
+trait CachePoolCluster extends Cluster[CacheNode] {
+  /**
+   * Cache pool snapshot and future changes.
+   * These two should only change when a key-ring rehashing is needed (e.g. cache pool
+   * initialization, migration, expansion, etc.), thus we only let the underlying pool manager
+   * change them.
+   */
+  private[this] val cachePool = new mutable.HashSet[CacheNode]
+  private[this] var cachePoolChanges = new Promise[Spool[Cluster.Change[CacheNode]]]
+
+  def snap: (Seq[CacheNode], Future[Spool[Cluster.Change[CacheNode]]]) = cachePool synchronized {
+    (cachePool.toSeq, cachePoolChanges)
+  }
+
+  /**
+   * TODO: pick up the new rev of Cluster once it's ready.
+   * Soon the Cluster will be defined in a way that lets us manage the members directly and more
+   * flexibly; by then we should be able to do the batch update we want here. For now, the pool
+   * is still updated one node at a time.
+   */
+  final protected[this] def updatePool(newSet: Set[CacheNode]) = cachePool synchronized {
+    val added = newSet &~ cachePool
+    val removed = cachePool &~ newSet
+
+    // modify cachePool and cachePoolChanges
+    removed foreach { node =>
+      cachePool -= node
+      appendUpdate(Cluster.Rem(node))
+    }
+    added foreach { node =>
+      cachePool += node
+      appendUpdate(Cluster.Add(node))
+    }
+  }
+
+  private[this] def appendUpdate(update: Cluster.Change[CacheNode]) = cachePool synchronized {
+    val newTail = new Promise[Spool[Cluster.Change[CacheNode]]]
+    cachePoolChanges() = Return(update *:: newTail)
+    cachePoolChanges = newTail
+  }
+}
+
+
+/**
+ * Cache pool config data object
+ */
+object CachePoolConfig {
+  val jsonCodec: Codec[CachePoolConfig] =
+    JsonCodec.create(classOf[CachePoolConfig],
+      new GsonBuilder().setExclusionStrategies(JsonCodec.getThriftExclusionStrategy()).create())
+}
+
+/**
+ * Cache pool config data format.
+ * Currently this data format is only used by ZookeeperCachePoolCluster to read the config data
+ * from the zookeeper serverset parent node, and the expected cache pool size is the only
+ * attribute we need for now. In the future this can be extended with other config attributes
+ * such as cache pool migration state, backup cache server list, or replication role.
+ */
+case class CachePoolConfig(cachePoolSize: Int)
+
+/**
+ * Cache pool based on a static list.
+ * @param cacheNodeSeq static sequence of cache nodes used to construct the cluster
+ */
+class StaticCachePoolCluster(cacheNodeSeq: Seq[CacheNode]) extends CachePoolCluster {
+  // TODO: stats collecting here
+  private[this] var underlyingSizeGauge = cacheNodeSeq.size
+
+  // The cache pool will be updated once and only once as the underlying pool never changes
+  updatePool(cacheNodeSeq.toSet)
+}
+
+/**
+ * ZooKeeper based cache pool cluster companion object
+ */
+object ZookeeperCachePoolCluster {
+  private val DefaultZkConnectionRetryBackoff =
+    (Backoff.exponential(1.second, 2) take 6) ++ Backoff.const(60.seconds)
+  private val CachePoolWaitCompleteTimeout = 10.seconds
+  private val BackupPoolFallBackTimeout = 10.seconds
+}
+
+/**
+ * SD cluster based cache pool cluster with a zookeeper serverset as the underlying pool.
+ * It monitors the underlying serverset changes and reports the detected underlying pool size.
+ * It also monitors the serverset parent node for cache pool config data; a cache pool cluster
+ * update is triggered whenever a cache config data change event happens.
+ *
+ * @param sdService the SD service token representing the cache pool
+ * @param zkClient zookeeper client talking to the SD cluster
+ * @param backupPool optional backup static pool to use in case of ZK failure
+ */
+class ZookeeperCachePoolCluster private[memcached](
+    sdService: Service,
+    zkClient: ZooKeeperClient,
+    backupPool: Option[Set[CacheNode]] = None)
+  extends CachePoolCluster {
+  private[this] val futurePool = FuturePool.defaultPool
+
+  private[this] val zkServerSetCluster =
+    new ZookeeperServerSetCluster(TwitterServerSet.create(zkClient, sdService)) map {
+      case addr: InetSocketAddress =>
+        CacheNode(addr.getHostName, addr.getPort, 1)
+    }
+
+  // continuously gauging underlying cluster size
+  // TODO: stats collecting here
+  private[this] var underlyingSizeGauge = 0
+  zkServerSetCluster.snap match {
+    case (underlyingSeq, underlyingChanges) =>
+      underlyingSizeGauge = underlyingSeq.size
+      underlyingChanges foreach { spool =>
+        spool foreach {
+          case Cluster.Add(node) => underlyingSizeGauge += 1
+          case Cluster.Rem(node) => underlyingSizeGauge -= 1
+        }
+      }
+  }
+
+  private sealed trait ZKPoolWork
+  private case object LoadCachePoolConfig extends ZKPoolWork
+  private case object ReEstablishZKConn extends ZKPoolWork
+
+  private[this] val zookeeperWorkQueue = new Broker[ZKPoolWork]
+
+  /**
+   * Read work items off the broker and schedule the work onto the future pool. If the scheduled
+   * work fails, it repeatedly retries itself with a backoff (driven by a timer) until it
+   * succeeds, which recursively calls this method to restart the process again.
+   *
+   * This function guarantees that at any given time there is only one thread (future pool thread)
+   * blocking on zookeeper IO work. Multiple ZK connection events or cache pool change events only
+   * queue up work, and each work item is picked up only after the previous one finished successfully.
+   */
+  private[this] def loopZookeeperWork {
+    def scheduleReadCachePoolConfig(
+      alsoUpdatePool: Boolean,
+      backoff: Stream[Duration] = ZookeeperCachePoolCluster.DefaultZkConnectionRetryBackoff
+    ): Unit = {
+      futurePool {
+        readCachePoolConfigData(alsoUpdatePool)
+      } onFailure { ex =>
+        // TODO: stat here
+        backoff match {
+          case wait #:: rest =>
+            CachePoolCluster.timer.doLater(wait) { scheduleReadCachePoolConfig(alsoUpdatePool, rest) }
+        }
+      } onSuccess { _ =>
+        // If succeeded, loop back to consume the next work item queued at the broker
+        loopZookeeperWork
+      }
+    }
+
+    // Get one work item off the broker and schedule it onto the future pool.
+    // If there's no work available yet, this only registers a callback with the broker so that
+    // whoever provides a work item then does the scheduling.
+    zookeeperWorkQueue.recv.sync() onSuccess {
+      case LoadCachePoolConfig => scheduleReadCachePoolConfig(alsoUpdatePool = true)
+      case ReEstablishZKConn => scheduleReadCachePoolConfig(alsoUpdatePool = false)
+    }
+  }
+
+  // Kick off the loop to process the zookeeper work queue
+  loopZookeeperWork
+
+  private[this] val zkConnectionWatcher: Watcher = new Watcher() {
+    // NOTE: Ensure that the processing of events is not a blocking operation.
+    override def process(event: WatchedEvent) = {
+      if (event.getType == Watcher.Event.EventType.None)
+        event.getState match {
+          case KeeperState.Disconnected | KeeperState.SyncConnected =>
+            // TODO: stat here
+          case KeeperState.Expired =>
+            // TODO: stat here
+            // We only need to re-establish the zk connection and re-register the config data
+            // watcher when the Expired event happens, because this is the event that closes
+            // the zk client (which loses all attached watchers). We could also do this
+            // only when SyncConnected happens instead of retrying, but that would assume
+            // other components sharing the zk client (e.g. ZookeeperServerSetCluster)
+            // always actively attempt to re-establish the zk connection. For now, this
+            // class actively re-establishes the connection itself here.
+            // TODO: signal that the underlying zk pool management is unhealthy and let
+            // the scheduled work set it back to healthy once the work is done
+            zookeeperWorkQueue ! ReEstablishZKConn
+        }
+    }
+  }
+
+  private[this] val cachePoolConfigDataWatcher: Watcher = new Watcher() {
+    // NOTE: Ensure that the processing of events is not a blocking operation.
+    override def process(event: WatchedEvent) = {
+      if (event.getState == KeeperState.SyncConnected) {
+        event.getType match {
+          case Watcher.Event.EventType.NodeDataChanged =>
+            // handle node data change event
+            // TODO: stat here
+            zookeeperWorkQueue ! LoadCachePoolConfig
+        }
+      }
+    }
+  }
+
+  // Register the top-level connection watcher to monitor zk changes.
+  // This watcher lives across different zk connections.
+  zkClient.register(zkConnectionWatcher)
+
+  // Read the config data and update the pool for the first time during construction.
+  // This also attaches a data change event watcher (cachePoolConfigDataWatcher) to
+  // the zk client so that future config data change events will trigger this again.
+  zookeeperWorkQueue ! LoadCachePoolConfig
+
+  // Fall back to the backup pool (if provided) after a certain timeout.
+  // Meanwhile, the first pool update will still proceed once it successfully
+  // gets the underlying pool config data and a complete set of pool members, at which
+  // point it will overwrite the backup pool.
+  // The backup pool is mainly provided in case of a long SD cluster outage during which
+  // the cache client needs to be restarted.
+  backupPool foreach { pool =>
+    ready within (CachePoolCluster.timer, ZookeeperCachePoolCluster.BackupPoolFallBackTimeout) onFailure {
+      _ => updatePool(pool)
+    }
+  }
+
+  /**
+   * Read the immediate parent zookeeper node data for the cache pool config data, then invoke
+   * the cache pool update function once the configured requirement is met, as well as attach
+   * a config data watcher which will perform the same action automatically whenever the config
+   * data changes again.
+   */
+  private[this] def readCachePoolConfigData(alsoUpdatePool: Boolean): Unit = synchronized {
+    // read cache pool config data and attach the node data change watcher
+    val data = zkClient
+      .get(Amount.of(ZookeeperCachePoolCluster.CachePoolWaitCompleteTimeout.inMilliseconds, Time.MILLISECONDS))
+      .getData(TwitterServerSet.getPath(sdService), cachePoolConfigDataWatcher, null)
+
+    if (alsoUpdatePool && data != null) {
+      val cachePoolConfig = CachePoolConfig.jsonCodec.deserialize(new ByteArrayInputStream(data))
+
+      // apply the cache pool config to the cluster
+      val expectedClusterSize = cachePoolConfig.cachePoolSize
+      val (snapshotSeq, snapshotChanges) = zkServerSetCluster.snap
+
+      // TODO: this can be blocking or non-blocking, depending on the protocol.
+      // For now it is a blocking call, as in the currently known scenario the cache config data
+      // should always exactly match the existing memberships, controlled by the cache-team operator.
+      // It will only block for 10 seconds, after which it should trigger alerting metrics and
+      // schedule another try.
+      val newSet = waitForClusterComplete(snapshotSeq.toSet, expectedClusterSize, snapshotChanges)
+        .get(ZookeeperCachePoolCluster.CachePoolWaitCompleteTimeout)()
+
+      updatePool(newSet.toSet)
+    }
+  }
+
+  /**
+   * Wait for the current set to contain the expected number of members.
+   * If the underlying zk cluster change is triggered by an operator (for migration/expansion etc.),
+   * the config data change should always happen after the operator has verified that this zk pool
+   * manager already sees the expected number of members, in which case this method returns
+   * immediately; however, during first-time initialization it's possible that the zkServerSetCluster
+   * hasn't caught up with all existing members yet, hence this method may need to wait for future changes.
+   */
+  private[this] def waitForClusterComplete(
+    currentSet: Set[CacheNode],
+    expectedSize: Int,
+    spoolChanges: Future[Spool[Cluster.Change[CacheNode]]]
+  ): Future[Set[CacheNode]] = {
+    if (expectedSize == currentSet.size) {
+      Future.value(currentSet)
+    } else spoolChanges flatMap { spool =>
+      spool match {
+        case Cluster.Add(node) *:: tail =>
+          waitForClusterComplete(currentSet + node, expectedSize, tail)
+        case Cluster.Rem(node) *:: tail =>
+          // This should not happen in general, as this code path is only exercised during
+          // first-time pool manager initialization.
+          waitForClusterComplete(currentSet - node, expectedSize, tail)
+      }
+    }
+  }
+}
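
For reference, a sketch of reading cache pool config data the same way readCachePoolConfigData does, assuming the serverset parent node stores the JSON rendering of CachePoolConfig (the payload below is an assumed example, not taken from a real node):

import _root_.java.io.ByteArrayInputStream
import com.twitter.finagle.memcached.CachePoolConfig

object CachePoolConfigExample {
  def main(args: Array[String]) {
    // Assumed example payload; cachePoolSize is the only attribute currently read.
    val data = """{"cachePoolSize": 5}""".getBytes("UTF-8")

    // Deserialize with the same codec readCachePoolConfigData uses.
    val config = CachePoolConfig.jsonCodec.deserialize(new ByteArrayInputStream(data))
    println("expected cache pool size: " + config.cachePoolSize)
  }
}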

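And a sketch of wiring up the ZK based pool with a static backup, assuming an sdService token obtained from existing service-discovery configuration (how that token is built is outside this change; the backup host names are placeholders):

import com.twitter.common_internal.zookeeper.TwitterServerSet.Service
import com.twitter.finagle.memcached.{CacheNode, CachePoolCluster}

object ZkCachePoolExample {
  // Hypothetical accessor; obtain the real SD service token from service-discovery config.
  def sdService: Service =
    throw new UnsupportedOperationException("wire in the real SD service token here")

  def main(args: Array[String]) {
    // The backup is only used if ZK cannot provide a complete pool within
    // BackupPoolFallBackTimeout (10 seconds in the companion object above).
    val backup = Set(
      CacheNode("backup1.example.com", 11211, 1),
      CacheNode("backup2.example.com", 11211, 1))

    val pool = CachePoolCluster.newZkCluster(sdService, backupPool = Some(backup))

    // snap may be empty until the config data and complete membership have been read from ZK.
    val (nodes, _) = pool.snap
    println("initial pool: " + nodes.mkString(", "))
  }
}
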
0 commit comments
