Table of Contents (Chinese documentation)
- 1. Redis-replicator
- 2. Install
- 3. Simple usage
- 4. Advanced topics
- 5. Other topics
- 6. Contributors
- 7. References
- 8. Supported by
Redis Replicator is a Java implementation of the Redis replication protocol. It can parse, filter, and broadcast RDB and AOF events in real time. It can also synchronize redis data to your local cache or to a database. In the rest of this document, Command means a writable command in Redis (e.g. set, hmset) and excludes readable commands (e.g. get, hmget).
QQ group: 479688557
jdk 1.7+
maven-3.2.3+
redis 2.6 - 4.0.x
<dependency>
<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>2.5.0</version>
</dependency>
$ mvn clean install package -Dmaven.test.skip=true
| redis version | redis-replicator version |
|---|---|
| [2.6, 4.0.x] | [2.3.0, ] |
| [2.6, 4.0-RC3] | [2.1.0, 2.2.0] |
| [2.6, 3.2.x] | [1.0.18](not supported) |
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
System.out.println(kv);
}
});
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
System.out.println(command);
}
});
replicator.open();
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
System.out.println(kv);
}
});
replicator.open();
Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
System.out.println(command);
}
});
replicator.open();
Mixed file format: [RDB file][AOF tail]
Redis configuration that produces a mixed file: aof-use-rdb-preamble yes
final Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
System.out.println(kv);
}
});
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
System.out.println(command);
}
});
replicator.open();
See examples
public static class YourAppendCommand implements Command {
private final String key;
private final String value;
public YourAppendCommand(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return "YourAppendCommand{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
'}';
}
}
public class YourAppendParser implements CommandParser<YourAppendCommand> {
@Override
public YourAppendCommand parse(Object[] command) {
return new YourAppendCommand(new String((byte[]) command[1], UTF_8), new String((byte[]) command[2], UTF_8));
}
}
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addCommandParser(CommandName.name("APPEND"), new YourAppendParser());
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
if(command instanceof YourAppendCommand){
YourAppendCommand appendCommand = (YourAppendCommand)command;
// your code goes here
}
}
});
See CommandExtensionExample.java
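The parsing step inside YourAppendParser can be tried in isolation. The following is a minimal sketch that uses only the JDK and does not depend on redis-replicator: AppendParseSketch and AppendArgs are hypothetical names introduced for illustration, and the assumption that index 0 of the array carries the command name as a byte[] is mine, based on the parser above reading the key and value from indexes 1 and 2.

```java
import java.nio.charset.StandardCharsets;

// Standalone sketch: mirrors the shape of the APPEND parser above without
// depending on redis-replicator. All type names here are hypothetical.
class AppendParseSketch {

    // Simple holder for the decoded arguments.
    static final class AppendArgs {
        final String key;
        final String value;

        AppendArgs(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // command[0] is assumed to hold the command name; command[1] and
    // command[2] are the key and value, each delivered as a raw byte[].
    static AppendArgs parse(Object[] command) {
        if (command.length < 3) {
            throw new IllegalArgumentException("APPEND expects a key and a value");
        }
        String key = new String((byte[]) command[1], StandardCharsets.UTF_8);
        String value = new String((byte[]) command[2], StandardCharsets.UTF_8);
        return new AppendArgs(key, value);
    }

    public static void main(String[] args) {
        Object[] raw = {
            "APPEND".getBytes(StandardCharsets.UTF_8),
            "greeting".getBytes(StandardCharsets.UTF_8),
            "hello".getBytes(StandardCharsets.UTF_8)
        };
        AppendArgs parsed = parse(raw);
        System.out.println(parsed.key + " -> " + parsed.value);
    }
}
```

Decoding with an explicit charset avoids depending on the platform default, which matters when keys or values contain non-ASCII bytes.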
$ cd /path/to/redis-4.0-rc2/src/modules
$ make
loadmodule /path/to/redis-4.0-rc2/src/modules/hellotype.so
public class HelloTypeModuleParser implements ModuleParser<HelloTypeModule> {
@Override
public HelloTypeModule parse(RedisInputStream in, int version) throws IOException {
DefaultRdbModuleParser parser = new DefaultRdbModuleParser(in);
int elements = parser.loadUnsigned(version).intValue();
long[] ary = new long[elements];
int i = 0;
while (elements-- > 0) {
ary[i++] = parser.loadSigned(version);
}
return new HelloTypeModule(ary);
}
}
public class HelloTypeModule implements Module {
private final long[] value;
public HelloTypeModule(long[] value) {
this.value = value;
}
public long[] getValue() {
return value;
}
@Override
public String toString() {
return "HelloTypeModule{" +
"value=" + Arrays.toString(value) +
'}';
}
}
public class HelloTypeParser implements CommandParser<HelloTypeCommand> {
@Override
public HelloTypeCommand parse(Object[] command) {
String key = new String((byte[]) command[1], Constants.UTF_8);
long value = Long.parseLong(new String((byte[]) command[2], Constants.UTF_8));
return new HelloTypeCommand(key, value);
}
}
public class HelloTypeCommand implements Command {
private final String key;
private final long value;
public long getValue() {
return value;
}
public String getKey() {
return key;
}
public HelloTypeCommand(String key, long value) {
this.key = key;
this.value = value;
}
@Override
public String toString() {
return "HelloTypeCommand{" +
"key='" + key + '\'' +
", value=" + value +
'}';
}
}
public static void main(String[] args) throws IOException {
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addCommandParser(CommandName.name("hellotype.insert"), new HelloTypeParser());
replicator.addModuleParser("hellotype", 0, new HelloTypeModuleParser());
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
if (kv instanceof KeyStringValueModule) {
System.out.println(kv);
}
}
});
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
if (command instanceof HelloTypeCommand) {
System.out.println(command);
}
}
});
replicator.open();
}
See ModuleExtensionExample.java
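The read loop in HelloTypeModuleParser follows a count-then-elements pattern: read how many values follow, then read each one. The sketch below shows only that pattern with JDK classes. It is a deliberately simplified stand-in: the fixed-width int and long layout written by DataOutputStream is an assumption made for illustration and is not the RDB module encoding that DefaultRdbModuleParser decodes. CountPrefixedReadSketch is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Standalone sketch of a count-prefixed read loop. The byte layout is a
// simplified stand-in and NOT the real RDB module encoding.
class CountPrefixedReadSketch {

    // Reads an element count, then that many signed 64-bit values.
    static long[] readLongs(DataInputStream in) throws IOException {
        int count = in.readInt();
        if (count < 0) {
            throw new IOException("negative element count: " + count);
        }
        long[] values = new long[count];
        for (int i = 0; i < count; i++) {
            values[i] = in.readLong();
        }
        return values;
    }

    // Writes the matching layout so the example is self-contained.
    static byte[] encode(long... values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(values.length);
        for (long v : values) {
            out.writeLong(v);
        }
        out.flush();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = encode(3L, -7L, 42L);
        long[] decoded = readLongs(new DataInputStream(new ByteArrayInputStream(payload)));
        System.out.println(Arrays.toString(decoded));
    }
}
```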
- Extend RdbVisitor
- Register your RdbVisitor with the Replicator using the setRdbVisitor method.
| full resynchronization | partial resynchronization |
+-----------<--------------<-------------<----------<-----+--------------<-------<------+
| RDB | AOF | <-reconnect
connect+-->----->-------------->------------->---------->-------------------->------->---------x <-disconnect
| | | | | | | |
prefullsync auxfields... rdbs... postfullsync cmds...
Before redis-replicator-2.4.0, we constructed a RedisReplicator like the following:
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/dump.rdb"), FileType.RDB, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.AOF, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.MIXED, Configuration.defaultSetting());
After redis-replicator-2.4.0, we introduced a new concept (Redis URI) which simplifies constructing a RedisReplicator.
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");
// configuration setting example
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb?rateLimit=1000000");

| commands | commands | commands | commands | commands | commands |
|---|---|---|---|---|---|
| PING | APPEND | SET | SETEX | MSET | DEL |
| SADD | HMSET | HSET | LSET | EXPIRE | EXPIREAT |
| GETSET | HSETNX | MSETNX | PSETEX | SETNX | SETRANGE |
| HDEL | UNLINK | SREM | LPOP | LPUSH | LPUSHX |
| LREM | RPOP | RPUSH | RPUSHX | ZREM | ZINTERSTORE |
| INCR | DECR | INCRBY | PERSIST | SELECT | FLUSHALL |
| FLUSHDB | HINCRBY | ZINCRBY | MOVE | SMOVE | BRPOPLPUSH |
| PFCOUNT | PFMERGE | SDIFFSTORE | RENAMENX | PEXPIREAT | SINTERSTORE |
| ZADD | BITFIELD | SUNIONSTORE | RESTORE | LINSERT | ZREMRANGEBYLEX |
| GEOADD | PEXPIRE | ZUNIONSTORE | EVAL | SCRIPT | ZREMRANGEBYRANK |
| PUBLISH | BITOP | SETBIT | SWAPDB | PFADD | ZREMRANGEBYSCORE |
| RENAME | MULTI | EXEC | LTRIM | RPOPLPUSH | SORT |
| EVALSHA | | | | | |
- Adjust the redis server setting like the following. For more details, please refer to redis.conf.
client-output-buffer-limit slave 0 0 0
WARNING: this setting may cause the redis server to run out of memory in some cases.
- Set log level to debug
- If you are using log4j2, add a logger like the following:
<Logger name="com.moilioncircle" level="debug">
<AppenderRef ref="YourAppender"/>
</Logger>
Configuration.defaultSetting().setVerbose(true);
// redis uri
"redis://127.0.0.1:6379?verbose=yes"
System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "your_type");
Configuration.defaultSetting().setSsl(true);
//optional setting
Configuration.defaultSetting().setSslSocketFactory(sslSocketFactory);
Configuration.defaultSetting().setSslParameters(sslParameters);
Configuration.defaultSetting().setHostnameVerifier(hostnameVerifier);
Configuration.defaultSetting().setAuthPassword("foobared");
// redis uri
"redis://127.0.0.1:6379?authPassword=foobared"
- Adjust the redis server settings like the following
repl-backlog-size
repl-backlog-ttl
repl-ping-slave-period
repl-ping-slave-period MUST be less than Configuration.getReadTimeout(). The default Configuration.getReadTimeout() is 30 seconds.
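The constraint above can be checked before opening a connection. This is a minimal sketch under two assumptions: repl-ping-slave-period is expressed in seconds, as in redis.conf, and the read timeout is expressed in milliseconds, as the readTimeout=10000 URI example in this document suggests. ReplTimeoutCheckSketch and pingPeriodFits are hypothetical names and are not part of redis-replicator.

```java
import java.util.concurrent.TimeUnit;

// Standalone sketch: checks that the server ping period is strictly
// shorter than the client read timeout. Names are hypothetical.
class ReplTimeoutCheckSketch {

    // 30 seconds, the default read timeout stated in this document.
    static final long DEFAULT_READ_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30);

    // pingPeriodSeconds: value of repl-ping-slave-period (seconds).
    // readTimeoutMillis: client read timeout (assumed milliseconds).
    static boolean pingPeriodFits(long pingPeriodSeconds, long readTimeoutMillis) {
        return TimeUnit.SECONDS.toMillis(pingPeriodSeconds) < readTimeoutMillis;
    }

    public static void main(String[] args) {
        // A 10 second ping period fits inside a 30 second read timeout.
        System.out.println(pingPeriodFits(10, DEFAULT_READ_TIMEOUT_MILLIS)); // true
        // An equal period does not: the comparison is strict.
        System.out.println(pingPeriodFits(30, DEFAULT_READ_TIMEOUT_MILLIS)); // false
    }
}
```

If the ping period is not shorter than the read timeout, an idle but healthy connection can time out on the client side before the next ping arrives.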
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
final long start = System.currentTimeMillis();
final AtomicInteger acc = new AtomicInteger(0);
replicator.addRdbListener(new RdbListener() {
@Override
public void preFullSync(Replicator replicator) {
System.out.println("pre full sync");
}
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
acc.incrementAndGet();
}
@Override
public void postFullSync(Replicator replicator, long checksum) {
long end = System.currentTimeMillis();
System.out.println("time elapsed:" + (end - start));
System.out.println("rdb event count:" + acc.get());
}
});
replicator.open();
- For any KeyValuePair type except KeyStringValueModule, we can get the raw bytes. In some cases (e.g. HyperLogLog), this is very useful.
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
if (kv instanceof KeyStringValueString) {
KeyStringValueString ksvs = (KeyStringValueString) kv;
byte[] rawValue = ksvs.getRawValue();
// handle raw bytes value
} else if (kv instanceof KeyStringValueHash) {
KeyStringValueHash ksvh = (KeyStringValueHash) kv;
Map<byte[], byte[]> rawValue = ksvh.getRawValue();
// handle raw bytes value
} else {
...
}
}
});
replicator.open();
For convenience, the Map<byte[], byte[]> returned by KeyStringValueHash.getRawValue treats its byte[] keys as values: get and put match keys by content rather than by array identity.
KeyStringValueHash ksvh = (KeyStringValueHash) kv;
Map<byte[], byte[]> rawValue = ksvh.getRawValue();
byte[] value = new byte[]{2};
rawValue.put(new byte[]{1}, value);
System.out.println(rawValue.get(new byte[]{1}) == value); // will print true
Commands also support raw bytes.
SetCommand set = (SetCommand) command;
byte[] rawKey = set.getRawKey();
byte[] rawValue = set.getRawValue();
Building on 4.3. Write your own rdb parser, this tool has a built-in iterable RDB parser for handling huge key-value pairs.
For more details, please refer to:
[1] HugeKVFileExample.java
[2] HugeKVSocketExample.java
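The general idea behind consuming a huge value through an iterator is to keep only a bounded number of elements in memory at a time. The following sketch shows that idea in plain Java by grouping any Iterator into fixed-size batches. It is not the library's implementation and does not use its API: BatchingIteratorSketch and batches are hypothetical names, and the batch size of 2 is chosen only to keep the output short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Standalone sketch: wraps an Iterator so that elements are handed out
// in batches of at most batchSize. Names are hypothetical.
class BatchingIteratorSketch {

    static <T> Iterator<List<T>> batches(final Iterator<T> source, final int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!source.hasNext()) {
                    throw new NoSuchElementException();
                }
                // Pull at most batchSize elements; the last batch may be shorter.
                List<T> batch = new ArrayList<T>(batchSize);
                while (batch.size() < batchSize && source.hasNext()) {
                    batch.add(source.next());
                }
                return batch;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> fields = Arrays.asList("a", "b", "c", "d", "e").iterator();
        Iterator<List<String>> it = batches(fields, 2);
        while (it.hasNext()) {
            // Each batch could be written to a target store, then discarded.
            System.out.println(it.next());
        }
    }
}
```

Because each batch is built on demand, memory use depends on the batch size rather than on the total number of elements in the key.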
- Leon Chen
- Adrian Yao
- Trydofor
- Argun
- Sean Pan
- Special thanks to Kevin Zheng
- rdb.c
- Redis RDB File Format
- Redis Protocol specification
- Redis Replication
- Redis-replicator Design and Implementation
YourKit is kindly supporting this open source project with its full-featured Java Profiler.
YourKit, LLC is the creator of innovative and intelligent tools for profiling
Java and .NET applications. Take a look at YourKit's leading software products:
YourKit Java Profiler and
YourKit .NET Profiler.
IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software.
It is developed by JetBrains (formerly known as IntelliJ), and is available as an Apache 2 Licensed community edition,
and in a proprietary commercial edition. Both can be used for commercial development.
Redisson is a Redis-based In-Memory Data Grid for Java that offers distributed objects and services (BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) backed by a Redis server. Redisson provides a more convenient and easier way to work with Redis. Redisson objects provide a separation of concerns, which allows you to keep your focus on data modeling and application logic.
