首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将AWS SQS连接到Apache-Flink

将AWS SQS连接到Apache-Flink
EN

Stack Overflow用户
提问于 2018-07-08 19:44:40
回答 2查看 1.8K关注 0票数 5

为什么AWS SQS不是Apache Flink的默认连接器?这样做有什么技术上的限制吗?或者这只是一些没有完成的事情?我想实现这一点,任何指针将不胜感激

EN

回答 2

Stack Overflow用户

发布于 2019-05-06 16:54:39

目前,Apache Flink中没有用于AWS SQS的连接器。看一看已经存在的existing connectors。我假设你已经知道了这一点,并想给出一些建议。我最近也在寻找SQS连接器,并找到了这个mail thread

Apache Kinesis Connector在某种程度上类似于你可以在上面实现的东西。看看你是否可以开始使用这个连接器。

票数 1
EN

Stack Overflow用户

发布于 2021-10-27 15:48:25

可能已经来不及回答最初的问题了……我使用面向SQS的Java消息传递服务库编写了一个SQS使用者作为SourceFunction:

代码语言:javascript
复制
SQSConsumer extends RichParallelSourceFunction<String> {
   private volatile boolean isRunning;
   private transient AmazonSQS sqs;
   private transient SQSConnectionFactory connectionFactory;
   private transient ExecutorService consumerExecutor;

   @Override
   public void open(Configuration parameters) throws Exception {
      String region = ...
      AWSCredentialsProvider credsProvider = ...
      // may be use a blocking array backed thread pool to handle surges?
      consumerExecutor = Executors.newCachedThreadPool();
      ClientConfiguration clientConfig = PredefinedClientConfigurations.defaultConfig();
      this.sqs = AmazonSQSAsyncClientBuilder.standard().withRegion(region).withCredentials(credsProvider)
            .withClientConfiguration(clientConfig)
            .withExecutorFactory(()->consumerExecutor).build();
      this.connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqs);
      this.isRunning = true;
   }

   @Override
   public void run(SourceContext<String> ctx) throws Exception {
      SQSConnection connection = connectionFactory.createConnection();
      // ack each msg explicitly
      Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
      Queue queue = session.createQueue(<queueName>);
      MessageConsumer msgConsumer = session.createConsumer(queue);
      msgConsumer.setMessageListener(msg -> {
          try {
              String msgId = msg.getJMSMessageID();
              String evt = ((TextMessage) msg).getText();
              ctx.collect(evt);
              msg.acknowledge();
          } catch (JSMException e) {
              // log and move on the next msg or bail with an exception
              // have a dead letter queue is configured so this message is not lost
              // msg is not acknowledged so it may be picked up again by another consumer instance
          }
      };
      // check if we were canceled
      if (!isRunning) {
          return;
      }
      connection.start();
      while (!consumerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
          // keep waiting
      }
  }
            

  @Override
  public void cancel() {
      isRunning = false;
      // this method might be called before the task actually starts running
      if (sqs != null) {
          sqs.shutdown();
      }
      if(consumerExecutor != null) {
           consumerExecutor.shutdown();
           try {
               consumerExecutor.awaitTermination(1, TimeUnit.MINUTES); 
           } catch (Exception e) {
               //log e
           }
      }
   }

   @Override
   public void close() throws Exception {
       cancel();
       super.close();
   }
}

注如果您使用的是标准SQS队列,则可能必须根据是否需要只需一次保证来对消息进行重复数据删除。

参考:Working with JMS and Amazon SQS

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51231656

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档