2121import java .util .concurrent .*;
2222import java .util .concurrent .atomic .AtomicReference ;
2323
24- import rx .Scheduler .Worker ;
25- import rx .functions .Action0 ;
26- import rx .internal .schedulers .SchedulerLifecycle ;
24+ import rx .internal .schedulers .*;
2725import rx .internal .util .unsafe .*;
28- import rx .schedulers .Schedulers ;
2926
3027public abstract class ObjectPool <T > implements SchedulerLifecycle {
3128 Queue <T > pool ;
3229 final int minSize ;
3330 final int maxSize ;
3431 private final long validationInterval ;
3532
36- private final AtomicReference <Worker > schedulerWorker ;
33+ private final AtomicReference <Future <?>> periodicTask ;
3734
3835 public ObjectPool () {
3936 this (0 , 0 , 67 );
@@ -55,7 +52,7 @@ private ObjectPool(final int min, final int max, final long validationInterval)
5552 this .minSize = min ;
5653 this .maxSize = max ;
5754 this .validationInterval = validationInterval ;
58- this .schedulerWorker = new AtomicReference <Worker >();
55+ this .periodicTask = new AtomicReference <Future <?> >();
5956 // initialize pool
6057 initialize (min );
6158
@@ -96,38 +93,51 @@ public void returnObject(T object) {
9693 */
9794 @ Override
9895 public void shutdown () {
99- Worker w = schedulerWorker .getAndSet (null );
100- if (w != null ) {
101- w . unsubscribe ( );
96+ Future <?> f = periodicTask .getAndSet (null );
97+ if (f != null ) {
98+ f . cancel ( false );
10299 }
103100 }
104101
105102 @ Override
106103 public void start () {
107- Worker w = Schedulers .computation ().createWorker ();
108- if (schedulerWorker .compareAndSet (null , w )) {
109- w .schedulePeriodically (new Action0 () {
110-
111- @ Override
112- public void call () {
113- int size = pool .size ();
114- if (size < minSize ) {
115- int sizeToBeAdded = maxSize - size ;
116- for (int i = 0 ; i < sizeToBeAdded ; i ++) {
117- pool .add (createObject ());
118- }
119- } else if (size > maxSize ) {
120- int sizeToBeRemoved = size - maxSize ;
121- for (int i = 0 ; i < sizeToBeRemoved ; i ++) {
122- // pool.pollLast();
123- pool .poll ();
104+ for (;;) {
105+ if (periodicTask .get () != null ) {
106+ return ;
107+ }
108+ ScheduledExecutorService w = GenericScheduledExecutorService .getInstance ();
109+
110+ Future <?> f ;
111+ try {
112+ f = w .scheduleAtFixedRate (new Runnable () {
113+
114+ @ Override
115+ public void run () {
116+ int size = pool .size ();
117+ if (size < minSize ) {
118+ int sizeToBeAdded = maxSize - size ;
119+ for (int i = 0 ; i < sizeToBeAdded ; i ++) {
120+ pool .add (createObject ());
121+ }
122+ } else if (size > maxSize ) {
123+ int sizeToBeRemoved = size - maxSize ;
124+ for (int i = 0 ; i < sizeToBeRemoved ; i ++) {
125+ // pool.pollLast();
126+ pool .poll ();
127+ }
124128 }
125129 }
126- }
127-
128- }, validationInterval , validationInterval , TimeUnit .SECONDS );
129- } else {
130- w .unsubscribe ();
130+
131+ }, validationInterval , validationInterval , TimeUnit .SECONDS );
132+ } catch (RejectedExecutionException ex ) {
133+ RxJavaPluginUtils .handleException (ex );
134+ break ;
135+ }
136+ if (!periodicTask .compareAndSet (null , f )) {
137+ f .cancel (false );
138+ } else {
139+ break ;
140+ }
131141 }
132142 }
133143
0 commit comments