I have a private Ethereum blockchain set up with 5 machines mining on it. The blockchain currently contains 300 blocks. The processing is done in a Java back end.
I need to run the following loop asynchronously. The bottleneck of the loop is the execution of the following command:
EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
The command can also return a CompletableFuture<EthBlock> object by ending it with sendAsync() instead of send() (see https://github.com/web3j/web3j#start-sending-requests ). However, just calling sendAsync().get() removes the parallelism and makes it behave synchronously.
public void businessLogic() throws Exception {
    recentBlocks = new ArrayList<EthBlock.Block>();
    for (long i = 1; i <= 300000; i++) {
        EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
        if (eb == null || eb.getBlock() == null) {
            continue;
        }
        EthBlock.Block block = eb.getBlock();
        recentBlocks.add(block);
    }
}
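To illustrate why waiting on each future immediately serializes the work, here is a minimal sketch that does not use web3j at all. `fetchAsync` is a hypothetical stand-in for `ethGetBlockByNumber(...).sendAsync()` that simply sleeps for a moment, and the class and method names are made up for the illustration. It records how many simulated requests are in flight at once under each waiting strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: nothing here is part of web3j.
class BlockingVsAsyncDemo {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Simulated asynchronous request (assumption: stands in for sendAsync()).
    static CompletableFuture<String> fetchAsync(long n, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max); // remember the peak
            try {
                Thread.sleep(20); // pretend network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inFlight.decrementAndGet();
            }
            return "block-" + n;
        }, pool);
    }

    // Waiting right after each request: the next request starts only
    // after the previous one has finished.
    static int maxInFlightWhenWaitingEachTime(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        maxInFlight.set(0);
        try {
            for (long i = 0; i < count; i++) {
                fetchAsync(i, pool).join();
            }
        } finally {
            pool.shutdown();
        }
        return maxInFlight.get();
    }

    // Starting every request first and waiting afterwards: requests overlap,
    // limited here by the 4 threads of the pool.
    static int maxInFlightWhenWaitingAtEnd(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        maxInFlight.set(0);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (long i = 0; i < count; i++) {
                futures.add(fetchAsync(i, pool));
            }
            futures.forEach(CompletableFuture::join);
        } finally {
            pool.shutdown();
        }
        return maxInFlight.get();
    }
}
```

In the first method the peak never exceeds one request, which is the behaviour described above for `sendAsync().get()` inside the loop. In the second the requests overlap, because nothing blocks until every future has been created.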
I am not able to grasp the intuition for translating the code into a form that CompletableFuture can operate on. The goal is to group multiple calls to web3.ethGetBlockByNumber(...).sendAsync() into a collection and run them all at once, filling a list of EthBlock.Block objects, i.e. recentBlocks.
This is what I came up with:
public void businessLogic() throws Exception {
    // Callbacks may run on different threads, so the list must be thread-safe
    recentBlocks = Collections.synchronizedList(new ArrayList<EthBlock.Block>());
    List<CompletableFuture<Void>> compFutures = new ArrayList<>();
    for (long i = 0; i <= 300000; i++) {
        CompletableFuture<EthBlock> compFuture = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).sendAsync();
        // Keep the future returned by thenAcceptAsync so that allOf also waits for the add
        compFutures.add(compFuture.thenAcceptAsync(eb -> {
            if (eb != null && eb.getBlock() != null) {
                recentBlocks.add(eb.getBlock());
            }
        }));
    }
    // allOf takes an array (varargs), not a List
    CompletableFuture.allOf(compFutures.toArray(new CompletableFuture[0])).get();
}
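One way to avoid mutating a shared list from callbacks is to turn the list of futures into a single future of a list. The following is a sketch only, not web3j API: `FutureCollector` and `allNonNull` are hypothetical names. The helper is generic, so it would apply to `CompletableFuture<EthBlock>` values as well as to the plain strings used to try it out.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper: combines many futures into one future of a list.
class FutureCollector {
    // Completes once every input future has completed, yielding the non-null
    // results in the same order as the input list.
    static <T> CompletableFuture<List<T>> allNonNull(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return allDone.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join) // does not block: all futures are already done
                .filter(Objects::nonNull)
                .collect(Collectors.toList()));
    }
}
```

With this shape, the caller would collect the `sendAsync()` futures in the loop, call `allNonNull(futures).get()` once at the end, and map the results to blocks afterwards. If any input future fails, the combined future also completes exceptionally, so error handling still has to be decided separately.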
Implementation using a parallel IntStream:
long start = System.nanoTime();
recentBlocks = IntStream.rangeClosed(0, 300_000)
        .parallel()
        .mapToObj(i -> {
            try {
                System.out.println("Current Thread -> " + Thread.currentThread());
                return web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return null;
        })
        .filter(Objects::nonNull)
        .map(EthBlock::getBlock)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
long stop = System.nanoTime();
System.out.println("Time Elapsed: " + TimeUnit.MICROSECONDS.convert(stop-start, TimeUnit.NANOSECONDS));
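A parallel stream runs on the common ForkJoinPool by default, whose parallelism is the number of available processors minus one, so blocking send() calls are capped at roughly that many concurrent requests. A possible alternative, sketched below under assumptions, is to run the blocking calls on a dedicated thread pool whose size is chosen for I/O rather than CPU. `DedicatedPoolFetcher`, `fetchRange`, and the `fetchOne` parameter are hypothetical names; `fetchOne` stands in for a blocking call such as ethGetBlockByNumber(...).send().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongFunction;
import java.util.stream.Collectors;

// Hypothetical helper: runs blocking fetches on a pool sized by the caller.
class DedicatedPoolFetcher {
    // fetchOne is a placeholder for a blocking request; it may return null.
    static <T> List<T> fetchRange(long first, long last, int threads, LongFunction<T> fetchOne) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit every request first so that up to `threads` run concurrently.
            List<CompletableFuture<T>> futures = new ArrayList<>();
            for (long n = first; n <= last; n++) {
                final long blockNumber = n;
                futures.add(CompletableFuture.supplyAsync(() -> fetchOne.apply(blockNumber), pool));
            }
            // Wait for the results in submission order and drop missing ones.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }
}
```

A call might look like `fetchRange(0, 300_000, 32, n -> ...)`, where the lambda wraps the blocking request and its exception handling. The thread count of 32 is an arbitrary example value, and the best setting depends on how many concurrent requests the node can serve.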