2222
2323import java .util .AbstractMap .SimpleImmutableEntry ;
2424import java .util .ArrayList ;
25- import java .util .Collection ;
2625import java .util .List ;
27- import java .util .Map ;
2826import java .util .concurrent .CompletableFuture ;
29- import java .util .concurrent .CompletionStage ;
3027import java .util .stream .Collectors ;
3128
32- import static java .util .Collections .emptyList ;
33- import static java .util .Collections .singletonList ;
34- import static org .dataloader .impl .Assertions .assertState ;
3529import static org .dataloader .impl .Assertions .nonNull ;
3630
3731/**
6357 */
6458public class DataLoader <K , V > {
6559
66- private final Object batchLoadFunction ;
60+ private final DataLoaderHelper < K , V > helper ;
6761 private final DataLoaderOptions loaderOptions ;
6862 private final CacheMap <Object , CompletableFuture <V >> futureCache ;
6963 private final List <SimpleImmutableEntry <K , CompletableFuture <V >>> loaderQueue ;
@@ -352,12 +346,13 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
352346 }
353347
354348 private DataLoader (Object batchLoadFunction , DataLoaderOptions options ) {
355- this .batchLoadFunction = nonNull (batchLoadFunction );
356349 this .loaderOptions = options == null ? new DataLoaderOptions () : options ;
357350 this .futureCache = determineCacheMap (loaderOptions );
358351 // order of keys matter in data loader
359352 this .loaderQueue = new ArrayList <>();
360353 this .stats = nonNull (this .loaderOptions .getStatisticsCollector ());
354+
355+ this .helper = new DataLoaderHelper <>(this , batchLoadFunction ,this .loaderOptions , futureCache , loaderQueue , stats );
361356 }
362357
363358 @ SuppressWarnings ("unchecked" )
@@ -377,33 +372,7 @@ private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptio
377372 * @return the future of the value
378373 */
379374 public CompletableFuture <V > load (K key ) {
380- synchronized (this ) {
381- Object cacheKey = getCacheKey (nonNull (key ));
382- stats .incrementLoadCount ();
383-
384- boolean batchingEnabled = loaderOptions .batchingEnabled ();
385- boolean cachingEnabled = loaderOptions .cachingEnabled ();
386-
387- if (cachingEnabled ) {
388- if (futureCache .containsKey (cacheKey )) {
389- stats .incrementCacheHitCount ();
390- return futureCache .get (cacheKey );
391- }
392- }
393-
394- CompletableFuture <V > future = new CompletableFuture <>();
395- if (batchingEnabled ) {
396- loaderQueue .add (new SimpleImmutableEntry <>(key , future ));
397- } else {
398- stats .incrementBatchLoadCountBy (1 );
399- // immediate execution of batch function
400- future = invokeLoaderImmediately (key );
401- }
402- if (cachingEnabled ) {
403- futureCache .set (cacheKey , future );
404- }
405- return future ;
406- }
375+ return helper .load (key );
407376 }
408377
409378 /**
@@ -436,183 +405,7 @@ public CompletableFuture<List<V>> loadMany(List<K> keys) {
436405 * @return the promise of the queued load requests
437406 */
438407 public CompletableFuture <List <V >> dispatch () {
439- boolean batchingEnabled = loaderOptions .batchingEnabled ();
440- //
441- // we copy the pre-loaded set of futures ready for dispatch
442- final List <K > keys = new ArrayList <>();
443- final List <CompletableFuture <V >> queuedFutures = new ArrayList <>();
444- synchronized (this ) {
445- loaderQueue .forEach (entry -> {
446- keys .add (entry .getKey ());
447- queuedFutures .add (entry .getValue ());
448- });
449- loaderQueue .clear ();
450- }
451- if (!batchingEnabled || keys .size () == 0 ) {
452- return CompletableFuture .completedFuture (emptyList ());
453- }
454- //
455- // order of keys -> values matter in data loader hence the use of linked hash map
456- //
457- // See https://github.com/facebook/dataloader/blob/master/README.md for more details
458- //
459-
460- //
461- // when the promised list of values completes, we transfer the values into
462- // the previously cached future objects that the client already has been given
463- // via calls to load("foo") and loadMany(["foo","bar"])
464- //
465- int maxBatchSize = loaderOptions .maxBatchSize ();
466- if (maxBatchSize > 0 && maxBatchSize < keys .size ()) {
467- return sliceIntoBatchesOfBatches (keys , queuedFutures , maxBatchSize );
468- } else {
469- return dispatchQueueBatch (keys , queuedFutures );
470- }
471- }
472-
473- private CompletableFuture <List <V >> sliceIntoBatchesOfBatches (List <K > keys , List <CompletableFuture <V >> queuedFutures , int maxBatchSize ) {
474- // the number of keys is > than what the batch loader function can accept
475- // so make multiple calls to the loader
476- List <CompletableFuture <List <V >>> allBatches = new ArrayList <>();
477- int len = keys .size ();
478- int batchCount = (int ) Math .ceil (len / (double ) maxBatchSize );
479- for (int i = 0 ; i < batchCount ; i ++) {
480-
481- int fromIndex = i * maxBatchSize ;
482- int toIndex = Math .min ((i + 1 ) * maxBatchSize , len );
483-
484- List <K > subKeys = keys .subList (fromIndex , toIndex );
485- List <CompletableFuture <V >> subFutures = queuedFutures .subList (fromIndex , toIndex );
486-
487- allBatches .add (dispatchQueueBatch (subKeys , subFutures ));
488- }
489- //
490- // now reassemble all the futures into one that is the complete set of results
491- return CompletableFuture .allOf (allBatches .toArray (new CompletableFuture [allBatches .size ()]))
492- .thenApply (v -> allBatches .stream ()
493- .map (CompletableFuture ::join )
494- .flatMap (Collection ::stream )
495- .collect (Collectors .toList ()));
496- }
497-
498- @ SuppressWarnings ("unchecked" )
499- private CompletableFuture <List <V >> dispatchQueueBatch (List <K > keys , List <CompletableFuture <V >> queuedFutures ) {
500- stats .incrementBatchLoadCountBy (keys .size ());
501- CompletionStage <List <V >> batchLoad = invokeLoader (keys );
502- return batchLoad
503- .toCompletableFuture ()
504- .thenApply (values -> {
505- assertResultSize (keys , values );
506-
507- for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
508- Object value = values .get (idx );
509- CompletableFuture <V > future = queuedFutures .get (idx );
510- if (value instanceof Throwable ) {
511- stats .incrementLoadErrorCount ();
512- future .completeExceptionally ((Throwable ) value );
513- // we don't clear the cached view of this entry to avoid
514- // frequently loading the same error
515- } else if (value instanceof Try ) {
516- // we allow the batch loader to return a Try so we can better represent a computation
517- // that might have worked or not.
518- Try <V > tryValue = (Try <V >) value ;
519- if (tryValue .isSuccess ()) {
520- future .complete (tryValue .get ());
521- } else {
522- stats .incrementLoadErrorCount ();
523- future .completeExceptionally (tryValue .getThrowable ());
524- }
525- } else {
526- V val = (V ) value ;
527- future .complete (val );
528- }
529- }
530- return values ;
531- }).exceptionally (ex -> {
532- stats .incrementBatchLoadExceptionCount ();
533- for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
534- K key = keys .get (idx );
535- CompletableFuture <V > future = queuedFutures .get (idx );
536- future .completeExceptionally (ex );
537- // clear any cached view of this key because they all failed
538- clear (key );
539- }
540- return emptyList ();
541- });
542- }
543-
544- private CompletableFuture <V > invokeLoaderImmediately (K key ) {
545- List <K > keys = singletonList (key );
546- CompletionStage <V > singleLoadCall ;
547- try {
548- BatchLoaderEnvironment environment = loaderOptions .getBatchLoaderEnvironmentProvider ().get ();
549- if (isMapLoader ()) {
550- singleLoadCall = invokeMapBatchLoader (keys , environment ).thenApply (list -> list .get (0 ));
551- } else {
552- singleLoadCall = invokeListBatchLoader (keys , environment ).thenApply (list -> list .get (0 ));
553- }
554- return singleLoadCall .toCompletableFuture ();
555- } catch (Exception e ) {
556- return CompletableFutureKit .failedFuture (e );
557- }
558- }
559-
560- private CompletionStage <List <V >> invokeLoader (List <K > keys ) {
561- CompletionStage <List <V >> batchLoad ;
562- try {
563- BatchLoaderEnvironment environment = loaderOptions .getBatchLoaderEnvironmentProvider ().get ();
564- if (isMapLoader ()) {
565- batchLoad = invokeMapBatchLoader (keys , environment );
566- } else {
567- batchLoad = invokeListBatchLoader (keys , environment );
568- }
569- } catch (Exception e ) {
570- batchLoad = CompletableFutureKit .failedFuture (e );
571- }
572- return batchLoad ;
573- }
574-
575- @ SuppressWarnings ("unchecked" )
576- private CompletionStage <List <V >> invokeListBatchLoader (List <K > keys , BatchLoaderEnvironment environment ) {
577- CompletionStage <List <V >> loadResult ;
578- if (batchLoadFunction instanceof BatchLoaderWithContext ) {
579- loadResult = ((BatchLoaderWithContext <K , V >) batchLoadFunction ).load (keys , environment );
580- } else {
581- loadResult = ((BatchLoader <K , V >) batchLoadFunction ).load (keys );
582- }
583- return nonNull (loadResult , "Your batch loader function MUST return a non null CompletionStage promise" );
584- }
585-
586- /*
587- * Turns a map of results that MAY be smaller than the key list back into a list by mapping null
588- * to missing elements.
589- */
590-
591- @ SuppressWarnings ("unchecked" )
592- private CompletionStage <List <V >> invokeMapBatchLoader (List <K > keys , BatchLoaderEnvironment environment ) {
593- CompletionStage <Map <K , V >> loadResult ;
594- if (batchLoadFunction instanceof MappedBatchLoaderWithContext ) {
595- loadResult = ((MappedBatchLoaderWithContext <K , V >) batchLoadFunction ).load (keys , environment );
596- } else {
597- loadResult = ((MappedBatchLoader <K , V >) batchLoadFunction ).load (keys );
598- }
599- CompletionStage <Map <K , V >> mapBatchLoad = nonNull (loadResult , "Your batch loader function MUST return a non null CompletionStage promise" );
600- return mapBatchLoad .thenApply (map -> {
601- List <V > values = new ArrayList <>();
602- for (K key : keys ) {
603- V value = map .get (key );
604- values .add (value );
605- }
606- return values ;
607- });
608- }
609-
610- private boolean isMapLoader () {
611- return batchLoadFunction instanceof MappedBatchLoader || batchLoadFunction instanceof MappedBatchLoaderWithContext ;
612- }
613-
614- private void assertResultSize (List <K > keys , List <V > values ) {
615- assertState (keys .size () == values .size (), "The size of the promised values MUST be the same size as the key list" );
408+ return helper .dispatch ();
616409 }
617410
618411 /**
@@ -718,10 +511,8 @@ public DataLoader<K, V> prime(K key, Exception error) {
718511 *
719512 * @return the cache key after the input is transformed with the cache key function
720513 */
721- @ SuppressWarnings ("unchecked" )
722514 public Object getCacheKey (K key ) {
723- return loaderOptions .cacheKeyFunction ().isPresent () ?
724- loaderOptions .cacheKeyFunction ().get ().getKey (key ) : key ;
515+ return helper .getCacheKey (key );
725516 }
726517
727518 /**
0 commit comments