我推送100条信息到流与1 shrad。
spring:
cloud:
stream:
bindings:
myOutBound:
destination: my-stream
contentType: application/json为了测试目的,我在循环中推送消息。
@EnableBinding(MyBinder.class)
public class MyProcessor {
@Autowired
private MyBinder myBinder;
public void processRollup() {
List<MyObject> myObjects = IntStream.range(1, 100)
.mapToObj(Integer::valueOf)
.map(s-> new MyObject(s))
.collect(toList());
myObjects.forEach(messagePayload ->{
System.out.println(messagePayload.getId());
myBinder.myOutBound()
.send(MessageBuilder.withPayload(messagePayload)
.build());
}
);
}
}我正在使用下面的留言
spring:
cloud:
stream:
bindings:
RollUpInboundStream:
group: my-consumer-group
destination: my-stream
content-type: application/json没有对邮件消费进行排序。
我是不是漏掉了什么。
发布于 2019-04-03 13:41:57
有几件事要考虑。首先,绑定器中的生产者是基于默认情况下具有KinesisMessageHandler模式的async模式的:
messageHandler.setSync(producerProperties.getExtension().isSync());因此,即使您以正确的顺序发送这些消息,也并不意味着它们以相同的顺序到达AWS上的流。
而且,即使您以同步模式发送AWS,也不能保证它们在AWS上以相同的顺序固定下来。
见此处:Amazon Kinesis and guaranteed ordering
此外,您还可以通过显式sequenceNumber在同一个碎片内实现订单保证:
为了确保严格增加排序,请串行写入碎片并使用SequenceNumberForOrdering参数。
不幸的是,Kinesis目前不支持这个选项,但是在将它发送到绑定器的AwsHeaders.SEQUENCE_NUMBER目的地之前,我们可以通过在消息中设置一个显式AwsHeaders.SEQUENCE_NUMBER来克服它:
String sequenceNumber = messageHeaders.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
sequenceNumber = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
}https://stackoverflow.com/questions/55480469
复制相似问题