SpringBoot2之WebFlux初体验

开始

SpringBoot升级到2.0版本之后,支持了WebFlux,初次体验后记录笔记如下

WebFlux是什么

相对于SpringMVCMVC是基于Servlet APIServlet容器设计的。Spring WebFlux是基于Reactive StreamsServlet3.1+容器设计的。

Reactor

RxJava 库是 JVM 上反应式编程的先驱,也是反应式流规范的基础。RxJava2在RxJava的基础上做了很多的更新。不过 RxJava 库也有其不足的地方。RxJava 产生于反应式流规范之前,虽然可以和反应式流的接口进行转换,但是由于底层实现的原因,使用起来并不是很直观。RxJava 2 在设计和实现时考虑到了与规范的整合,不过为了保持与 RxJava 的兼容性,很多地方在使用时也并不直观。Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。

在 Java 程序中使用 Reactor 库非常的简单,只需要通过 Maven 或 Gradle 来添加对 io.projectreactor:reactor-core 的依赖即可,目前的版本是 3.0.5.RELEASE。

Flux 和 Mono

Flux 和 Mono 是 Reactor 中的两个基本概念。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

  • 创建Flux
    • just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
    • fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
    • empty():创建一个不包含任何元素,只发布结束消息的序列。
    • error(Throwable error):创建一个只包含错误消息的序列。
    • never():创建一个不包含任何消息通知的序列。
    • range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
    • interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
    • intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
  • 代码实例如下:
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);
  • 创建Mono

    • fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
    • delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
    • ignoreElements(Publisher source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
    • justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。

还可以通过 create()方法来使用 MonoSink 来创建 Mono。

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

WebFlux的使用

首先,需要创建一个SpringBoot2的项目工程,并且引入WebFlux和其他需要的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency> 

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

配置Reactive Redis

@SpringBootConfiguration
public class RedisConfig &#123;
    @Resource
    private RedisConnectionFactory factory;

    @Bean
    public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) &#123;
        return new ReactiveRedisTemplate<>(connectionFactory, RedisSerializationContext.string());
    &#125;

    @Bean
    public ReactiveRedisConnection connection(ReactiveRedisConnectionFactory connectionFactory) &#123;
        return connectionFactory.getReactiveConnection();
    &#125;

    @Bean
    ReactiveRedisOperations<String, Object> redisOperations(ReactiveRedisConnectionFactory factory) &#123;
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder = RedisSerializationContext
                .newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<String, Object> context = builder.value(serializer).build();

        return new ReactiveRedisTemplate<>(factory, context);
    &#125;

    public @PreDestroy
    void flushDb() &#123;
        factory.getConnection().flushDb();
    &#125;
&#125;

编写一个RedisLoader.java类,在项目启动的时候初始化数据

@Component
public class RedisLoader &#123;
    @Resource
    private ReactiveRedisConnectionFactory factory;
    @Resource
    private ReactiveRedisOperations<String, Object> redisOperations;

    @PostConstruct
    public void loadData() &#123;
        factory.getReactiveConnection().serverCommands().flushAll()
                .thenMany(Flux.just("Thor", "Hulk", "Tony")
                        .map(name -> new User(UUID.randomUUID().toString().substring(0, 5), name, "123456"))
                        .flatMap(user -> redisOperations.opsForValue().set(user.getId(), user))
                ).thenMany(redisOperations.keys("*")
                .flatMap(redisOperations.opsForValue()::get))
                .subscribe(System.out::println);
    &#125;
&#125;

创建一个简单的User.java类,作为用户数据模型

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User &#123;
    private String id;

    private String name;

    private String password;
&#125;

定义用户数据操作接口UserService.java

public interface UserService &#123;
    /**
     * 用户注册
     *
     * @param id
     * @param username
     * @return
     */
    Mono<Boolean> add(String id, String username);

    /**
     * 用户登录
     *
     * @param username
     * @param password
     * @return
     */
    Mono<User> find(String username, String password);

    /**
     * 获取所有用户
     *
     * @return
     */
    Flux<User> getAll();

    Mono<Boolean> remove(String id);
&#125;

定义接口实现类UserServiceImpl.java

@Service
@Slf4j
public class UserServiceImpl implements UserService &#123;
    @Resource
    private ReactiveRedisOperations<String, User> redisOperations;

    @Override
    public Mono<Boolean> add(String id, String username) &#123;
        User user = new User();
        user.setId(id);
        user.setName(username);
        user.setPassword("123456");
        return redisOperations.opsForValue().set(id, user);
    &#125;

    @Override
    public Mono<User> find(String username, String password) &#123;
        return redisOperations.opsForValue().get(username);
    &#125;

    @Override
    public Flux<User> getAll() &#123;
        return redisOperations.keys("*")
                .flatMap(redisOperations.opsForValue()::get);
    &#125;

    @Override
    public Mono<Boolean> remove(String id) &#123;
        return redisOperations.opsForValue().delete(id);
    &#125;
&#125;

创建基于SpringMVCREST API

@RestController
public class UserController &#123;
    @Resource
    private UserService userService;

    @GetMapping("/users")
    public Flux<User> all() &#123;
        return userService.getAll();
    &#125;

    @PostMapping("/add")
    public Mono<Boolean> register(@RequestBody User user) &#123;
        return userService.add(user.getId(), user.getName());
    &#125;

    @PostMapping("/find")
    public Mono find(@RequestBody User user) &#123;
        return userService.find(user.getName(), user.getPassword());
    &#125;
&#125;

基于 Functional 函数式路由实现 RESTful API

@SpringBootConfiguration
public class Router &#123;
    @Resource
    private UserHandler userHandler;

    @Bean
    public RouterFunction<?> routerFunction() &#123;
        return RouterFunctions.route(RequestPredicates.GET("/hello"), userHandler::hello)
                .andRoute(RequestPredicates.POST("/login"), userHandler::login);
    &#125;
&#125;

UserHandler.java

@Service
public class UserHandler &#123;
    private final static Logger log = LoggerFactory.getLogger(UserHandler.class);
    @Resource
    private ReactiveRedisConnection connection;

    public Mono<ServerResponse> hello(ServerRequest request) &#123;
        return ServerResponse
                .ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromObject("Hello, World"));
    &#125;

    /**
     * 登录
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> login(ServerRequest request) &#123;
        Mono<Map> body = request.bodyToMono(Map.class);
        return body.flatMap(map -> &#123;
            String username = (String) map.get("username");
            String password = (String) map.get("password");
            log.debug("username:&#123;&#125;,password:&#123;&#125;", username, password);
            return connection.stringCommands().get(
                    ByteBuffer.wrap(username.getBytes()))
                    .flatMap(byteBuffer -> &#123;
                        byte[] bytes = new byte[byteBuffer.remaining()];
                        byteBuffer.get(bytes, 0, bytes.length);
                        String userStr;
                        userStr = new String(bytes, StandardCharsets.UTF_8);
                        log.debug(userStr);
                        User user = JSON.parseObject(userStr, User.class);
                        Map<String, String> result = new HashMap<>(2);
                        if (Objects.isNull(user.getPassword()) || !user.getPassword().equals(password)) &#123;
                            result.put("message", "账号或密码错误");
                            log.debug("账号或密码错误");
                            return ServerResponse.status(HttpStatus.UNAUTHORIZED)
                                    .contentType(MediaType.APPLICATION_JSON_UTF8)
                                    .body(BodyInserters.fromObject(result));
                        &#125; else &#123;
                            result.put("message", "登录成功");
                            log.debug("登录成功");
                            return ServerResponse.ok()
                                    .contentType(MediaType.APPLICATION_JSON_UTF8)
                                    .body(BodyInserters.fromObject(result));
                        &#125;
                    &#125;);
        &#125;);
    &#125;
&#125;

最后

使用Postman测试,步骤省略

参考


   转载规则

本文不允许转载。
 上一篇
在SpringBoot中使用MongoDB 在SpringBoot中使用MongoDB
在SpringBoot中使用MongoDB 最近项目中使用了MongoDB,在SpringBoot中集成了MongoDB,MongoDB是当前非常火的一个非关系型数据库,同时也是最接近关系型数据库的,本篇文章用于记录SpringBoot中集
2018-10-26
下一篇 
记一次向开源项目提交PR的过程 记一次向开源项目提交PR的过程
最近在做Electron+Vue的项目,这里用到了这个项目作为脚手架。然而,在准备打包生产环境配置,用于发布第一个正式版本的时候,发现把process.env.NODE_ENV设置为production并不能切换为生产环境的配置。 原
2018-08-08