Skip to content

Commit 34c8860

Browse files
committed
Add item reader for Redis
Resolves #4446
1 parent 854117d commit 34c8860

File tree

4 files changed

+259
-0
lines changed

4 files changed

+259
-0
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.redis;
17+
18+
import org.springframework.batch.item.ExecutionContext;
19+
import org.springframework.batch.item.ItemStreamException;
20+
import org.springframework.batch.item.ItemStreamReader;
21+
import org.springframework.data.redis.core.Cursor;
22+
import org.springframework.data.redis.core.RedisTemplate;
23+
import org.springframework.data.redis.core.ScanOptions;
24+
import org.springframework.util.Assert;
25+
26+
/**
27+
* Item reader for Redis based on Spring Data Redis. Uses a {@link RedisTemplate} to query
28+
* data. The user should provide a {@link ScanOptions} to specify the set of keys to
29+
* query.
30+
*
31+
* <p>
32+
* The implementation is not thread-safe and not restartable.
33+
* </p>
34+
*
35+
* @author Mahmoud Ben Hassine
36+
* @since 5.1
37+
* @param <K> type of keys
38+
* @param <V> type of values
39+
*/
40+
public class RedisItemReader<K, V> implements ItemStreamReader<V> {
41+
42+
private final RedisTemplate<K, V> redisTemplate;
43+
44+
private final ScanOptions scanOptions;
45+
46+
private Cursor<K> cursor;
47+
48+
public RedisItemReader(RedisTemplate<K, V> redisTemplate, ScanOptions scanOptions) {
49+
Assert.notNull(redisTemplate, "redisTemplate must not be null");
50+
Assert.notNull(scanOptions, "scanOptions must no be null");
51+
this.redisTemplate = redisTemplate;
52+
this.scanOptions = scanOptions;
53+
}
54+
55+
@Override
56+
public void open(ExecutionContext executionContext) throws ItemStreamException {
57+
this.cursor = this.redisTemplate.scan(this.scanOptions);
58+
}
59+
60+
@Override
61+
public V read() throws Exception {
62+
if (this.cursor.hasNext()) {
63+
K nextKey = this.cursor.next();
64+
return this.redisTemplate.opsForValue().get(nextKey);
65+
}
66+
else {
67+
return null;
68+
}
69+
}
70+
71+
@Override
72+
public void close() throws ItemStreamException {
73+
this.cursor.close();
74+
}
75+
76+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.redis.builder;
17+
18+
import org.springframework.batch.item.redis.RedisItemReader;
19+
import org.springframework.data.redis.core.RedisTemplate;
20+
import org.springframework.data.redis.core.ScanOptions;
21+
22+
/**
23+
* Builder for {@link RedisItemReader}.
24+
*
25+
* @author Mahmoud Ben Hassine
26+
* @since 5.1
27+
* @param <K> type of keys
28+
* @param <V> type of values
29+
*/
30+
public class RedisItemReaderBuilder<K, V> {
31+
32+
private RedisTemplate<K, V> redisTemplate;
33+
34+
private ScanOptions scanOptions;
35+
36+
/**
37+
* Set the {@link RedisTemplate} to use in the reader.
38+
* @param redisTemplate the template to use
39+
* @return the current builder instance for fluent chaining
40+
*/
41+
public RedisItemReaderBuilder<K, V> redisTemplate(RedisTemplate<K, V> redisTemplate) {
42+
this.redisTemplate = redisTemplate;
43+
return this;
44+
}
45+
46+
/**
47+
* Set the {@link ScanOptions} to select the key set.
48+
* @param scanOptions the scan option to use
49+
* @return the current builder instance for fluent chaining
50+
*/
51+
public RedisItemReaderBuilder<K, V> scanOptions(ScanOptions scanOptions) {
52+
this.scanOptions = scanOptions;
53+
return this;
54+
}
55+
56+
/**
57+
* Build a new {@link RedisItemReader}.
58+
* @return a new item reader
59+
*/
60+
public RedisItemReader<K, V> build() {
61+
return new RedisItemReader<>(this.redisTemplate, this.scanOptions);
62+
}
63+
64+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.redis;
17+
18+
import org.junit.jupiter.api.Assertions;
19+
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.extension.ExtendWith;
21+
import org.mockito.Answers;
22+
import org.mockito.Mock;
23+
import org.mockito.Mockito;
24+
import org.mockito.junit.jupiter.MockitoExtension;
25+
26+
import org.springframework.batch.item.ExecutionContext;
27+
import org.springframework.data.redis.core.Cursor;
28+
import org.springframework.data.redis.core.RedisTemplate;
29+
import org.springframework.data.redis.core.ScanOptions;
30+
31+
@ExtendWith(MockitoExtension.class)
32+
public class RedisItemReaderTests {
33+
34+
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
35+
private RedisTemplate<String, String> redisTemplate;
36+
37+
@Mock
38+
private ScanOptions scanOptions;
39+
40+
@Mock
41+
private Cursor<String> cursor;
42+
43+
@Test
44+
void testRead() throws Exception {
45+
// given
46+
Mockito.when(this.redisTemplate.scan(this.scanOptions)).thenReturn(this.cursor);
47+
Mockito.when(this.cursor.hasNext()).thenReturn(true, true, false);
48+
Mockito.when(this.cursor.next()).thenReturn("person:1", "person:2");
49+
Mockito.when(this.redisTemplate.opsForValue().get("person:1")).thenReturn("foo");
50+
Mockito.when(this.redisTemplate.opsForValue().get("person:2")).thenReturn("bar");
51+
RedisItemReader<String, String> redisItemReader = new RedisItemReader<>(this.redisTemplate, this.scanOptions);
52+
redisItemReader.open(new ExecutionContext());
53+
54+
// when
55+
String item1 = redisItemReader.read();
56+
String item2 = redisItemReader.read();
57+
String item3 = redisItemReader.read();
58+
59+
// then
60+
Assertions.assertEquals("foo", item1);
61+
Assertions.assertEquals("bar", item2);
62+
Assertions.assertNull(item3);
63+
}
64+
65+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.redis.builder;
17+
18+
import org.junit.jupiter.api.Test;
19+
20+
import org.springframework.batch.item.redis.RedisItemReader;
21+
import org.springframework.data.redis.core.RedisTemplate;
22+
import org.springframework.data.redis.core.ScanOptions;
23+
import org.springframework.test.util.ReflectionTestUtils;
24+
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertNotNull;
27+
import static org.mockito.Mockito.mock;
28+
29+
/**
30+
* Test class for {@link RedisItemReaderBuilder}.
31+
*
32+
* @author Mahmoud Ben Hassine
33+
*/
34+
public class RedisItemReaderBuilderTests {
35+
36+
@Test
37+
void testRedisItemReaderCreation() {
38+
// given
39+
RedisTemplate<String, String> redisTemplate = mock();
40+
ScanOptions scanOptions = mock();
41+
42+
// when
43+
RedisItemReader<String, String> reader = new RedisItemReaderBuilder<String, String>()
44+
.redisTemplate(redisTemplate)
45+
.scanOptions(scanOptions)
46+
.build();
47+
48+
// then
49+
assertNotNull(reader);
50+
assertEquals(redisTemplate, ReflectionTestUtils.getField(reader, "redisTemplate"));
51+
assertEquals(scanOptions, ReflectionTestUtils.getField(reader, "scanOptions"));
52+
}
53+
54+
}

0 commit comments

Comments
 (0)