Skip to content

Commit 4232d2e

Browse files
committed
Limit the number of files processed at a time.
1 parent cc66498 commit 4232d2e

File tree

5 files changed

+156
-127
lines changed

5 files changed

+156
-127
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.lowtuna.jsonblob.core;
2+
3+
import com.fasterxml.jackson.core.JsonParseException;
4+
import com.fasterxml.jackson.databind.JsonMappingException;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.google.common.base.Optional;
7+
import io.dropwizard.util.Duration;
8+
import lombok.RequiredArgsConstructor;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.joda.time.DateTime;
11+
12+
import java.io.File;
13+
import java.io.IOException;
14+
import java.util.concurrent.BlockingQueue;
15+
16+
/**
17+
* Created by tburch on 8/20/17.
18+
*/
19+
@Slf4j
20+
@RequiredArgsConstructor
21+
public class BlobCleanupConsumer implements Runnable {
22+
private final BlockingQueue<File> filesToProcess;
23+
private final Duration blobAccessTtl;
24+
private final FileSystemJsonBlobManager fileSystemJsonBlobManager;
25+
private final ObjectMapper om;
26+
27+
@Override
28+
public void run() {
29+
try {
30+
File file = filesToProcess.take();
31+
log.debug("Processing {}", file.getAbsolutePath());
32+
String blobId = file.getName().split("\\.", 2)[0];
33+
File metadataFile = fileSystemJsonBlobManager.getMetaDataFile(file.getParentFile());
34+
35+
if (file.equals(metadataFile)) {
36+
return;
37+
}
38+
39+
BlobMetadataContainer metadataContainer = metadataFile.exists() ? om.readValue(fileSystemJsonBlobManager.readFile(metadataFile), BlobMetadataContainer.class) : new BlobMetadataContainer();
40+
41+
Optional<DateTime> lastAccessed = fileSystemJsonBlobManager.resolveTimestamp(blobId);
42+
if (metadataContainer.getLastAccessedByBlobId().containsKey(blobId)) {
43+
lastAccessed = Optional.of(metadataContainer.getLastAccessedByBlobId().get(blobId));
44+
}
45+
46+
if (!lastAccessed.isPresent()) {
47+
log.warn("Couldn't get last accessed timestamp for blob {}", blobId);
48+
return;
49+
}
50+
51+
log.debug("Blob {} was last accessed {}", blobId, lastAccessed.get());
52+
53+
if (lastAccessed.get().plusMillis((int) blobAccessTtl.toMilliseconds()).isBefore(DateTime.now())) {
54+
if (file.delete()) {
55+
log.info("Blob {} is older than {} (last accessed {}), so it's going to be deleted", blobId, blobAccessTtl, lastAccessed.get());
56+
}
57+
}
58+
} catch (InterruptedException e) {
59+
e.printStackTrace();
60+
} catch (JsonParseException e) {
61+
e.printStackTrace();
62+
} catch (JsonMappingException e) {
63+
e.printStackTrace();
64+
} catch (IOException e) {
65+
e.printStackTrace();
66+
}
67+
}
68+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.lowtuna.jsonblob.core;
2+
3+
import com.google.common.base.Stopwatch;
4+
import io.dropwizard.util.Duration;
5+
import lombok.AllArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.apache.commons.io.DirectoryWalker;
8+
9+
import java.io.File;
10+
import java.io.IOException;
11+
import java.nio.file.Path;
12+
import java.time.LocalDate;
13+
import java.util.Collection;
14+
import java.util.concurrent.BlockingQueue;
15+
import java.util.concurrent.TimeUnit;
16+
17+
/**
18+
* Created by tburch on 8/18/17.
19+
*/
20+
@AllArgsConstructor
21+
@Slf4j
22+
public class BlobCleanupProducer extends DirectoryWalker<Void> implements Runnable {
23+
private final Path dataDirectoryPath;
24+
private final Duration blobAccessTtl;
25+
private final BlockingQueue<File> filesToProcess;
26+
27+
@Override
28+
protected boolean handleDirectory(File directory, int depth, Collection<Void> results) throws IOException {
29+
if (directory.listFiles() != null && directory.listFiles().length == 0) {
30+
if (directory.delete()) log.info("{} has no files, so it's being deleted", directory.getAbsolutePath());
31+
return false;
32+
}
33+
34+
if (directory.listFiles().length == 1) {
35+
if (directory.listFiles()[0].getName().startsWith(FileSystemJsonBlobManager.BLOB_METADATA_FILE_NAME)) {
36+
if (directory.delete()) log.info("{} has only a metadata file, so it's being deleted", directory.getAbsolutePath());
37+
return false;
38+
}
39+
}
40+
41+
boolean process = true;
42+
if (isDataDir(directory.getAbsolutePath())) {
43+
String[] dateParts = directory.getAbsolutePath().replace(dataDirectoryPath.toFile().getAbsolutePath(), "").split("/", 4);
44+
LocalDate localDate = LocalDate.of(Integer.parseInt(dateParts[1]), Integer.parseInt(dateParts[2]), Integer.parseInt(dateParts[3]));
45+
process = localDate.isBefore(LocalDate.now().minusDays(blobAccessTtl.toDays()));
46+
if (process) {
47+
log.info("Processing {} with {} blobs for un-accessed blobs", directory.getAbsolutePath(), directory.listFiles().length - 1);
48+
for (File file: directory.listFiles()) {
49+
try {
50+
filesToProcess.put(file);
51+
} catch (InterruptedException e) {
52+
log.warn("Interrupted while trying to add file to be processed at {}", file.getAbsolutePath(), e);
53+
}
54+
}
55+
process = false;
56+
}
57+
}
58+
59+
return process;
60+
}
61+
62+
private boolean isDataDir(String path) {
63+
return path.replace(dataDirectoryPath.toFile().getAbsolutePath(), "").split("/").length == 4;
64+
}
65+
66+
@Override
67+
public void run() {
68+
Stopwatch stopwatch = new Stopwatch().start();
69+
try {
70+
walk(dataDirectoryPath.toFile(), null);
71+
log.info("Completed cleaning up un-accessed blobs in {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
72+
} catch (Exception e) {
73+
e.printStackTrace();
74+
}
75+
}
76+
77+
}

src/main/java/com/lowtuna/jsonblob/core/BlobDataDirectoryCleaner.java

Lines changed: 0 additions & 123 deletions
This file was deleted.

src/main/java/com/lowtuna/jsonblob/core/FileSystemJsonBlobManager.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.util.Map;
3232
import java.util.TimeZone;
3333
import java.util.UUID;
34+
import java.util.concurrent.ArrayBlockingQueue;
35+
import java.util.concurrent.BlockingQueue;
3436
import java.util.concurrent.ConcurrentMap;
3537
import java.util.concurrent.ScheduledExecutorService;
3638
import java.util.concurrent.TimeUnit;
@@ -45,8 +47,9 @@
4547
*/
4648
@Slf4j
4749
public class FileSystemJsonBlobManager implements JsonBlobManager, Runnable, Managed {
48-
public static final String BLOB_METADATA_FILE_NAME = "blobMetadata";
50+
static final String BLOB_METADATA_FILE_NAME = "blobMetadata";
4951

52+
private static final int CONSUMER_COUNT = 7;
5053
private static final DateTimeFormatter DIRECTORY_FORMAT = DateTimeFormat.forPattern("yyyy/MM/dd");
5154

5255
@GuardedBy("lastAccessedLock")
@@ -308,8 +311,12 @@ public void start() throws Exception {
308311
scheduledExecutorService.scheduleWithFixedDelay(this, 1, 1, TimeUnit.MINUTES);
309312

310313
log.info("Scheduling blob cleanup job");
311-
BlobDataDirectoryCleaner dataDirectoryCleaner = new BlobDataDirectoryCleaner(blobDataDirectory.toPath(), blobAccessTtl, this,objectMapper);
314+
BlockingQueue<File> filesToProcess = new ArrayBlockingQueue<>(1024);
315+
for (int i = 0; i < CONSUMER_COUNT; i++) {
316+
cleanupScheduledExecutorService.scheduleAtFixedRate(new BlobCleanupConsumer(filesToProcess, blobAccessTtl, this, objectMapper), 0, 250, TimeUnit.MILLISECONDS);
317+
}
312318

319+
BlobCleanupProducer dataDirectoryCleaner = new BlobCleanupProducer(blobDataDirectory.toPath(), blobAccessTtl, filesToProcess);
313320
cleanupScheduledExecutorService.scheduleWithFixedDelay(dataDirectoryCleaner, 0, 1, TimeUnit.DAYS);
314321
}
315322

src/test/java/com/lowtuna/jsonblob/core/BlobCleanupJobTest.java renamed to src/test/java/com/lowtuna/jsonblob/core/TryBlobCleanupJob.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
* Created by tburch on 8/16/17.
2121
*/
2222
@Log
23-
public class BlobCleanupJobTest {
23+
public class TryBlobCleanupJob {
2424

2525
private static final File TEMP;
2626
static {
@@ -36,7 +36,7 @@ public class BlobCleanupJobTest {
3636

3737
@Before
3838
public void initBlobManage() {
39-
this.blobManager = new FileSystemJsonBlobManager(TEMP, Executors.newSingleThreadScheduledExecutor(), Executors.newSingleThreadScheduledExecutor(), new ObjectMapper(), blobTtl, true);
39+
this.blobManager = new FileSystemJsonBlobManager(TEMP, Executors.newSingleThreadScheduledExecutor(), Executors.newScheduledThreadPool(10), new ObjectMapper(), blobTtl, true);
4040
}
4141

4242
@Test

0 commit comments

Comments
 (0)