11package org .logstash .plugins .inputs .http .util ;
22
3- import java .lang .invoke .MethodHandles ;
43import java .time .Duration ;
54import java .util .Optional ;
65import java .util .concurrent .atomic .AtomicReference ;
@@ -73,7 +72,7 @@ Stats stats() {
7372 if (!candidate .isComplete ) {
7473 executing += 1 ;
7574 }
76- candidate = candidate .getNextPlain ();
75+ candidate = candidate .next . get ();
7776 }
7877 return new Stats (nodes , executing );
7978 }
@@ -131,20 +130,11 @@ private Execution compactHead() {
131130 }
132131
133132 static class Execution {
134- private static final java .lang .invoke .VarHandle NEXT ;
135- static {
136- try {
137- MethodHandles .Lookup l = MethodHandles .lookup ();
138- NEXT = l .findVarHandle (Execution .class , "next" , Execution .class );
139- } catch (ReflectiveOperationException e ) {
140- throw new ExceptionInInitializerError (e );
141- }
142- }
143133
144134 private final long startNanos ;
145135
146136 private volatile boolean isComplete ;
147- private volatile Execution next ;
137+ private final AtomicReference < Execution > next = new AtomicReference <>() ;
148138
149139 Execution (long startNanos ) {
150140 this (startNanos , false );
@@ -163,39 +153,34 @@ static class Execution {
163153 boolean markComplete () {
164154 isComplete = true ;
165155
166- // concurrency: use plain memory for reads because we can tolerate
167- // completed nodes remaining as the result of a race
168- final Execution preCompletionNext = this .getNextPlain ();
156+ final Execution preCompletionNext = this .next .get ();
169157 if (preCompletionNext != null ) {
170- final Execution newNext = preCompletionNext . seekHead ( );
171- return ( newNext != preCompletionNext ) && NEXT . compareAndSet ( this , preCompletionNext , newNext ) ;
158+ final Execution result = this . next . updateAndGet ( Execution :: seekHead );
159+ return result != preCompletionNext ;
172160 }
161+
173162 return false ;
174163 }
175164
176165 private void linkNext (final Execution proposedNext ) {
177- final Execution witness = ( Execution ) NEXT . compareAndExchange ( this , null , proposedNext );
178- if (witness != null && witness != proposedNext ) {
166+ final Execution result = next . updateAndGet (( ex ) -> ex == null ? proposedNext : ex );
167+ if (result != proposedNext ) {
179168 throw new IllegalStateException ();
180169 }
181170 }
182171
183172 /**
184- * @return the next {@code Execution} that is either not yet complete
185- * or is the current tail, using plain memory access .
173+ * @return the first {@code Execution} that is either not yet complete
174+ * or is the current tail, possibly itself .
186175 */
187176 private Execution seekHead () {
188177 Execution compactedHead = this ;
189- Execution candidate = this .getNextPlain ();
178+ Execution candidate = this .next . get ();
190179 while (candidate != null && compactedHead .isComplete ) {
191180 compactedHead = candidate ;
192- candidate = candidate .getNextPlain ();
181+ candidate = candidate .next . get ();
193182 }
194183 return compactedHead ;
195184 }
196-
197- private Execution getNextPlain () {
198- return (Execution ) NEXT .get (this );
199- }
200185 }
201186}
0 commit comments