CompletableFuture example in Java 8

Lime5005
5 min readDec 4, 2021

I’ve just learned CompletableFuture from the book “Java 8 in Action”, to solve the problem that I encountered in my study project. I want to share the code here because it’s been so well explained in the book.

First, let’s create a repository called shop. Then 3 classes: SuperPriceShop, Discount, Quote, and of course, there is a main function, you can put it in the SuperPriceShop class to simplify the task.

So the deal is that:

1, My app: My service is to get all the prices when I search for a product from all the shops, how to get all prices from a lot of shops in a list and show it to my clients? Sometimes the shops give me the results very slowly.

2, Shop: I’m selling things that I import from other countries, the price is changing every day, so I have to get the newest price each time from my providers.

3, Discount: I’m just waiting and calculating the final discounted price from every shop with a discount code and the original price that I received and then sending it back.

4, Quote: Shops are changing their discount percentage every day, I’m helping the discount to receive the newest code, and send it to Discount to calculate.

In the SuperPriceShop, the code is as below:

public class SuperPriceShop {
private static final Random random = new Random();
private String name;
public SuperPriceShop(String name) {
this.name = name;
}

public String getName() {
return name;
}

static List<SuperPriceShop> shops = Arrays.asList(
new SuperPriceShop("BestPrice"),
new SuperPriceShop("LetsSaveBig"),
new SuperPriceShop("MyFavoriteShop"),
new SuperPriceShop("BuyItAll"),
new SuperPriceShop("BeMyGuest"),
new SuperPriceShop("YourLastChance"),
new SuperPriceShop("TheOnlyOne"));

public static List<String> findPrices(String product) {
return shops.stream()
.map(shop -> shop.getStringPrice(product)) // String format info, but slow.
.map(Quote::parse) // To Quote object
.map(Discount::applyDiscount) // Generate final output, but also slow.
.collect(toList());
}

public static List<String> findPricesAsync(String product) {​
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getStringPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(
quote -> CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)))
.collect(toList());

return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}

public static Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getStringPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)));

}

public String getStringPrice(String product) {
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[
random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}

// Imagine the pricing-service is very slow, provided by someone else:
private static double calculatePrice(String product) {
randomDelay(); // Things out of control, and your clients want a response ASAP.
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

public static void randomDelay() {
int delay = 500 + random.nextInt(2000);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 50),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
}

In class Quote :

/**
* Get the String format info from the shop and return a Quote object
* with the SAME info, why? because the business change their discount from time to time,
* so this step must be implemented into each client request.
* It's basically a discount service.
*/
public class Quote {

private final String shopName;
private final double price;
private final Discount.Code discountCode;

public String getShopName() { return shopName; }
public double getPrice() { return price; }
public Discount.Code getDiscountCode() { return discountCode; }

public Quote(String shopName, double price, Discount.Code code) {
this.shopName = shopName;
this.price = price;
this.discountCode = code;
}

// From "BeTheBest:109.22:GOLD(name:price.2f:code)" to a Quote object.
public static Quote parse(String stringFormat) {
String[] split = stringFormat.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}

In class Discount :

import java.util.concurrent.TimeUnit;

/**
* The discount can be changed by shop owners from time to time.
*/
public class Discount {

public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;

Code(int percentage) {
this.percentage = percentage;
}
}

public static String applyDiscount(Quote quote) {
return quote.getShopName() + " price is " +
Discount.apply(quote.getPrice(), quote.getDiscountCode());
}

// Calculate the final price with discount percentage:
private static double apply(double price, Code code) {
/* Imagine the calculation service is very busy: */
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return (price * (100 - code.percentage) / 100);
}

}

In the main function, test all the different ways of providing the service, either with synchronous streams or with asynchronous streams. You can conclude that with asynchronous function it is like 7 times faster (4272 msecs vs 28180 msecs), as I’ve listed the results below.

public static void main(String[] args) {
long start = System.nanoTime();
// 1, First way with plain stream.
/*System.out.println(getStringPrice("Favorite"));*/

//2, Second way with async, and get a result.
/*System.out.println(findPricesAsync("Favorite"));*/

//3, Third way, collect the prices in the end.
/*CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();*/

//4, Last way, show clients some prices as soon as possible.
CompletableFuture[] futures = findPricesStream("myPhone27S")
.map(future -> future.thenAccept(
str -> System.out.println(str + " (done in " +
((System.nanoTime() - start) / 1_000_000) + " msecs)")))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();

System.out.println("All shops have now responded in "
+ ((System.nanoTime() - start) / 1_000_000) + " msecs");

long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");
}
//And the results you may receive:
//1, Find one result.
//BeTheBest:109.22:GOLD
//Done in 1027 msecs

//2, With plain stream(), 4 shops.
//Done in 16136 msecs

//3, With plain stream(), 7 shops.
//Done in 28180 msecs

//4, With composed Future: it shows the prices provided by the different shops only when all of them are available.
//Done in 4272 msecs

//5, With refactoring for random delay:display the price for a given shop as soon as it becomes available, without waiting for the slowest one (which perhaps even times out).
//BuyItAll price is 156.598 (done in 3682 msecs)
//BestPrice price is 153.05450000000002 (done in 4193 msecs)
//BeMyGuest price is 171.42 (done in 4198 msecs)
//MyFavoriteShop price is 191.9475 (done in 4230 msecs)
//LetsSaveBig price is 177.16 (done in 4949 msecs)
//TheOnlyOne price is 132.066 (done in 5313 msecs)
//YourLastChance price is 196.6 (done in 5337 msecs)
//All shops have now responded in 5338 msecs
//Done in 5350 msecs

Some methods along with the CompletableFuture you have to know:

1, thenAccept(): This new operation simply registers an action on each CompletableFuture; this action consumes the value of the CompletableFuture as soon as it completes.

2, thenCompose(): Usually, you have another asynchronous action below to follow.

3, thenApply(): You are not waiting and running directly.

4, allOf(): The allOf factory method takes as input an array of CompletableFutures and returns a CompletableFuture<Void> that’s completed only when all the CompletableFutures passed have been completed. So the app can display a message to the client: “All shops returned results or timed out.”.

5, anyOf(): It takes the result of the first to respond. As a matter of detail, this method takes as input an array of CompletableFutures and returns a Completable-Future<Object> that completes with the same value as the first-to-complete CompletableFuture.

So that’s all I’ve learned, thanks for reading! 😊

--

--

Lime5005

Web developer, focus on Java, JavaScript, PHP, HTML, CSS.