开始
SpringBoot升级到2.0版本之后,支持了WebFlux,初次体验后记录笔记如下
WebFlux是什么
相对于SpringMVC,MVC是基于Servlet API和Servlet容器设计的。Spring WebFlux是基于Reactive Streams和Servlet3.1+容器设计的。
Reactor
RxJava 库是 JVM 上反应式编程的先驱,也是反应式流规范的基础。RxJava2在RxJava的基础上做了很多的更新。不过 RxJava 库也有其不足的地方。RxJava 产生于反应式流规范之前,虽然可以和反应式流的接口进行转换,但是由于底层实现的原因,使用起来并不是很直观。RxJava 2 在设计和实现时考虑到了与规范的整合,不过为了保持与 RxJava 的兼容性,很多地方在使用时也并不直观。Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。
在 Java 程序中使用 Reactor 库非常的简单,只需要通过 Maven 或 Gradle 来添加对 io.projectreactor:reactor-core 的依赖即可,目前的版本是 3.0.5.RELEASE。
Flux 和 Mono
Flux 和 Mono 是 Reactor 中的两个基本概念。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono
- 创建Flux
- just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
 - fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
 - empty():创建一个不包含任何元素,只发布结束消息的序列。
 - error(Throwable error):创建一个只包含错误消息的序列。
 - never():创建一个不包含任何消息通知的序列。
 - range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
 - interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
 - intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
 
 - 代码实例如下:
 
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);
创建Mono
- fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
 - delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
 - ignoreElements(Publisher
source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。  - justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
 
还可以通过 create()方法来使用 MonoSink 来创建 Mono。
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
WebFlux的使用
首先,需要创建一个SpringBoot2的项目工程,并且引入WebFlux和其他需要的依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency> 
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
配置Reactive Redis
@SpringBootConfiguration
public class RedisConfig {
    @Resource
    private RedisConnectionFactory factory;
    @Bean
    public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
        return new ReactiveRedisTemplate<>(connectionFactory, RedisSerializationContext.string());
    }
    @Bean
    public ReactiveRedisConnection connection(ReactiveRedisConnectionFactory connectionFactory) {
        return connectionFactory.getReactiveConnection();
    }
    @Bean
    ReactiveRedisOperations<String, Object> redisOperations(ReactiveRedisConnectionFactory factory) {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder = RedisSerializationContext
                .newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<String, Object> context = builder.value(serializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }
    public @PreDestroy
    void flushDb() {
        factory.getConnection().flushDb();
    }
}
编写一个
RedisLoader.java类,在项目启动的时候初始化数据
@Component
public class RedisLoader {
    @Resource
    private ReactiveRedisConnectionFactory factory;
    @Resource
    private ReactiveRedisOperations<String, Object> redisOperations;
    @PostConstruct
    public void loadData() {
        factory.getReactiveConnection().serverCommands().flushAll()
                .thenMany(Flux.just("Thor", "Hulk", "Tony")
                        .map(name -> new User(UUID.randomUUID().toString().substring(0, 5), name, "123456"))
                        .flatMap(user -> redisOperations.opsForValue().set(user.getId(), user))
                ).thenMany(redisOperations.keys("*")
                .flatMap(redisOperations.opsForValue()::get))
                .subscribe(System.out::println);
    }
}
创建一个简单的User.java类,作为用户数据模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private String id;
    private String name;
    private String password;
}
定义用户数据操作接口
UserService.java
public interface UserService {
    /**
     * 用户注册
     *
     * @param id
     * @param username
     * @return
     */
    Mono<Boolean> add(String id, String username);
    /**
     * 用户登录
     *
     * @param username
     * @param password
     * @return
     */
    Mono<User> find(String username, String password);
    /**
     * 获取所有用户
     *
     * @return
     */
    Flux<User> getAll();
    Mono<Boolean> remove(String id);
}
定义接口实现类
UserServiceImpl.java
@Service
@Slf4j
public class UserServiceImpl implements UserService {
    @Resource
    private ReactiveRedisOperations<String, User> redisOperations;
    @Override
    public Mono<Boolean> add(String id, String username) {
        User user = new User();
        user.setId(id);
        user.setName(username);
        user.setPassword("123456");
        return redisOperations.opsForValue().set(id, user);
    }
    @Override
    public Mono<User> find(String username, String password) {
        return redisOperations.opsForValue().get(username);
    }
    @Override
    public Flux<User> getAll() {
        return redisOperations.keys("*")
                .flatMap(redisOperations.opsForValue()::get);
    }
    @Override
    public Mono<Boolean> remove(String id) {
        return redisOperations.opsForValue().delete(id);
    }
}
创建基于
SpringMVC的REST API
@RestController
public class UserController {
    @Resource
    private UserService userService;
    @GetMapping("/users")
    public Flux<User> all() {
        return userService.getAll();
    }
    @PostMapping("/add")
    public Mono<Boolean> register(@RequestBody User user) {
        return userService.add(user.getId(), user.getName());
    }
    @PostMapping("/find")
    public Mono find(@RequestBody User user) {
        return userService.find(user.getName(), user.getPassword());
    }
}
基于 Functional 函数式路由实现 RESTful API
@SpringBootConfiguration
public class Router {
    @Resource
    private UserHandler userHandler;
    @Bean
    public RouterFunction<?> routerFunction() {
        return RouterFunctions.route(RequestPredicates.GET("/hello"), userHandler::hello)
                .andRoute(RequestPredicates.POST("/login"), userHandler::login);
    }
}
UserHandler.java
@Service
public class UserHandler {
    private final static Logger log = LoggerFactory.getLogger(UserHandler.class);
    @Resource
    private ReactiveRedisConnection connection;
    public Mono<ServerResponse> hello(ServerRequest request) {
        return ServerResponse
                .ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromObject("Hello, World"));
    }
    /**
     * 登录
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> login(ServerRequest request) {
        Mono<Map> body = request.bodyToMono(Map.class);
        return body.flatMap(map -> {
            String username = (String) map.get("username");
            String password = (String) map.get("password");
            log.debug("username:{},password:{}", username, password);
            return connection.stringCommands().get(
                    ByteBuffer.wrap(username.getBytes()))
                    .flatMap(byteBuffer -> {
                        byte[] bytes = new byte[byteBuffer.remaining()];
                        byteBuffer.get(bytes, 0, bytes.length);
                        String userStr;
                        userStr = new String(bytes, StandardCharsets.UTF_8);
                        log.debug(userStr);
                        User user = JSON.parseObject(userStr, User.class);
                        Map<String, String> result = new HashMap<>(2);
                        if (Objects.isNull(user.getPassword()) || !user.getPassword().equals(password)) {
                            result.put("message", "账号或密码错误");
                            log.debug("账号或密码错误");
                            return ServerResponse.status(HttpStatus.UNAUTHORIZED)
                                    .contentType(MediaType.APPLICATION_JSON_UTF8)
                                    .body(BodyInserters.fromObject(result));
                        } else {
                            result.put("message", "登录成功");
                            log.debug("登录成功");
                            return ServerResponse.ok()
                                    .contentType(MediaType.APPLICATION_JSON_UTF8)
                                    .body(BodyInserters.fromObject(result));
                        }
                    });
        });
    }
}
最后
使用Postman测试,步骤省略