11package com .lowtuna .jsonblob .core ;
22
3- import com .codahale .metrics .Gauge ;
4- import com .codahale .metrics .MetricRegistry ;
3+ import com .fasterxml .jackson .core .JsonParseException ;
4+ import com .fasterxml .jackson .databind .JsonMappingException ;
5+ import com .fasterxml .jackson .databind .ObjectMapper ;
6+ import com .google .common .base .Optional ;
57import com .google .common .base .Stopwatch ;
68import io .dropwizard .util .Duration ;
79import lombok .extern .slf4j .Slf4j ;
810import org .apache .commons .io .DirectoryWalker ;
11+ import org .joda .time .DateTime ;
912
1013import java .io .File ;
1114import java .io .IOException ;
1215import java .nio .file .Files ;
1316import java .nio .file .Path ;
1417import java .time .LocalDate ;
1518import java .util .Collection ;
16- import java .util .concurrent .BlockingQueue ;
1719import java .util .concurrent .TimeUnit ;
1820import java .util .concurrent .atomic .AtomicInteger ;
1921
2426public class BlobCleanupProducer extends DirectoryWalker <Void > implements Runnable {
2527 private final Path dataDirectoryPath ;
2628 private final Duration blobAccessTtl ;
27- private final BlockingQueue <File > filesToProcess ;
29+ private final FileSystemJsonBlobManager fileSystemJsonBlobManager ;
30+ private final ObjectMapper om ;
2831
29- public BlobCleanupProducer (Path dataDirectoryPath , Duration blobAccessTtl , BlockingQueue < File > filesToProcess , MetricRegistry metricRegistry ) {
32+ public BlobCleanupProducer (Path dataDirectoryPath , Duration blobAccessTtl , FileSystemJsonBlobManager fileSystemJsonBlobManager , ObjectMapper om ) {
3033 super (null , 3 );
3134 this .dataDirectoryPath = dataDirectoryPath ;
3235 this .blobAccessTtl = blobAccessTtl ;
33- this .filesToProcess = filesToProcess ;
34- metricRegistry . register ( MetricRegistry . name ( getClass (), "filesToProcessCount" ), ( Gauge < Integer >) () -> filesToProcess . size ()) ;
36+ this .fileSystemJsonBlobManager = fileSystemJsonBlobManager ;
37+ this . om = om ;
3538 }
3639
3740
@@ -51,10 +54,41 @@ protected boolean handleDirectory(File directory, int depth, Collection<Void> re
5154 if (file .getName ().startsWith (FileSystemJsonBlobManager .BLOB_METADATA_FILE_NAME )) {
5255 return ;
5356 }
57+
5458 try {
55- filesToProcess .put (file );
56- } catch (InterruptedException e ) {
57- log .warn ("Interrupted while trying to add file to be processed at {}" , file .getAbsolutePath (), e );
59+ log .debug ("Processing {}" , file .getAbsolutePath ());
60+ String blobId = file .getName ().split ("\\ ." , 2 )[0 ];
61+ File metadataFile = fileSystemJsonBlobManager .getMetaDataFile (file .getParentFile ());
62+
63+ if (file .equals (metadataFile )) {
64+ return ;
65+ }
66+
67+ BlobMetadataContainer metadataContainer = metadataFile .exists () ? om .readValue (fileSystemJsonBlobManager .readFile (metadataFile ), BlobMetadataContainer .class ) : new BlobMetadataContainer ();
68+
69+ Optional <DateTime > lastAccessed = fileSystemJsonBlobManager .resolveTimestamp (blobId );
70+ if (metadataContainer .getLastAccessedByBlobId ().containsKey (blobId )) {
71+ lastAccessed = Optional .of (metadataContainer .getLastAccessedByBlobId ().get (blobId ));
72+ }
73+
74+ if (!lastAccessed .isPresent ()) {
75+ log .warn ("Couldn't get last accessed timestamp for blob {}" , blobId );
76+ return ;
77+ }
78+
79+ log .debug ("Blob {} was last accessed {}" , blobId , lastAccessed .get ());
80+
81+ if (lastAccessed .get ().plusMillis ((int ) blobAccessTtl .toMilliseconds ()).isBefore (DateTime .now ())) {
82+ if (file .delete ()) {
83+ log .info ("Blob {} hasn't been accessed in {} (last accessed {}), so it's going to be deleted" , blobId , blobAccessTtl , lastAccessed .get ());
84+ }
85+ }
86+ } catch (JsonParseException e ) {
87+ log .warn ("Couldn't parse JSON from BlobMetadataContainer" , e );
88+ } catch (JsonMappingException e ) {
89+ log .warn ("Couldn't map JSON from BlobMetadataContainer" , e );
90+ } catch (IOException e ) {
91+ log .warn ("Couldn't read json for BlobMetadataContainer file" , e );
5892 }
5993 });
6094
0 commit comments