为什么AWS SQS不是Apache Flink的默认连接器?这样做有什么技术上的限制吗?或者这只是一些没有完成的事情?我想实现这一点,任何指针将不胜感激
发布于 2019-05-06 16:54:39
目前,Apache Flink中没有用于AWS SQS的连接器。看一看已经存在的existing connectors。我假设你已经知道了这一点,并想给出一些建议。我最近也在寻找SQS连接器,并找到了这个mail thread。
Apache Kinesis Connector在某种程度上类似于你可以在上面实现的东西。看看你是否可以开始使用这个连接器。
发布于 2021-10-27 15:48:25
可能已经来不及回答最初的问题了……我使用面向SQS的Java消息传递服务库编写了一个SQS使用者作为SourceFunction:
SQSConsumer extends RichParallelSourceFunction<String> {
private volatile boolean isRunning;
private transient AmazonSQS sqs;
private transient SQSConnectionFactory connectionFactory;
private transient ExecutorService consumerExecutor;
@Override
public void open(Configuration parameters) throws Exception {
String region = ...
AWSCredentialsProvider credsProvider = ...
// may be use a blocking array backed thread pool to handle surges?
consumerExecutor = Executors.newCachedThreadPool();
ClientConfiguration clientConfig = PredefinedClientConfigurations.defaultConfig();
this.sqs = AmazonSQSAsyncClientBuilder.standard().withRegion(region).withCredentials(credsProvider)
.withClientConfiguration(clientConfig)
.withExecutorFactory(()->consumerExecutor).build();
this.connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqs);
this.isRunning = true;
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
SQSConnection connection = connectionFactory.createConnection();
// ack each msg explicitly
Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
Queue queue = session.createQueue(<queueName>);
MessageConsumer msgConsumer = session.createConsumer(queue);
msgConsumer.setMessageListener(msg -> {
try {
String msgId = msg.getJMSMessageID();
String evt = ((TextMessage) msg).getText();
ctx.collect(evt);
msg.acknowledge();
} catch (JSMException e) {
// log and move on the next msg or bail with an exception
// have a dead letter queue is configured so this message is not lost
// msg is not acknowledged so it may be picked up again by another consumer instance
}
};
// check if we were canceled
if (!isRunning) {
return;
}
connection.start();
while (!consumerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
// keep waiting
}
}
@Override
public void cancel() {
isRunning = false;
// this method might be called before the task actually starts running
if (sqs != null) {
sqs.shutdown();
}
if(consumerExecutor != null) {
consumerExecutor.shutdown();
try {
consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
} catch (Exception e) {
//log e
}
}
}
@Override
public void close() throws Exception {
cancel();
super.close();
}
}注如果您使用的是标准SQS队列,则可能必须根据是否需要只需一次保证来对消息进行重复数据删除。
https://stackoverflow.com/questions/51231656
复制相似问题