2121import java .util .Map ;
2222import java .util .Set ;
2323import java .util .concurrent .TimeUnit ;
24- import java .util .concurrent .atomic .AtomicLong ;
24+ import java .util .concurrent .atomic .AtomicInteger ;
2525import java .util .stream .Collectors ;
2626
2727@ Slf4j
@@ -36,7 +36,7 @@ public class BlobCleanupJob implements Runnable {
3636 @ Override
3737 public void run () {
3838 Stopwatch stopwatch = new Stopwatch ().start ();
39- AtomicLong blobsRemoved = new AtomicLong (0 );
39+ AtomicInteger blobsRemoved = new AtomicInteger (0 );
4040 try {
4141 Files .walk (blobDirectory )
4242 .parallel ()
@@ -72,17 +72,20 @@ public DateTime apply(@Nullable String input) {
7272 log .debug ("Determining which blobs to remove from {}" , dataDir );
7373 Map <String , DateTime > toRemove = Maps .filterEntries (lastAccessed , input -> input .getValue ().plusMillis ((int ) blobAccessTtl .toMilliseconds ()).isBefore (DateTime .now ()));
7474 log .info ("Identified {} blobs to remove in {}" , toRemove .size (), dataDir );
75+ AtomicInteger deletedInDir = new AtomicInteger (0 );
7576 toRemove .keySet ().parallelStream ().forEach (blobId -> {
7677 if (deleteEnabled ) {
7778 log .debug ("Deleting blob with id {}" , blobId );
7879 try {
7980 fileSystemJsonBlobManager .deleteBlob (blobId );
80- blobsRemoved .incrementAndGet ();
81+ deletedInDir .incrementAndGet ();
8182 } catch (BlobNotFoundException e ) {
8283 log .debug ("Couldn't delete blobId {} because it's already been deleted" , blobId );
8384 }
8485 }
8586 });
87+ log .info ("Removed {} blobs in {}" , deletedInDir .get (), dataDir );
88+ blobsRemoved .addAndGet (deletedInDir .get ());
8689 } catch (IOException e ) {
8790 log .warn ("Couldn't load metadata file from {}" , dataDir .toAbsolutePath (), e );
8891 }
0 commit comments