Skip to content

Commit b93dc1c

Browse files
committed
Port diskless replication to Windows
During diskless replication the master forks a child, which on POSIX simply inherits the socket file descriptors for the connections to the slaves. A Unix pipe is also used by the child to report the results back to the master. The bulk of the porting work is making sure that the socket file descriptors and the pipe file descriptor are propagated correctly from the master to its child.
1 parent b713fda commit b93dc1c

File tree

12 files changed

+274
-11
lines changed

12 files changed

+274
-11
lines changed

src/Win32_Interop/Win32_CommandLine.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,8 @@ static RedisParamterMapper g_redisArgMap =
392392
{ "repl-ping-slave-period", &fp1 }, // repl-ping-slave-period [number]
393393
{ "repl-timeout", &fp1 }, // repl-timeout [number]
394394
{ "repl-disable-tcp-nodelay", &fp1 }, // repl-disable-tcp-nodelay [yes/no]
395+
{ "repl-diskless-sync", &fp1 }, // repl-diskless-sync [yes/no]
396+
{ "repl-diskless-sync-delay", &fp1 }, // repl-diskless-sync-delay [number]
395397
{ "repl-backlog-size", &fp1 }, // repl-backlog-size [number]
396398
{ "repl-backlog-ttl", &fp1 }, // repl-backlog-ttl [number]
397399
{ "slave-priority", &fp1 }, // slave-priority [number]

src/Win32_Interop/Win32_FDAPI.cpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ redis_WSASend WSASend = NULL;
4343
redis_WSARecv WSARecv = NULL;
4444
redis_WSACleanup WSACleanup = NULL;
4545
redis_WSAGetOverlappedResult WSAGetOverlappedResult = NULL;
46+
redis_WSADuplicateSocket WSADuplicateSocket = NULL;
47+
redis_WSASocket WSASocket = NULL;
4648

4749
// other API forwards
4850
redis_fwrite fdapi_fwrite = NULL;
@@ -53,8 +55,10 @@ redis_isatty isatty = NULL;
5355
redis_access access = NULL;
5456
redis_lseek64 lseek64 = NULL;
5557
redis_get_osfhandle fdapi_get_osfhandle = NULL;
58+
redis_open_osfhandle fdapi_open_osfhandle = NULL;
5659

5760
// Unix compatible FD based routines
61+
redis_pipe pipe = NULL;
5862
redis_socket socket = NULL;
5963
redis_close fdapi_close = NULL;
6064
redis_open open = NULL;
@@ -290,6 +294,21 @@ int redis_socket_impl(int af,int type,int protocol) {
290294
return rfd;
291295
}
292296

297+
int redis_pipe_impl(int *pfds) {
298+
int err = -1;
299+
try {
300+
// Not passing _O_NOINHERIT, the underlying handles are inheritable by default
301+
err = crt_pipe(pfds, 8192, _O_BINARY);
302+
if(err == 0) {
303+
pfds[0] = RFDMap::getInstance().addPosixFD(pfds[0]);
304+
pfds[1] = RFDMap::getInstance().addPosixFD(pfds[1]);
305+
}
306+
} CATCH_AND_REPORT()
307+
308+
return err;
309+
}
310+
311+
293312
// In unix a fd is a fd. All are closed with close().
294313
auto f_closesocket = dllfunctor_stdcall<int, SOCKET>("ws2_32.dll", "closesocket");
295314
int redis_close_impl(RFD rfd) {
@@ -795,6 +814,40 @@ BOOL redis_WSAGetOverlappedResult_impl(int rfd, LPWSAOVERLAPPED lpOverlapped, LP
795814
return SOCKET_ERROR;
796815
}
797816

817+
auto f_WSADuplicateSocket = dllfunctor_stdcall<int, SOCKET, DWORD, LPWSAPROTOCOL_INFO>("ws2_32.dll", "WSADuplicateSocketW");
818+
int redis_WSADuplicateSocket_impl(int rfd, DWORD dwProcessId, LPWSAPROTOCOL_INFO lpProtocolInfo) {
819+
try {
820+
SOCKET s = RFDMap::getInstance().lookupSocket( rfd );
821+
if( s != INVALID_SOCKET ) {
822+
return f_WSADuplicateSocket(s, dwProcessId, lpProtocolInfo);
823+
} else {
824+
errno = EBADF;
825+
return SOCKET_ERROR;
826+
}
827+
} CATCH_AND_REPORT();
828+
829+
return SOCKET_ERROR;
830+
}
831+
832+
auto f_WSASocket = dllfunctor_stdcall<SOCKET, int, int, int, LPWSAPROTOCOL_INFO, GROUP, DWORD>("ws2_32.dll", "WSASocketW");
833+
int redis_WSASocket_impl(int af, int type, int protocol, LPWSAPROTOCOL_INFO lpProtocolInfo, GROUP g, DWORD dwFlags) {
834+
RFD rfd = RFDMap::invalidRFD;
835+
try {
836+
SOCKET socket = f_WSASocket(af,
837+
type,
838+
protocol,
839+
lpProtocolInfo,
840+
g,
841+
dwFlags);
842+
843+
if(socket != INVALID_SOCKET) {
844+
rfd = RFDMap::getInstance().addSocket(socket);
845+
}
846+
} CATCH_AND_REPORT()
847+
848+
return rfd;
849+
}
850+
798851
int redis_WSAIoctl_impl(RFD rfd,DWORD dwIoControlCode,LPVOID lpvInBuffer,DWORD cbInBuffer,LPVOID lpvOutBuffer,DWORD cbOutBuffer,LPDWORD lpcbBytesReturned,LPWSAOVERLAPPED lpOverlapped,LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine) {
799852
try {
800853
SOCKET s = RFDMap::getInstance().lookupSocket( rfd );
@@ -1012,6 +1065,18 @@ intptr_t redis_get_osfhandle_impl(int fd) {
10121065
return -1;
10131066
}
10141067

1068+
int redis_open_osfhandle_impl(intptr_t osfhandle, int flags) {
1069+
RFD rfd = RFDMap::invalidRFD;
1070+
try {
1071+
int posixFD = crt_open_osfhandle(osfhandle, flags);
1072+
if(posixFD != -1) {
1073+
rfd = RFDMap::getInstance().addPosixFD(posixFD);
1074+
}
1075+
} CATCH_AND_REPORT()
1076+
1077+
return rfd;
1078+
}
1079+
10151080
auto f_freeaddrinfo = dllfunctor_stdcall<void, addrinfo*>("ws2_32.dll", "freeaddrinfo");
10161081
void redis_freeaddrinfo_impl(struct addrinfo *ai) {
10171082
f_freeaddrinfo(ai);
@@ -1089,6 +1154,7 @@ class Win32_FDSockMap {
10891154
Win32_FDSockMap() {
10901155
InitWinsock();
10911156

1157+
pipe = redis_pipe_impl;
10921158
socket = redis_socket_impl;
10931159
fdapi_close = redis_close_impl;
10941160
open = redis_open_impl;
@@ -1123,12 +1189,15 @@ class Win32_FDSockMap {
11231189
WSARecv = redis_WSARecv_impl;
11241190
WSACleanup = redis_WSACleanup_impl;
11251191
WSAGetOverlappedResult = redis_WSAGetOverlappedResult_impl;
1192+
WSADuplicateSocket = redis_WSADuplicateSocket_impl;
1193+
WSASocket = redis_WSASocket_impl;
11261194
select = redis_select_impl;
11271195
ntohl = redis_ntohl_impl;
11281196
isatty = redis_isatty_impl;
11291197
access = redis_access_impl;
11301198
lseek64 = redis_lseek64_impl;
11311199
fdapi_get_osfhandle = redis_get_osfhandle_impl;
1200+
fdapi_open_osfhandle = redis_open_osfhandle_impl;
11321201
freeaddrinfo = redis_freeaddrinfo_impl;
11331202
getaddrinfo = redis_getaddrinfo_impl;
11341203
inet_ntop = redis_inet_ntop_impl;

src/Win32_Interop/Win32_FDAPI.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,15 @@ typedef struct hostent* (*redis_gethostbyname)(const char *name);
149149
typedef char* (*redis_inet_ntoa)(struct in_addr in);
150150
typedef BOOL (*redis_WSAGetOverlappedResult)(int rfd,LPWSAOVERLAPPED lpOverlapped, LPDWORD lpcbTransfer, BOOL fWait, LPDWORD lpdwFlags);
151151

152+
typedef int (*redis_WSADuplicateSocket)(int rfd, DWORD dwProcessId, LPWSAPROTOCOL_INFO lpProtocolInfo);
153+
typedef int (*redis_WSASocket)(int af, int type, int protocol, LPWSAPROTOCOL_INFO lpProtocolInfo, GROUP g, DWORD dwFlags);
154+
152155
// other API forwards
153156
typedef int (*redis_setmode)(int fd,int mode);
154157
typedef size_t (*redis_fwrite)(const void * _Str, size_t _Size, size_t _Count, FILE * _File);
155158

156159
// API prototypes must match the unix implementation
160+
typedef int (*redis_pipe)(int pipefd[2]);
157161
typedef int (*redis_socket)(int af,int type,int protocol);
158162
typedef int (*redis_close)(int fd);
159163
typedef int (*redis_open)(const char * _Filename, int _OpenFlag, int flags);
@@ -186,6 +190,7 @@ typedef int (*redis_isatty)(int fd);
186190
typedef int (*redis_access)(const char *pathname, int mode);
187191
typedef u_int64 (*redis_lseek64)(int fd, u_int64 offset, int whence);
188192
typedef intptr_t (*redis_get_osfhandle)(int fd);
193+
typedef int (*redis_open_osfhandle)(intptr_t osfhandle, int flags);
189194
typedef int(*redis_FD_ISSET)(int fd, fd_set *);
190195

191196
// access() mode definitions
@@ -199,6 +204,7 @@ extern "C"
199204
#endif
200205

201206
// API replacements
207+
extern redis_pipe pipe;
202208
extern redis_socket socket;
203209
extern redis_WSASend WSASend;
204210
extern redis_WSARecv WSARecv;
@@ -207,6 +213,8 @@ extern redis_ioctlsocket ioctlsocket;
207213
extern redis_inet_addr inet_addr;
208214
extern redis_inet_ntoa inet_ntoa;
209215
extern redis_WSAGetOverlappedResult WSAGetOverlappedResult;
216+
extern redis_WSADuplicateSocket WSADuplicateSocket;
217+
extern redis_WSASocket WSASocket;
210218

211219
extern redis_close fdapi_close;
212220
extern redis_open open;
@@ -239,6 +247,7 @@ extern redis_isatty isatty;
239247
extern redis_access access;
240248
extern redis_lseek64 lseek64;
241249
extern redis_get_osfhandle fdapi_get_osfhandle;
250+
extern redis_open_osfhandle fdapi_open_osfhandle;
242251
extern redis_freeaddrinfo freeaddrinfo;
243252
extern redis_getaddrinfo getaddrinfo;
244253
extern redis_inet_ntop inet_ntop;

src/Win32_Interop/Win32_QFork.cpp

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,22 @@ BOOL QForkSlaveInit(HANDLE QForkConrolMemoryMapHandle, DWORD ParentProcessID) {
310310
g_SlaveExitCode = do_rdbSave(g_pQForkControl->globalData.filename);
311311
} else if (g_pQForkControl->typeOfOperation == OperationType::otAOF) {
312312
g_SlaveExitCode = do_aofSave(g_pQForkControl->globalData.filename);
313+
} else if (g_pQForkControl->typeOfOperation == OperationType::otSocket) {
314+
LPWSAPROTOCOL_INFO lpProtocolInfo = (LPWSAPROTOCOL_INFO) g_pQForkControl->globalData.protocolInfo;
315+
int pipe_write_fd = fdapi_open_osfhandle((intptr_t)g_pQForkControl->globalData.pipe_write_handle, _O_APPEND);
316+
for (int i = 0; i < g_pQForkControl->globalData.numfds; i++) {
317+
g_pQForkControl->globalData.fds[i] = WSASocket(FROM_PROTOCOL_INFO,
318+
FROM_PROTOCOL_INFO,
319+
FROM_PROTOCOL_INFO,
320+
&lpProtocolInfo[i],
321+
0,
322+
WSA_FLAG_OVERLAPPED);
323+
}
324+
325+
g_SlaveExitCode = do_socketSave(g_pQForkControl->globalData.fds,
326+
g_pQForkControl->globalData.numfds,
327+
g_pQForkControl->globalData.clientids,
328+
pipe_write_fd);
313329
} else {
314330
throw runtime_error("unexpected operation type");
315331
}
@@ -904,11 +920,22 @@ void CreateChildProcess(PROCESS_INFORMATION *pi, char* logfile, DWORD dwCreation
904920
g_hForkedProcess = pi->hProcess;
905921
}
906922

907-
BOOL BeginForkOperation(OperationType type, LPVOID globalData, int sizeOfGlobalData, DWORD* childPID, uint32_t dictHashSeed, char* logfile) {
923+
typedef void (*CHILD_PID_HOOK)(DWORD pid);
924+
925+
BOOL BeginForkOperation(OperationType type, LPVOID globalData, int sizeOfGlobalData, DWORD* childPID, uint32_t dictHashSeed, char* logfile, CHILD_PID_HOOK pidHook = NULL) {
908926
PROCESS_INFORMATION pi;
909927
try {
910-
CopyForkOperationData(type, globalData, sizeOfGlobalData, dictHashSeed);
911-
CreateChildProcess(&pi, logfile, 0);
928+
pi.hProcess = INVALID_HANDLE_VALUE;
929+
930+
if(pidHook != NULL) {
931+
CreateChildProcess(&pi, logfile, CREATE_SUSPENDED);
932+
pidHook(pi.dwProcessId);
933+
CopyForkOperationData(type, globalData, sizeOfGlobalData, dictHashSeed);
934+
ResumeThread(pi.hThread);
935+
} else {
936+
CopyForkOperationData(type, globalData, sizeOfGlobalData, dictHashSeed);
937+
CreateChildProcess(&pi, logfile, 0);
938+
}
912939

913940
*childPID = pi.dwProcessId;
914941
CloseHandle(pi.hThread);
@@ -932,6 +959,9 @@ BOOL BeginForkOperation(OperationType type, LPVOID globalData, int sizeOfGlobalD
932959
catch(...) {
933960
::redisLog(REDIS_WARNING, "BeginForkOperation: other exception caught.\n");
934961
}
962+
if(pi.hProcess != INVALID_HANDLE_VALUE) {
963+
TerminateProcess(pi.hProcess, 1);
964+
}
935965
return FALSE;
936966
}
937967

@@ -959,6 +989,43 @@ BOOL BeginForkOperation_Aof(
959989
return BeginForkOperation(otAOF, globalData, sizeOfGlobalData, childPID, dictHashSeed, logfile);
960990
}
961991

992+
void BeginForkOperation_Socket_PidHook(DWORD dwProcessId) {
993+
WSAPROTOCOL_INFO* protocolInfo = (WSAPROTOCOL_INFO*)dlmalloc(sizeof(WSAPROTOCOL_INFO) * g_pQForkControl->globalData.numfds);
994+
g_pQForkControl->globalData.protocolInfo = protocolInfo;
995+
for(int i = 0; i < g_pQForkControl->globalData.numfds; i++) {
996+
WSADuplicateSocket(g_pQForkControl->globalData.fds[i], dwProcessId, &protocolInfo[i]);
997+
}
998+
}
999+
1000+
BOOL BeginForkOperation_Socket(
1001+
int *fds,
1002+
int numfds,
1003+
uint64_t *clientids,
1004+
int pipe_write_fd,
1005+
LPVOID globalData,
1006+
int sizeOfGlobalData,
1007+
DWORD* childPID,
1008+
unsigned __int32 dictHashSeed,
1009+
char* logfile)
1010+
{
1011+
g_pQForkControl->globalData.fds = fds;
1012+
g_pQForkControl->globalData.numfds = numfds;
1013+
g_pQForkControl->globalData.clientids = clientids;
1014+
1015+
HANDLE pipe_write_handle = (HANDLE)_get_osfhandle(pipe_write_fd);
1016+
1017+
// The handle is already inheritable so there is no need to duplicate it
1018+
g_pQForkControl->globalData.pipe_write_handle = (pipe_write_handle);
1019+
1020+
return BeginForkOperation(otSocket,
1021+
globalData,
1022+
sizeOfGlobalData,
1023+
childPID,
1024+
dictHashSeed,
1025+
logfile,
1026+
BeginForkOperation_Socket_PidHook);
1027+
}
1028+
9621029
OperationStatus GetForkOperationStatus() {
9631030
if (WaitForSingleObject(g_pQForkControl->operationComplete, 0) == WAIT_OBJECT_0) {
9641031
return OperationStatus::osCOMPLETE;

src/Win32_Interop/Win32_QFork.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#pragma once
2424

2525
#include <Windows.h>
26+
#include <stdint.h>
2627

2728
#ifdef __cplusplus
2829
extern "C" {
@@ -31,7 +32,8 @@ extern "C" {
3132
typedef enum operationType {
3233
otINVALID = 0,
3334
otRDB = 1,
34-
otAOF = 2
35+
otAOF = 2,
36+
otSocket = 3
3537
} OperationType;
3638

3739
typedef enum operationStatus {
@@ -49,10 +51,15 @@ typedef enum startupStatus {
4951

5052
#define MAX_GLOBAL_DATA 10000
5153
typedef struct QForkBeginInfo {
52-
char filename[MAX_PATH];
5354
BYTE globalData[MAX_GLOBAL_DATA];
5455
size_t globalDataSize;
5556
unsigned __int32 dictHashSeed;
57+
char filename[MAX_PATH];
58+
int *fds;
59+
int numfds;
60+
uint64_t *clientids;
61+
HANDLE pipe_write_handle;
62+
LPVOID protocolInfo;
5663
} QForkStartupInfo;
5764

5865
StartupStatus QForkStartup(int argc, char** argv);
@@ -75,6 +82,17 @@ BOOL BeginForkOperation_Aof(
7582
unsigned __int32 dictHashSeed,
7683
char* logfile);
7784

85+
BOOL BeginForkOperation_Socket(
86+
int *fds,
87+
int numfds,
88+
uint64_t *clientids,
89+
int pipe_write_fd,
90+
LPVOID globalData,
91+
int sizeOfGlobalData,
92+
DWORD* childPID,
93+
unsigned __int32 dictHashSeed,
94+
char* logfile);
95+
7896
OperationStatus GetForkOperationStatus();
7997
BOOL EndForkOperation(int * pExitCode);
8098
BOOL AbortForkOperation();

0 commit comments

Comments
 (0)