Skip to content

Commit 5ee2ccf

Browse files
committed
Diskless replication: EOF:<mark> streaming support slave side.
1 parent 43ae606 commit 5ee2ccf

File tree

1 file changed

+60
-7
lines changed

1 file changed

+60
-7
lines changed

src/replication.c

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
818818
REDIS_NOTUSED(privdata);
819819
REDIS_NOTUSED(mask);
820820

821+
/* Static vars used to hold the EOF mark, and the last bytes received
822+
* form the server: when they match, we reached the end of the transfer. */
823+
static char eofmark[REDIS_RUN_ID_SIZE];
824+
static char lastbytes[REDIS_RUN_ID_SIZE];
825+
static int usemark = 0;
826+
821827
/* If repl_transfer_size == -1 we still have to read the bulk length
822828
* from the master reply. */
823829
if (server.repl_transfer_size == -1) {
@@ -843,23 +849,65 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
843849
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
844850
goto error;
845851
}
846-
server.repl_transfer_size = strtol(buf+1,NULL,10);
847-
redisLog(REDIS_NOTICE,
848-
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
849-
(long long) server.repl_transfer_size);
852+
853+
/* There are two possible forms for the bulk payload. One is the
854+
* usual $<count> bulk format. The other is used for diskless transfers
855+
* when the master does not know beforehand the size of the file to
856+
* transfer. In the latter case, the following format is used:
857+
*
858+
* $EOF:<40 bytes delimiter>
859+
*
860+
* At the end of the file the announced delimiter is transmitted. The
861+
* delimiter is long and random enough that the probability of a
862+
* collision with the actual file content can be ignored. */
863+
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {
864+
usemark = 1;
865+
memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);
866+
memset(lastbytes,0,REDIS_RUN_ID_SIZE);
867+
redisLog(REDIS_NOTICE,
868+
"MASTER <-> SLAVE sync: receiving streamed RDB from master");
869+
} else {
870+
usemark = 0;
871+
server.repl_transfer_size = strtol(buf+1,NULL,10);
872+
redisLog(REDIS_NOTICE,
873+
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
874+
(long long) server.repl_transfer_size);
875+
}
850876
return;
851877
}
852878

853879
/* Read bulk data */
854-
left = server.repl_transfer_size - server.repl_transfer_read;
855-
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
880+
if (usemark) {
881+
left = server.repl_transfer_size - server.repl_transfer_read;
882+
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
883+
} else {
884+
readlen = sizeof(buf);
885+
}
886+
856887
nread = read(fd,buf,readlen);
857888
if (nread <= 0) {
858889
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
859890
(nread == -1) ? strerror(errno) : "connection lost");
860891
replicationAbortSyncTransfer();
861892
return;
862893
}
894+
895+
/* When a mark is used, we want to detect EOF asap in order to avoid
896+
* writing the EOF mark into the file... */
897+
int eof_reached = 0;
898+
899+
if (usemark) {
900+
/* Update the last bytes array, and check if it matches our delimiter. */
901+
if (nread >= REDIS_RUN_ID_SIZE) {
902+
memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
903+
} else {
904+
int rem = REDIS_RUN_ID_SIZE-nread;
905+
memmove(lastbytes,lastbytes+nread,rem);
906+
memcpy(lastbytes+rem,buf,nread);
907+
}
908+
if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;
909+
}
910+
863911
server.repl_transfer_lastio = server.unixtime;
864912
if (write(server.repl_transfer_fd,buf,nread) != nread) {
865913
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
@@ -881,7 +929,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
881929
}
882930

883931
/* Check if the transfer is now complete */
884-
if (server.repl_transfer_read == server.repl_transfer_size) {
932+
if (!usemark) {
933+
if (server.repl_transfer_read == server.repl_transfer_size)
934+
eof_reached = 1;
935+
}
936+
937+
if (eof_reached) {
885938
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
886939
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
887940
replicationAbortSyncTransfer();

0 commit comments

Comments
 (0)