假设我有两个微服务,并且我想在一个使用WebFlux的Spring REST控制器中实现BFF (前端后端)模式。
来自两个远程服务的域对象是:
public class Comment {
private Long id;
private String text;
private Long authorId;
private Long editorId;
}
public class Person {
private Long id;
private String firstName;
private String lastName;
}并且API编写器必须返回以下类型的对象:
public class ComposedComment {
private String text;
private String authorFullName;
private String editorFullName;
}为了方便起见,我编写了一个控制器,它在一个控制器中模拟所有的服务。
@RestController
@RequestMapping("/api")
public class Controller {
private static final List<Comment> ALL_COMMENTS = Arrays.asList(//
new Comment(1L, "Bla bla", 1L, null), //
new Comment(2L, "lorem ipsum", 2L, 3L), //
new Comment(3L, "a comment", 2L, 1L));
private static final Map<Long, Person> PERSONS;
static {
PERSONS = new HashMap<>();
PERSONS.put(1L, new Person(1L, "John", "Smith"));
PERSONS.put(2L, new Person(2L, "Paul", "Black"));
PERSONS.put(3L, new Person(3L, "Maggie", "Green"));
}
private WebClient clientCommentService = WebClient.create("http://localhost:8080/api");
private WebClient clientPersonService = WebClient.create("http://localhost:8080/api");
@GetMapping("/composed/comments")
public Flux<ComposedComment> getComposedComments() {
//This is the tricky part
}
private String extractFullName(Map<Long, Person> map, Long personId) {
Person person = map.get(personId);
return person == null ? null : person.getFirstName() + " " + person.getLastName();
}
@GetMapping("/comments")
public ResponseEntity<List<Comment>> getAllComments() {
return new ResponseEntity<List<Comment>>(ALL_COMMENTS, HttpStatus.OK);
}
@GetMapping("/persons/{personIds}")
public ResponseEntity<List<Person>> getPersonsByIdIn(@PathVariable("personIds") Set<Long> personIds) {
List<Person> persons = personIds.stream().map(id -> PERSONS.get(id)).filter(person -> person != null)
.collect(Collectors.toList());
return new ResponseEntity<List<Person>>(persons, HttpStatus.OK);
}
}我的问题是我刚刚开始使用反应器,我不是很确定我在做什么。这是我的composer方法的当前版本:
@GetMapping("/composed/comments")
public Flux<ComposedComment> getComposedComments() {
Flux<Comment> commentFlux = clientCommentService.get().uri("/comments").retrieve().bodyToFlux(Comment.class);
Set<Long> personIds = commentFlux.toStream().map(comment -> Arrays.asList(comment.getAuthorId(), comment.getEditorId())).flatMap(Collection::stream).filter(Objects::nonNull).collect(Collectors.toSet());
Map<Long, Person> personsById = clientPersonService.get().uri("/persons/{ids}", personIds.stream().map(Object::toString).collect(Collectors.joining(","))).retrieve().bodyToFlux(Person.class).collectMap(Person::getId).block();
return commentFlux.map(
comment -> new ComposedComment(
comment.getText(),
extractFullName(personsById, comment.getAuthorId()),
extractFullName(personsById, comment.getEditorId()))
);
}它可以工作,但是我知道我应该用map、flatMap和zip进行几次转换,而不是调用block()和toStream()……你能帮我正确地重写这个方法吗?:)
发布于 2018-08-22 15:32:45
您应该尝试使用zip操作符来组合这两个发布器。如果你想退还,就不要订阅flux。
如果因为第二个发布者依赖于第一个发布者的结果而无法使用zip,那么可以使用flatMap。
您可以像这样使用flatMap:
commentsFlux.flatMap(comment -> personService.getPersonsByIds(comment.getPersonId1() + "," + comment.getPersonId2())
//at this moment you have scope on both
.map(listOfTwoPersons -> new Composed(listOfTwoPersons, comment))注:我没有使用webflux客户端,我只是从你的工作示例中猜测,它知道要包装到Flux/Mono,即使你返回一个实体或实体列表。
发布于 2018-08-22 15:48:06
你在你的控制器中返回null。通过返回反应流来替换它。
return commentFlux.flatMap(comment -> ....)
....您的控制器签名返回一个
Flux<ComposedComment>因此,请确保在最后一次返回时,必须使用flatMap或map将它们转换为ComposedComment。你可以把它想象成一个promise链,在那里你可以在实现中执行许多flatMap,映射到最终的dataset。
在这些情况下不要使用subscribe,subscribe适用于演示反应流的调用过程,或者在app中的某个地方,结果作为这个控制器直接调用了不需要的方法
此时,您只需使用map、flatMap、collect、zip……返回反应流。只需返回反应流(Mono,Flux<>),spring-webflux就会调用它们。
https://stackoverflow.com/questions/51961079
复制相似问题