4545import java .security .NoSuchAlgorithmException ;
4646import java .util .ArrayList ;
4747import java .util .Arrays ;
48+ import java .util .Date ;
4849import java .util .HashMap ;
4950import java .util .HashSet ;
5051import java .util .Iterator ;
@@ -189,6 +190,10 @@ protected final MessageDigest initialValue() {
189190 // map to hold all available sockets
190191 Map <String , ConcurrentLinkedQueue <SchoonerSockIO >> socketPool ;
191192
193+ ConcurrentMap <String , Date > hostDead ;
194+
195+ ConcurrentMap <String , Long > hostDeadDur ;
196+
192197 private int maxConn = 32 ;
193198
194199 private Map <String , AtomicInteger > poolCurrentConn ;
@@ -269,6 +274,8 @@ public void initialize() {
269274 }
270275 // pools
271276 socketPool = new HashMap <String , ConcurrentLinkedQueue <SchoonerSockIO >>(servers .length );
277+ hostDead = new ConcurrentHashMap <String , Date >();
278+ hostDeadDur = new ConcurrentHashMap <String , Long >();
272279 poolCurrentConn = new HashMap <String , AtomicInteger >(servers .length );
273280 // only create up to maxCreate connections at once
274281
@@ -393,6 +400,14 @@ private void populateConsistentBuckets() {
393400 * @return SockIO obj or null if failed to create
394401 */
395402 protected final SchoonerSockIO createSocket (String host ) {
403+ if (!failback && hostDead .containsKey (host ) && hostDeadDur .containsKey (host )) {
404+
405+ Date store = hostDead .get (host );
406+ long expire = hostDeadDur .get (host ).longValue ();
407+
408+ if ((store .getTime () + expire ) > System .currentTimeMillis ())
409+ return null ;
410+ }
396411
397412 SchoonerSockIO socket = null ;
398413 try {
@@ -407,10 +422,33 @@ protected final SchoonerSockIO createSocket(String host) {
407422 socket = null ;
408423 }
409424
425+ if (socket == null ) {
426+ Date now = new Date ();
427+ hostDead .put (host , now );
428+
429+ long expire = (hostDeadDur .containsKey (host )) ? (((Long ) hostDeadDur .get (host )).longValue () * 2 ) : 1000 ;
430+
431+ if (expire > MAX_RETRY_DELAY )
432+ expire = MAX_RETRY_DELAY ;
433+
434+ hostDeadDur .put (host , new Long (expire ));
435+
436+ // also clear all entries for this host from availPool
437+ clearHostFromPool (host );
438+ }
439+
410440 return socket ;
411441 }
412442
413443 protected final SchoonerSockIO createSocketWithAdd (String host ) {
444+ if (!failback && hostDead .containsKey (host ) && hostDeadDur .containsKey (host )) {
445+
446+ Date store = hostDead .get (host );
447+ long expire = hostDeadDur .get (host ).longValue ();
448+
449+ if ((store .getTime () + expire ) > System .currentTimeMillis ())
450+ return null ;
451+ }
414452
415453 SchoonerSockIO socket = null ;
416454 try {
@@ -427,9 +465,50 @@ protected final SchoonerSockIO createSocketWithAdd(String host) {
427465 poolCurrentConn .get (host ).decrementAndGet ();
428466 }
429467
468+ if (socket == null ) {
469+ Date now = new Date ();
470+ hostDead .put (host , now );
471+
472+ long expire = (hostDeadDur .containsKey (host )) ? (((Long ) hostDeadDur .get (host )).longValue () * 2 ) : 1000 ;
473+
474+ if (expire > MAX_RETRY_DELAY )
475+ expire = MAX_RETRY_DELAY ;
476+
477+ hostDeadDur .put (host , new Long (expire ));
478+
479+ // also clear all entries for this host from availPool
480+ clearHostFromPool (host );
481+ }
482+
430483 return socket ;
431484 }
432485
486+ /**
487+ * Closes and removes all sockets from specified pool for host. THIS METHOD
488+ * IS NOT THREADSAFE, SO BE CAREFUL WHEN USING!
489+ *
490+ * Internal utility method.
491+ *
492+ * @param pool
493+ * pool to clear
494+ * @param host
495+ * host to clear
496+ */
497+ protected void clearHostFromPool (String host ) {
498+ ConcurrentLinkedQueue <SchoonerSockIO > pool = socketPool .get (host );
499+
500+ for (SchoonerSockIO ssock : pool ) {
501+ try {
502+ ssock .trueClose ();
503+ } catch (IOException ioe ) {
504+ log .error ("++++ failed to close socket: " + ioe .getMessage ());
505+ }
506+ }
507+
508+ pool .clear ();
509+ poolCurrentConn .put (host , new AtomicInteger (0 ));
510+ }
511+
433512 /**
434513 * Gets the host that a particular key / hashcode resides on.
435514 *
@@ -550,6 +629,7 @@ public final SchoonerSockIO getConnection(String host) {
550629 log .error ("attempting to get SockIO from uninitialized pool!" );
551630 return null ;
552631 }
632+
553633 if (host == null )
554634 return null ;
555635 // if we have items in the pool then we can return it
@@ -560,15 +640,13 @@ public final SchoonerSockIO getConnection(String host) {
560640 socket = createSocketWithAdd (host );
561641 } else {
562642 socket = createSocket (host );
563- if (socket == null )
564- return null ;
565- socket .setPooled (false );
643+ if (socket != null )
644+ socket .setPooled (false );
566645 }
567646 } else if (aliveCheck && !socket .isAlive ()) {
568647 socket = createSocket (host );
569- if (socket == null )
570- return null ;
571- socket .setPooled (false );
648+ if (socket != null )
649+ socket .setPooled (false );
572650 }
573651 return socket ;
574652 }
@@ -1735,4 +1813,4 @@ public ByteChannel getByteChannel() {
17351813 }
17361814 }
17371815
1738- }
1816+ }
0 commit comments