2222#include " cachelib/common/PeriodicWorker.h"
2323#include " cachelib/common/Serialization.h"
2424#include " cachelib/experimental/objcache/Allocator.h"
25+ #include " cachelib/experimental/objcache/Persistence.h"
2526
2627#pragma GCC diagnostic push
2728#pragma GCC diagnostic ignored "-Wconversion"
@@ -250,18 +251,40 @@ class ObjectCacheConfig {
250251 // TODO: add comments for persistence
251252 using SerializationCallback = typename ObjectCache::SerializationCallback;
252253 using DeserializationCallback = typename ObjectCache::DeserializationCallback;
253- ObjectCacheConfig& enablePersistence (SerializationCallback scb,
254- DeserializationCallback dcb) {
254+ ObjectCacheConfig& enablePersistence (
255+ uint32_t persistorRestorerThreadCount,
256+ uint32_t restorerTimeOutDurationInSec,
257+ std::string persistFullPathFile,
258+ SerializationCallback scb,
259+ DeserializationCallback dcb,
260+ uint32_t persistorQueueBatchSize = 1000 ) {
255261 serializationCallback_ = std::move (scb);
256262 deserializationCallback_ = std::move (dcb);
263+ persistorRestorerThreadCount_ = persistorRestorerThreadCount;
264+ restorerTimeOutDurationInSec_ = restorerTimeOutDurationInSec;
265+ persistFullPathFile_ = std::move (persistFullPathFile);
266+ persistorQueueBatchSize_ = persistorQueueBatchSize;
257267 return *this ;
258268 }
269+
259270 const SerializationCallback& getSerializationCallback () const {
260271 return serializationCallback_;
261272 }
262273 const DeserializationCallback& getDeserializationCallback () const {
263274 return deserializationCallback_;
264275 }
276+ [[nodiscard]] uint32_t getPersistorRestorerThreadCount () const {
277+ return persistorRestorerThreadCount_;
278+ }
279+ [[nodiscard]] uint32_t getRestorerTimeOut () const {
280+ return restorerTimeOutDurationInSec_;
281+ }
282+ [[nodiscard]] const std::string& getPersistFullPathFile () const {
283+ return persistFullPathFile_;
284+ }
285+ [[nodiscard]] uint32_t getPersistorQueueBatchSize () const {
286+ return persistorQueueBatchSize_;
287+ }
265288
266289 private:
267290 CacheAllocatorConfig cacheAllocatorConfig_;
@@ -275,6 +298,10 @@ class ObjectCacheConfig {
275298
276299 SerializationCallback serializationCallback_;
277300 DeserializationCallback deserializationCallback_;
301+ uint32_t persistorRestorerThreadCount_;
302+ uint32_t restorerTimeOutDurationInSec_;
303+ std::string persistFullPathFile_;
304+ uint32_t persistorQueueBatchSize_;
278305};
279306
280307struct ObjectCacheStats {
@@ -402,6 +429,10 @@ class ObjectCache {
402429 explicit ObjectCache (Config config) : ObjectCache(createCache(config), true) {
403430 serializationCallback_ = config.getSerializationCallback ();
404431 deserializationCallback_ = config.getDeserializationCallback ();
432+ persistorRestorerThreadCount_ = config.getPersistorRestorerThreadCount ();
433+ restorerTimeOutDurationInSec_ = config.getRestorerTimeOut ();
434+ persistFullPathFile_ = config.getPersistFullPathFile ();
435+ persistorQueueBatchSize_ = config.getPersistorQueueBatchSize ();
405436
406437 if (config.getCompactionCallback ()) {
407438 compactionWorker_ =
@@ -436,61 +467,20 @@ class ObjectCache {
436467 // Get the underlying CacheAllocator
437468 CacheAlloc& getCacheAlloc () { return *cache_; }
438469
439- void persist (RecordWriter& rw ) {
470+ void persist () {
440471 XDCHECK (serializationCallback_);
441- for (auto it = cache_->begin (); it != cache_->end (); ++it) {
442- auto iobuf = serializationCallback_ (it->getKey (), it->getMemory ());
443- if (!iobuf) {
444- XLOG (ERR) << " Failed to serialize for key: " << it->getKey ();
445- continue ;
446- }
447- serialization::Item item;
448- item.poolId ().value () = cache_->getAllocInfo (it->getMemory ()).poolId ;
449- // TODO: we need to actually recover creation and persistence as well
450- // we need to modify the create allocator resource logic to allow
451- // us to pass in creation and expiry times.
452- item.creationTime ().value () = it->getCreationTime ();
453- item.expiryTime ().value () = it->getExpiryTime ();
454- item.key ().value () = it->getKey ().str ();
455- item.payload ().value ().resize (iobuf->length ());
456- std::memcpy (item.payload ().value ().data (), iobuf->data (),
457- iobuf->length ());
458- rw.writeRecord (Serializer::serializeToIOBuf (item));
459- }
472+ ObjectCachePersistor<ObjectCache> persistor (
473+ persistorRestorerThreadCount_, serializationCallback_, *this ,
474+ persistFullPathFile_, persistorQueueBatchSize_);
475+ persistor.run ();
460476 }
461477
462- void recover (RecordReader& rr, uint32_t timeOutDurationInSec = 0 ) {
478+ void recover () {
463479 XDCHECK (deserializationCallback_);
464- auto recoveryStartTime = std::chrono::system_clock::now ();
465- uint32_t timeElapsedInSec = 0 ;
466- while (!rr.isEnd ()) {
467- auto iobuf = rr.readRecord ();
468- XDCHECK (iobuf);
469- Deserializer deserializer (iobuf->data (), iobuf->data () + iobuf->length ());
470- auto item = deserializer.deserialize <serialization::Item>();
471-
472- auto hdl = deserializationCallback_ (
473- item.poolId ().value (), item.key ().value (), item.payload ().value (),
474- item.creationTime ().value (), item.expiryTime ().value (), *this );
475- if (!hdl) {
476- XLOG (ERR) << " Failed to deserialize for key: " << item.key ().value ();
477- continue ;
478- }
479- cache_->insertOrReplace (hdl);
480- timeElapsedInSec =
481- timeOutDurationInSec > 0
482- ? std::chrono::duration_cast<std::chrono::seconds>(
483- std::chrono::system_clock::now () - recoveryStartTime)
484- .count ()
485- : 0 ;
486- if (timeElapsedInSec > timeOutDurationInSec) {
487- XLOG (INFO) << " Recover timed out and couldn't finish completely, "
488- " timeOutDurationInSec = "
489- << timeOutDurationInSec
490- << " , timeElapsedInSec = " << timeElapsedInSec;
491- break ;
492- }
493- }
480+ ObjectCacheRestorer<ObjectCache> restorer (persistorRestorerThreadCount_,
481+ deserializationCallback_, *this ,
482+ persistFullPathFile_, 0 );
483+ restorer.run ();
494484 }
495485
496486 // Create a new object backed by cachelib-memory. This behaves similar to
@@ -697,6 +687,10 @@ class ObjectCache {
697687 std::unique_ptr<CompactionWorker> compactionWorker_;
698688 SerializationCallback serializationCallback_;
699689 DeserializationCallback deserializationCallback_;
690+ uint32_t persistorRestorerThreadCount_;
691+ uint32_t restorerTimeOutDurationInSec_;
692+ std::string persistFullPathFile_;
693+ uint32_t persistorQueueBatchSize_;
700694};
701695} // namespace objcache
702696} // namespace cachelib
0 commit comments