I’ve just learned CompletableFuture from the book “Java 8 in Action”, to solve the problem that I encountered in my study project. I want to share the code here because it’s been so well explained in the book.
First, let’s create a repository called shop
. Then 3 classes: SuperPriceShop
, Discount
, Quote
, and of course, there is a main
function, you can put it in the SuperPriceShop
class to simplify the task.
So the deal is that:
1, My app: My service is to get all the prices when I search for a product from all the shops, how to get all prices from a lot of shops in a list and show it to my clients? Sometimes the shops give me the results very slowly.
2, Shop: I’m selling things that I import from other countries, the price is changing every day, so I have to get the newest price each time from my providers.
3, Discount: I’m just waiting and calculating the final discounted price from every shop with a discount code and the original price that I received and then sending it back.
4, Quote: Shops are changing their discount percentage every day, I’m helping the discount to receive the newest code, and send it to Discount to calculate.
In the SuperPriceShop
, the code is as below:
public class SuperPriceShop {
private static final Random random = new Random();
private String name;
public SuperPriceShop(String name) {
this.name = name;
}
public String getName() {
return name;
}
static List<SuperPriceShop> shops = Arrays.asList(
new SuperPriceShop("BestPrice"),
new SuperPriceShop("LetsSaveBig"),
new SuperPriceShop("MyFavoriteShop"),
new SuperPriceShop("BuyItAll"),
new SuperPriceShop("BeMyGuest"),
new SuperPriceShop("YourLastChance"),
new SuperPriceShop("TheOnlyOne"));
public static List<String> findPrices(String product) {
return shops.stream()
.map(shop -> shop.getStringPrice(product)) // String format info, but slow.
.map(Quote::parse) // To Quote object
.map(Discount::applyDiscount) // Generate final output, but also slow.
.collect(toList());
}
public static List<String> findPricesAsync(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getStringPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(
quote -> CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
public static Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getStringPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)));
}
public String getStringPrice(String product) {
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[
random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}
// Imagine the pricing-service is very slow, provided by someone else:
private static double calculatePrice(String product) {
randomDelay(); // Things out of control, and your clients want a response ASAP.
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
public static void randomDelay() {
int delay = 500 + random.nextInt(2000);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 50),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
}
In class Quote
:
/**
* Get the String format info from the shop and return a Quote object
* with the SAME info, why? because the business change their discount from time to time,
* so this step must be implemented into each client request.
* It's basically a discount service.
*/
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;
public String getShopName() { return shopName; }
public double getPrice() { return price; }
public Discount.Code getDiscountCode() { return discountCode; }
public Quote(String shopName, double price, Discount.Code code) {
this.shopName = shopName;
this.price = price;
this.discountCode = code;
}
// From "BeTheBest:109.22:GOLD(name:price.2f:code)" to a Quote object.
public static Quote parse(String stringFormat) {
String[] split = stringFormat.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
In class Discount
:
import java.util.concurrent.TimeUnit;
/**
* The discount can be changed by shop owners from time to time.
*/
public class Discount {
public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
public static String applyDiscount(Quote quote) {
return quote.getShopName() + " price is " +
Discount.apply(quote.getPrice(), quote.getDiscountCode());
}
// Calculate the final price with discount percentage:
private static double apply(double price, Code code) {
/* Imagine the calculation service is very busy: */
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return (price * (100 - code.percentage) / 100);
}
}
In the main
function, test all the different ways of providing the service, either with synchronous streams or with asynchronous streams. You can conclude that with asynchronous function it is like 7 times faster (4272 msecs vs 28180 msecs), as I’ve listed the results below.
public static void main(String[] args) {
long start = System.nanoTime();
// 1, First way with plain stream.
/*System.out.println(getStringPrice("Favorite"));*/
//2, Second way with async, and get a result.
/*System.out.println(findPricesAsync("Favorite"));*/
//3, Third way, collect the prices in the end.
/*CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();*/
//4, Last way, show clients some prices as soon as possible.
CompletableFuture[] futures = findPricesStream("myPhone27S")
.map(future -> future.thenAccept(
str -> System.out.println(str + " (done in " +
((System.nanoTime() - start) / 1_000_000) + " msecs)")))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in "
+ ((System.nanoTime() - start) / 1_000_000) + " msecs");
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");
}
//And the results you may receive:
//1, Find one result.
//BeTheBest:109.22:GOLD
//Done in 1027 msecs
//2, With plain stream(), 4 shops.
//Done in 16136 msecs
//3, With plain stream(), 7 shops.
//Done in 28180 msecs
//4, With composed Future: it shows the prices provided by the different shops only when all of them are available.
//Done in 4272 msecs
//5, With refactoring for random delay:display the price for a given shop as soon as it becomes available, without waiting for the slowest one (which perhaps even times out).
//BuyItAll price is 156.598 (done in 3682 msecs)
//BestPrice price is 153.05450000000002 (done in 4193 msecs)
//BeMyGuest price is 171.42 (done in 4198 msecs)
//MyFavoriteShop price is 191.9475 (done in 4230 msecs)
//LetsSaveBig price is 177.16 (done in 4949 msecs)
//TheOnlyOne price is 132.066 (done in 5313 msecs)
//YourLastChance price is 196.6 (done in 5337 msecs)
//All shops have now responded in 5338 msecs
//Done in 5350 msecs
Some methods along with the CompletableFuture
you have to know:
1, thenAccept()
: This new operation simply registers an action on each CompletableFuture; this action consumes the value of the CompletableFuture as soon as it completes.
2, thenCompose()
: Usually, you have another asynchronous action below to follow.
3, thenApply()
: You are not waiting and running directly.
4, allOf()
: The allOf factory method takes as input an array of CompletableFutures and returns a CompletableFuture<Void> that’s completed only when all the CompletableFutures passed have been completed. So the app can display a message to the client: “All shops returned results or timed out.”.
5, anyOf()
: It takes the result of the first to respond. As a matter of detail, this method takes as input an array of CompletableFutures and returns a Completable-Future<Object> that completes with the same value as the first-to-complete CompletableFuture.
So that’s all I’ve learned, thanks for reading! 😊