Expand Pipeline interface with methods for fire'n'forget pattern
Discussion started at: https://gist.github.com/dgomesbr/5dc3e2d5bb4fcc90f82cb94fe6fd6561
Expected behavior
While in pipeline mode we have to sync() periodically, because Jedis queues a Response&lt;T&gt; for every command and those responses are only resolved when sync() is called. When bulk loading millions or even billions of keys into Redis, the highest-throughput approach would be:
- Open a Pipeline
- Loop calling setOps
- Close Pipeline
To do that, parts of the Pipeline should be changed so that get() is never called on those responses; instead they would just be submitted to the executor pool. I've traced the responses being populated within Connection:267.
Actual behavior
Trying to load a 30kk (30 million) line buffered input stream through the Jedis client results in an OOM, because every pipelined command queues a Response object that is only released when Response.get() is called during sync(). As a workaround, a guard clause and periodic sync() calls are needed.
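The guard-clause workaround can be sketched independently of Jedis. The SyncGuard class below is hypothetical (not part of Jedis): it counts queued commands and tells the caller when a checkpoint is reached, so pipeline.sync() can drain the queued responses before they exhaust the heap.

```java
// Hypothetical helper illustrating the periodic-sync guard clause:
// count pipelined commands and report when a checkpoint is reached.
final class SyncGuard {
    private final int checkpoint;
    private long queued;

    SyncGuard(int checkpoint) {
        this.checkpoint = checkpoint;
    }

    /** Record one queued command; returns true when the caller should sync(). */
    boolean record() {
        return (++queued % checkpoint) == 0;
    }

    long queuedSoFar() {
        return queued;
    }
}
```

In the pipeline loop, each `p.setnx(...)` would then be followed by `if (guard.record()) p.sync();`, bounding the number of un-drained responses to the checkpoint size.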
Steps to reproduce:
- Prepare a file with 30 million or more lines (e.g. LOAD.txt)
- Feed it into a Redis pipeline loop
Split the LOAD file into smaller 10kk-line chunks:
split -l10000000 LOAD.txt
### results in xaa, xab, xac, xad, etc.
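A quick sanity check of the chunk count this split produces for the 30-million-line input (plain arithmetic, no Jedis involved):

```java
// Chunk count produced by `split -l10000000` on a 30-million-line file.
public class ChunkCount {
    public static void main(String[] args) {
        long lines = 30_000_000L;          // the "30kk" input file
        long linesPerChunk = 10_000_000L;  // split -l10000000
        long chunks = (lines + linesPerChunk - 1) / linesPerChunk; // ceiling division
        System.out.println(chunks);        // prints 3
    }
}
```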
Feed the chunks into the Java program (single-threaded):
java \
  -Dloader.input.src=/tmp/redis/loader \
  -Dloader.redis.ip=127.0.0.1 \
  -Dloader.redis.port=6379 \
  -Xms512m \
  -Xmx512m \
  -Djava.net.preferIPv4Stack=true \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=250 \
  -jar redis-loader.jar
public class FileUtils {

    public static List<Path> listFiles(Path path, String pattern) throws IOException {
        Pattern r = Pattern.compile(pattern);
        final List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
            for (Path entry : stream) {
                if (Files.isDirectory(entry)) {
                    // Collect the recursive results (the original call discarded them)
                    files.addAll(listFiles(entry, pattern));
                }
                if (r.matcher(entry.getFileName().toString()).matches()) {
                    files.add(entry);
                }
            }
        }
        return files;
    }
}
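A quick self-contained exercise of the listing helper against a temporary directory (the helper is repeated inline so the sketch compiles on its own; the class and directory names are illustrative):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Self-contained demo: list only the split(1)-style chunk files (xaa, xab, ...)
// in a directory, ignoring files that don't match the pattern.
public class ListFilesDemo {

    static List<Path> listFiles(Path path, String pattern) throws IOException {
        Pattern r = Pattern.compile(pattern);
        final List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
            for (Path entry : stream) {
                if (Files.isDirectory(entry)) {
                    files.addAll(listFiles(entry, pattern));
                }
                if (r.matcher(entry.getFileName().toString()).matches()) {
                    files.add(entry);
                }
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("redis-loader-demo");
        Files.createFile(dir.resolve("xaa"));
        Files.createFile(dir.resolve("xab"));
        Files.createFile(dir.resolve("LOAD.txt")); // does not match (^x.*)
        List<Path> chunks = listFiles(dir, "(^x.*)");
        System.out.println(chunks.size()); // prints 2
    }
}
```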
@SpringBootApplication
public class RedisLoaderApplication implements CommandLineRunner {

    public static final int CHUNK_SIZE = 64 * 1000 * 1000;
    public static final int SYNC_CHECKPOINT = 350_000;
    public static final int REDIS_TIMEOUT = 30 * 1000;

    /**
     * Default value for inserted keys
     */
    public static String DEFAULT_VALUE = "1";

    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RedisLoaderApplication.class);

    /**
     * Used for syncing operations with Jedis
     */
    public AtomicInteger counter = new AtomicInteger(0);

    /**
     * To be used along with inputFile
     */
    @Value("${loader.input.pattern:(^x.*)}")
    String inputFilePattern;

    @Value("${loader.redis.ip:127.0.0.1}")
    private String redisServerHost;

    @Value("${loader.redis.port:6379}")
    private int redisServerPort;

    @Value("${loader.input.src}")
    private String inputFile;

    /**
     * Always wipe data before insertion
     */
    @Value("${loader.flush.all:true}")
    private boolean isFlushEnabled;

    public static void main(String[] args) {
        SpringApplication.run(RedisLoaderApplication.class, args);
    }

    JedisPool jedisPool() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestWhileIdle(true);
        poolConfig.setMaxWaitMillis(REDIS_TIMEOUT);
        poolConfig.setMaxTotal(1);
        // The original snippet referenced an undefined 'defaultTimeout'; REDIS_TIMEOUT is meant here
        return new JedisPool(poolConfig, redisServerHost, redisServerPort, REDIS_TIMEOUT);
    }

    private void runWithJedis(Pipeline p, BufferedReader br) {
        br.lines().forEach(key -> {
            p.setnx(key, DEFAULT_VALUE);
            // Periodically drain the queued responses so they don't exhaust the heap
            if ((counter.incrementAndGet() % SYNC_CHECKPOINT) == 0) {
                p.sync();
            }
        });
        p.sync(); // rest of the unsync'd commands
    }

    @Override
    public void run(String... args) throws Exception {
        LOGGER.info("Redis Loader started");
        long start = System.nanoTime();
        List<Path> files = FileUtils.listFiles(Paths.get(inputFile), inputFilePattern);
        if (!files.isEmpty()) {
            JedisPool pool = jedisPool();
            try (Jedis jedis = pool.getResource()) {
                Pipeline pipeline = jedis.pipelined();
                if (isFlushEnabled) {
                    jedis.flushAll();
                }
                for (Path p : files) {
                    long startFile = System.nanoTime();
                    try (BufferedReader br = new BufferedReader(
                            new InputStreamReader(new FileInputStream(p.toFile()), StandardCharsets.UTF_8),
                            CHUNK_SIZE)) {
                        runWithJedis(pipeline, br);
                    }
                    final long delta = TimeUnit.SECONDS.convert(System.nanoTime() - startFile, TimeUnit.NANOSECONDS);
                    LOGGER.info("File ({} - {} lines read) (took {}s)", p.toString(), counter.get(), delta);
                    counter.set(0);
                }
            }
        } else {
            LOGGER.warn("Can't find files to process. Nothing to do. Exiting...");
        }
        LOGGER.info("Redis Loader finished (took {}s)", TimeUnit.SECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
    }
}
Redis / Jedis Configuration: default

Jedis version: 2.8.1
Redis version: 3.0.7
Java version: 1.8.25+
This issue is marked stale. It will be closed in 30 days if it is not updated.
I just read this notification, it took me on a ride! 8 years ago :D @sazzad16 hopefully still relevant.
Hi,
We are encountering a similar issue where memory fills up with Jedis results. Is there a process to upvote this feature?
@jamiguet Calling sync() regularly (and not keeping references to the Response objects) should solve the issue. Are you sure you are calling sync()?