vlambda博客
学习文章列表

WebFlux 快速入门(基于 Spring 5)

废话少说,直接上代码


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Spring Boot parent POM: supplies managed dependency and plugin versions. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Coordinates of this demo project. -->
    <groupId>com.example</groupId>
    <artifactId>webflex</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>webflex</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Explicit reactor-core dependency, left disabled. -->
        <!--<dependency>-->
        <!--<groupId>io.projectreactor</groupId>-->
        <!--<artifactId>reactor-core</artifactId>-->
        <!--<version>3.1.5.RELEASE</version>-->
        <!--</dependency>-->

        <!-- Reactive web stack used by the handler and server classes. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <!-- Test-only support. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

创建entity

package com.example.webflex.entity;
/**
 * Mutable bean holding a user's name, gender and age.
 *
 * <p>The gender property is spelled {@code geneder} (sic); the spelling is kept because it is
 * part of the accessor names.
 *
 * @author junfeng.hu
 * @create 2020-05-27 15:00
 */
public class User {

    private String name;
    private String geneder;
    private Integer age;

    /** Creates a user whose fields are all {@code null}. */
    public User() {
    }

    /**
     * Creates a fully populated user.
     *
     * @param name    the user's name
     * @param geneder the user's gender
     * @param age     the user's age
     */
    public User(String name, String geneder, Integer age) {
        this.age = age;
        this.geneder = geneder;
        this.name = name;
    }

    /** @return the user's name, possibly {@code null} */
    public String getName() {
        return this.name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user's gender, possibly {@code null} */
    public String getGeneder() {
        return this.geneder;
    }

    /** @param geneder the new gender */
    public void setGeneder(String geneder) {
        this.geneder = geneder;
    }

    /** @return the user's age, possibly {@code null} */
    public Integer getAge() {
        return this.age;
    }

    /** @param age the new age */
    public void setAge(Integer age) {
        this.age = age;
    }
}

定义好UserService

package com.example.webflex.service;
import com.example.webflex.entity.User;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;
/**
 * Reactive operations on {@link User} objects.
 *
 * @author junfeng.hu
 * @create 2020-05-27 15:03
 */
public interface UserService {

    /**
     * Looks up a single user.
     *
     * @param id identifier of the requested user
     * @return a {@link Mono} for the matching user
     */
    Mono<User> getUserById(int id);

    /**
     * Lists users.
     *
     * @return a {@link Flux} of users
     */
    Flux<User> getAllUser();

    /**
     * Stores the user delivered by the given publisher.
     *
     * @param user publisher of the user to store
     * @return a {@link Mono} carrying no value, only a termination signal
     */
    Mono<Void> saveUserInfo(Mono<User> user);
}

实现UserService

package com.example.webflex.service.impl;
import com.example.webflex.entity.User;
import com.example.webflex.service.UserService;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * In-memory {@link UserService} seeded with three sample users (ids 1 to 3).
 *
 * <p>Users are kept in a {@link ConcurrentHashMap} and new ids come from an
 * {@link AtomicInteger}, because a single instance is shared by every request the server
 * handles and requests may be processed on different threads. Deriving the next id from
 * {@code users.size() + 1} on a plain {@code HashMap} could hand the same id to two concurrent
 * saves and silently overwrite one of them.
 *
 * @author junfeng.hu
 * @create 2020-05-27 15:21
 */
@Repository
public class UserServiceImpl implements UserService {

    /** Stored users keyed by id. */
    private final Map<Integer, User> users = new ConcurrentHashMap<>();

    /** Highest id handed out so far; the next user gets this value plus one. */
    private final AtomicInteger lastId = new AtomicInteger();

    /** Creates the service and registers the three sample users under ids 1, 2 and 3. */
    public UserServiceImpl() {
        addUser(new User("zhangshan", "男", 29));
        addUser(new User("lisi", "女", 29));
        addUser(new User("wangwu", "男", 29));
    }

    /**
     * Looks up a user by id.
     *
     * @param id id of the requested user
     * @return a {@link Mono} emitting the user, or an empty {@link Mono} when the id is unknown
     */
    @Override
    public Mono<User> getUserById(int id) {
        return Mono.justOrEmpty(this.users.get(id));
    }

    /**
     * Lists all stored users.
     *
     * @return a {@link Flux} over the values currently held in the map
     */
    @Override
    public Flux<User> getAllUser() {
        return Flux.fromIterable(this.users.values());
    }

    /**
     * Stores the user emitted by {@code userMono} under the next free id.
     *
     * @param userMono publisher of the user to store; nothing is stored if it is empty
     * @return a {@link Mono} that completes, without a value, once {@code userMono} completes
     */
    @Override
    public Mono<Void> saveUserInfo(Mono<User> userMono) {
        // then() discards the element and only propagates completion or error.
        return userMono.doOnNext(this::addUser).then();
    }

    /** Stores {@code user} under a freshly allocated id. */
    private void addUser(User user) {
        this.users.put(this.lastId.incrementAndGet(), user);
    }
}

实现一个handler

package com.example.webflex.handle;
import com.example.webflex.entity.User;import com.example.webflex.service.UserService;import org.springframework.http.MediaType;import org.springframework.web.reactive.function.server.ServerRequest;import org.springframework.web.reactive.function.server.ServerResponse;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
/**
 * Functional WebFlux handler exposing the {@link UserService} operations over HTTP.
 *
 * <p>Creating a {@code Mono} or {@code Flux} (for example with {@code just}) only declares the
 * data stream; nothing happens until something subscribes to it. The handler methods therefore
 * only assemble the pipeline and return it to the framework.
 *
 * @author junfeng.hu
 * @create 2020-06-03 11:20
 */
public class UserHandler {

    private final UserService userService;

    /**
     * @param userService service the handler delegates to
     */
    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    /**
     * Returns the user whose id is given by the {@code id} path variable.
     *
     * @param request current request; must carry an {@code id} path variable
     * @return 200 with the user as JSON, 404 when no such user exists, or 400 when the id is
     *         not a valid integer
     */
    public Mono<ServerResponse> getUserById(ServerRequest request) {
        final int userId;
        try {
            userId = Integer.parseInt(request.pathVariable("id"));
        } catch (NumberFormatException ignored) {
            // A malformed id is a client mistake, so answer 400 rather than let it surface as a server error.
            return ServerResponse.badRequest().build();
        }

        return this.userService.getUserById(userId)
                .flatMap(person -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(person))
                // Without this an unknown id produces an empty Mono and no response is written.
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /**
     * Returns every user known to the service.
     *
     * @param request current request (not inspected)
     * @return 200 with the users as JSON
     */
    public Mono<ServerResponse> getAllUser(ServerRequest request) {
        Flux<User> users = this.userService.getAllUser();
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(users, User.class);
    }

    /**
     * Stores the user sent in the request body.
     *
     * @param request current request whose body is decoded as a {@link User}
     * @return a bodiless 200 response, built once the save has completed
     */
    public Mono<ServerResponse> saveUser(ServerRequest request) {
        Mono<Void> saved = this.userService.saveUserInfo(request.bodyToMono(User.class));
        // The save emits no element, so build a bodiless response tied to its completion
        // instead of declaring a User body that is never produced.
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .build(saved);
    }
}

创建一个Server类

package com.example.webflex;
import com.example.webflex.handle.UserHandler;import com.example.webflex.service.UserService;import com.example.webflex.service.impl.UserServiceImpl;import org.springframework.http.MediaType;import org.springframework.http.server.reactive.HttpHandler;import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;import org.springframework.web.reactive.function.server.RouterFunction;import org.springframework.web.reactive.function.server.RouterFunctions;import org.springframework.web.reactive.function.server.ServerResponse;import reactor.netty.http.server.HttpServer;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;import static org.springframework.web.reactive.function.server.RequestPredicates.POST;import static org.springframework.web.reactive.function.server.RequestPredicates.accept;import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;
/** * @author junfeng.hu * @create 2020-06-03 11:20 */public class Server2 {
public static void main(String[] args) throws Exception{ Server2 server = new Server2(); server.createReactorServer(); System.out.println("enter to exit"); System.in.read(); }

public RouterFunction<ServerResponse> routingFunction(){ UserService userService = new UserServiceImpl(); UserHandler handler = new UserHandler(userService); //设置路由 return RouterFunctions .route(POST("/saveuser").and(accept(MediaType.APPLICATION_JSON)), handler::saveUser) .andRoute(GET("/user/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::getUserById) .andRoute(GET("/users").and(accept(MediaType.APPLICATION_JSON)), handler::getAllUser); }


public void createReactorServer(){ RouterFunction<ServerResponse> route = routingFunction(); HttpHandler httpHandler = toHttpHandler(route); ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer httpServer = HttpServer.create(); httpServer.handle(adapter).bindNow(); }

}

和Server对应的Client(注意:下面贴出的代码与上面的 Server2 完全相同,并不是真正的 Client 实现)

package com.example.webflex;
import com.example.webflex.handle.UserHandler;import com.example.webflex.service.UserService;import com.example.webflex.service.impl.UserServiceImpl;import org.springframework.http.MediaType;import org.springframework.http.server.reactive.HttpHandler;import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;import org.springframework.web.reactive.function.server.RouterFunction;import org.springframework.web.reactive.function.server.RouterFunctions;import org.springframework.web.reactive.function.server.ServerResponse;import reactor.netty.http.server.HttpServer;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;import static org.springframework.web.reactive.function.server.RequestPredicates.POST;import static org.springframework.web.reactive.function.server.RequestPredicates.accept;import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;
/** * @author junfeng.hu * @create 2020-06-03 11:20 */public class Server2 {
public static void main(String[] args) throws Exception{ Server2 server = new Server2(); server.createReactorServer(); System.out.println("enter to exit"); System.in.read(); }

public RouterFunction<ServerResponse> routingFunction(){ UserService userService = new UserServiceImpl(); UserHandler handler = new UserHandler(userService); //设置路由 return RouterFunctions .route(POST("/saveuser").and(accept(MediaType.APPLICATION_JSON)), handler::saveUser) .andRoute(GET("/user/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::getUserById) .andRoute(GET("/users").and(accept(MediaType.APPLICATION_JSON)), handler::getAllUser); }


public void createReactorServer(){ RouterFunction<ServerResponse> route = routingFunction(); HttpHandler httpHandler = toHttpHandler(route); ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer httpServer = HttpServer.create(); httpServer.handle(adapter).bindNow(); }

}

完成上面这些步骤之后,按照以下顺序启动:

server

client

这样就完成了一个基于 WebFlux 的非阻塞(NIO)项目的搭建,希望能帮助大家入门。


https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html

https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html

https://howtodoinjava.com/spring-webflux/webclient-get-post-example/