@@ -505,38 +505,27 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
505505 }
506506
507507 test(" in-memory LRU storage" ) {
508- store = makeBlockManager(12000 )
509- val a1 = new Array [Byte ](4000 )
510- val a2 = new Array [Byte ](4000 )
511- val a3 = new Array [Byte ](4000 )
512- store.putSingle(" a1" , a1, StorageLevel .MEMORY_ONLY )
513- store.putSingle(" a2" , a2, StorageLevel .MEMORY_ONLY )
514- store.putSingle(" a3" , a3, StorageLevel .MEMORY_ONLY )
515- assert(store.getSingle(" a2" ).isDefined, " a2 was not in store" )
516- assert(store.getSingle(" a3" ).isDefined, " a3 was not in store" )
517- assert(store.getSingle(" a1" ) === None , " a1 was in store" )
518- assert(store.getSingle(" a2" ).isDefined, " a2 was not in store" )
519- // At this point a2 was gotten last, so LRU will getSingle rid of a3
520- store.putSingle(" a1" , a1, StorageLevel .MEMORY_ONLY )
521- assert(store.getSingle(" a1" ).isDefined, " a1 was not in store" )
522- assert(store.getSingle(" a2" ).isDefined, " a2 was not in store" )
523- assert(store.getSingle(" a3" ) === None , " a3 was in store" )
508+ testInMemoryLRUStorage(StorageLevel .MEMORY_ONLY )
524509 }
525510
526511 test(" in-memory LRU storage with serialization" ) {
512+ testInMemoryLRUStorage(StorageLevel .MEMORY_ONLY_SER )
513+ }
514+
515+ private def testInMemoryLRUStorage (storageLevel : StorageLevel ): Unit = {
527516 store = makeBlockManager(12000 )
528517 val a1 = new Array [Byte ](4000 )
529518 val a2 = new Array [Byte ](4000 )
530519 val a3 = new Array [Byte ](4000 )
531- store.putSingle(" a1" , a1, StorageLevel . MEMORY_ONLY_SER )
532- store.putSingle(" a2" , a2, StorageLevel . MEMORY_ONLY_SER )
533- store.putSingle(" a3" , a3, StorageLevel . MEMORY_ONLY_SER )
520+ store.putSingle(" a1" , a1, storageLevel )
521+ store.putSingle(" a2" , a2, storageLevel )
522+ store.putSingle(" a3" , a3, storageLevel )
534523 assert(store.getSingle(" a2" ).isDefined, " a2 was not in store" )
535524 assert(store.getSingle(" a3" ).isDefined, " a3 was not in store" )
536525 assert(store.getSingle(" a1" ) === None , " a1 was in store" )
537526 assert(store.getSingle(" a2" ).isDefined, " a2 was not in store" )
538527 // At this point a2 was gotten last, so LRU will getSingle rid of a3
539- store.putSingle(" a1" , a1, StorageLevel . MEMORY_ONLY_SER )
528+ store.putSingle(" a1" , a1, storageLevel )
540529 assert(store.getSingle(" a1" ).isDefined, " a1 was not in store" )
541530 assert(store.getSingle(" a2" ).isDefined, " a2 was not in store" )
542531 assert(store.getSingle(" a3" ) === None , " a3 was in store" )
@@ -618,62 +607,35 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
618607 }
619608
620609 test(" disk and memory storage" ) {
621- store = makeBlockManager(12000 )
622- val a1 = new Array [Byte ](4000 )
623- val a2 = new Array [Byte ](4000 )
624- val a3 = new Array [Byte ](4000 )
625- store.putSingle(" a1" , a1, StorageLevel .MEMORY_AND_DISK )
626- store.putSingle(" a2" , a2, StorageLevel .MEMORY_AND_DISK )
627- store.putSingle(" a3" , a3, StorageLevel .MEMORY_AND_DISK )
628- assert(store.getSingle(" a2" ).isDefined, " a2 was not in store" )
629- assert(store.getSingle(" a3" ).isDefined, " a3 was not in store" )
630- assert(store.memoryStore.getValues(" a1" ) == None , " a1 was in memory store" )
631- assert(store.getSingle(" a1" ).isDefined, " a1 was not in store" )
632- assert(store.memoryStore.getValues(" a1" ).isDefined, " a1 was not in memory store" )
610+ testDiskAndMemoryStorage(StorageLevel .MEMORY_AND_DISK , _.getSingle)
633611 }
634612
635613 test(" disk and memory storage with getLocalBytes" ) {
636- store = makeBlockManager(12000 )
637- val a1 = new Array [Byte ](4000 )
638- val a2 = new Array [Byte ](4000 )
639- val a3 = new Array [Byte ](4000 )
640- store.putSingle(" a1" , a1, StorageLevel .MEMORY_AND_DISK )
641- store.putSingle(" a2" , a2, StorageLevel .MEMORY_AND_DISK )
642- store.putSingle(" a3" , a3, StorageLevel .MEMORY_AND_DISK )
643- assert(store.getLocalBytes(" a2" ).isDefined, " a2 was not in store" )
644- assert(store.getLocalBytes(" a3" ).isDefined, " a3 was not in store" )
645- assert(store.memoryStore.getValues(" a1" ) == None , " a1 was in memory store" )
646- assert(store.getLocalBytes(" a1" ).isDefined, " a1 was not in store" )
647- assert(store.memoryStore.getValues(" a1" ).isDefined, " a1 was not in memory store" )
614+ testDiskAndMemoryStorage(StorageLevel .MEMORY_AND_DISK , _.getLocalBytes)
648615 }
649616
650617 test(" disk and memory storage with serialization" ) {
651- store = makeBlockManager(12000 )
652- val a1 = new Array [Byte ](4000 )
653- val a2 = new Array [Byte ](4000 )
654- val a3 = new Array [Byte ](4000 )
655- store.putSingle(" a1" , a1, StorageLevel .MEMORY_AND_DISK_SER )
656- store.putSingle(" a2" , a2, StorageLevel .MEMORY_AND_DISK_SER )
657- store.putSingle(" a3" , a3, StorageLevel .MEMORY_AND_DISK_SER )
658- assert(store.getSingle(" a2" ).isDefined, " a2 was not in store" )
659- assert(store.getSingle(" a3" ).isDefined, " a3 was not in store" )
660- assert(store.memoryStore.getValues(" a1" ) == None , " a1 was in memory store" )
661- assert(store.getSingle(" a1" ).isDefined, " a1 was not in store" )
662- assert(store.memoryStore.getValues(" a1" ).isDefined, " a1 was not in memory store" )
618+ testDiskAndMemoryStorage(StorageLevel .MEMORY_AND_DISK_SER , _.getSingle)
663619 }
664620
665621 test(" disk and memory storage with serialization and getLocalBytes" ) {
622+ testDiskAndMemoryStorage(StorageLevel .MEMORY_AND_DISK_SER , _.getLocalBytes)
623+ }
624+
625+ def testDiskAndMemoryStorage (
626+ storageLevel : StorageLevel ,
627+ accessMethod : BlockManager => BlockId => Option [_]): Unit = {
666628 store = makeBlockManager(12000 )
667629 val a1 = new Array [Byte ](4000 )
668630 val a2 = new Array [Byte ](4000 )
669631 val a3 = new Array [Byte ](4000 )
670- store.putSingle(" a1" , a1, StorageLevel . MEMORY_AND_DISK_SER )
671- store.putSingle(" a2" , a2, StorageLevel . MEMORY_AND_DISK_SER )
672- store.putSingle(" a3" , a3, StorageLevel . MEMORY_AND_DISK_SER )
673- assert(store.getLocalBytes (" a2" ).isDefined, " a2 was not in store" )
674- assert(store.getLocalBytes (" a3" ).isDefined, " a3 was not in store" )
675- assert(store.memoryStore.getValues(" a1" ) == None , " a1 was in memory store" )
676- assert(store.getLocalBytes (" a1" ).isDefined, " a1 was not in store" )
632+ store.putSingle(" a1" , a1, storageLevel )
633+ store.putSingle(" a2" , a2, storageLevel )
634+ store.putSingle(" a3" , a3, storageLevel )
635+ assert(accessMethod( store) (" a2" ).isDefined, " a2 was not in store" )
636+ assert(accessMethod( store) (" a3" ).isDefined, " a3 was not in store" )
637+ assert(store.memoryStore.getValues(" a1" ).isEmpty , " a1 was in memory store" )
638+ assert(accessMethod( store) (" a1" ).isDefined, " a1 was not in store" )
677639 assert(store.memoryStore.getValues(" a1" ).isDefined, " a1 was not in memory store" )
678640 }
679641
0 commit comments