package com.lowtuna.jsonblob.core;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.dropwizard.util.Duration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
2715@ Slf4j
2816@ RequiredArgsConstructor
@@ -32,69 +20,31 @@ public class BlobCleanupJob implements Runnable {
3220 private final FileSystemJsonBlobManager fileSystemJsonBlobManager ;
3321 private final ObjectMapper om ;
3422 private final boolean deleteEnabled ;
23+ private final ExecutorService executorService ;
3524
3625 @ Override
3726 public void run () {
38- Stopwatch stopwatch = new Stopwatch ().start ();
39- AtomicInteger blobsRemoved = new AtomicInteger (0 );
4027 try {
41- Set <String > blobsToDelete = Sets .newCopyOnWriteArraySet ();
28+ List <String > dataDirs = Lists .newCopyOnWriteArrayList ();
29+
4230 Files .walk (blobDirectory )
4331 .parallel ()
4432 .filter (p -> !p .toFile ().isDirectory ())
4533 .map (Path ::getParent )
4634 .distinct ()
47- .forEach (dataDir -> {
48- log .info ("Checking for blobs not accessed in the last {} in {}" , blobAccessTtl , dataDir .toAbsolutePath ());
49- if (!dataDir .toFile ().exists () || !dataDir .toFile ().isDirectory ()) {
50- return ;
51- }
52- try {
53- List <File > files = Arrays .asList (dataDir .toFile ().listFiles ()).parallelStream ().filter (File ::exists ).collect (Collectors .toList ());
54- Set <String > blobs = Sets
55- .newHashSet (Lists .transform (files , f -> f .getName ().split ("\\ ." , 2 )[0 ]))
56- .parallelStream ()
57- .filter (f -> fileSystemJsonBlobManager .resolveTimestamp (f ).isPresent ()).collect (Collectors .toSet ());
58- log .info ("Identified {} blobs in {}" , blobs .size (), dataDir );
59- Map <String , DateTime > lastAccessed = Maps .newHashMap (Maps .asMap (blobs , new Function <String , DateTime >() {
60- @ Nullable
61- @ Override
62- public DateTime apply (@ Nullable String input ) {
63- return fileSystemJsonBlobManager .resolveTimestamp (input ).get ();
64- }
65- }));
66- log .debug ("Completed building map of {} last accessed timestamps in {}" , lastAccessed .size (), dataDir );
35+ .forEach (dataDir -> dataDirs .add (dataDir .toFile ().getAbsolutePath ()));
6736
68- File metadataFile = fileSystemJsonBlobManager .getMetaDataFile (dataDir .toFile ());
69- try {
70- BlobMetadataContainer metadataContainer = metadataFile .exists () ? om .readValue (fileSystemJsonBlobManager .readFile (metadataFile ), BlobMetadataContainer .class ) : new BlobMetadataContainer ();
71- log .debug ("Adding {} last accessed timestamp from metadata {}" , metadataContainer .getLastAccessedByBlobId ().size (), metadataFile .getAbsolutePath ());
72- lastAccessed .putAll (metadataContainer .getLastAccessedByBlobId ());
73- log .debug ("Determining which blobs to remove from {}" , dataDir );
74- Map <String , DateTime > toRemove = Maps .filterEntries (lastAccessed , input -> input .getValue ().plusMillis ((int ) blobAccessTtl .toMilliseconds ()).isBefore (DateTime .now ()));
75- log .info ("Identified {} blobs to remove in {}" , toRemove .size (), dataDir );
76- blobsToDelete .addAll (toRemove .keySet ());
77- } catch (IOException e ) {
78- log .warn ("Couldn't load metadata file from {}" , dataDir .toAbsolutePath (), e );
79- }
80- } catch (Exception e ) {
81- log .warn ("Caught Exception while trying to remove un-accessed blobs in {}" , dataDir , e );
82- }
83- });
37+ log .debug ("Found {} data directories" , dataDirs .size ());
8438
85- log .info ("Deleting {} blobs" , blobsToDelete .size ());
86- blobsToDelete .parallelStream ().forEach (blobId -> {
87- if (deleteEnabled ) {
88- log .debug ("Deleting blob with id {}" , blobId );
89- try {
90- fileSystemJsonBlobManager .deleteBlob (blobId );
91- blobsRemoved .incrementAndGet ();
92- } catch (BlobNotFoundException e ) {
93- log .debug ("Couldn't delete blobId {} because it's already been deleted" , blobId );
94- }
39+ for (String dataDirPath : dataDirs ) {
40+ File dir = new File (dataDirPath );
41+ if (dir .listFiles ().length == 0 ) {
42+ dir .delete ();
9543 }
96- });
97- log .info ("Completed cleanup of {} blobs in {}ms" , blobsRemoved .get (), stopwatch .elapsed (TimeUnit .MILLISECONDS ));
44+
45+ log .debug ("Submitting DataDirectoryCleanupJob for {}" , dataDirPath );
46+ executorService .submit (new DataDirectoryCleanupJob (dataDirPath , executorService , fileSystemJsonBlobManager ,blobAccessTtl , om , deleteEnabled ));
47+ }
9848 } catch (Exception e ) {
9949 log .warn ("Couldn't remove old blobs" , e );
10050 }
0 commit comments