首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SNS FIFO主题不会将消息挤出SQS队列

SNS FIFO主题不会将消息挤出SQS队列
EN

Stack Overflow用户
提问于 2021-01-27 18:29:00
回答 2查看 971关注 0票数 0

我试图探索SNS FIFO主题与SQS队列,这是我只是尝试。我创建了SNS FIFO主题和SQS FIFO队列,并将FIFO队列订阅到FIFO主题。根据文档,对于上述设置,每当我们将消息发布到SNS FIFO队列时,它应该将该消息扇出到SQS队列,但它不会发生。我能够获得PublishResult#getMessageId()意味着发布部分正在成功运行,但是队列中没有任何消息。由于SNS FIFO主题不支持电子邮件协议订阅,所以我断言这个发布子体系结构的唯一方法是轮询队列中的消息。由于没有风扇,队列似乎总是空的。

完整的代码块:

代码语言:javascript
复制
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.UUID;

class FifoTopicsITest {

    @Test
    void test() {
        final String topicName = UUID.randomUUID().toString().substring(15);
        //creating sns client
        AmazonSNS amazonSNS = AmazonSNSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        "<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sns.us-west-1.amazonaws.com",
                        "us-west-1")).build();
        //creating sqs client
        AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                "<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sqs.us-west-1.amazonaws.com",
                        "us-west-1")).build();

        //creating SNS topic
        CreateTopicRequest createTopicRequest = new CreateTopicRequest().withName(topicName + ".fifo");
        createTopicRequest
                .addAttributesEntry("FifoTopic", "true")
                .addAttributesEntry("ContentBasedDeduplication", "false");
        CreateTopicResult topicResult = amazonSNS.createTopic(createTopicRequest);
        String topicArn = topicResult.getTopicArn();

        //creating dead-letter sqs queue
        CreateQueueRequest createDLQQueueRequest = new CreateQueueRequest();
        createDLQQueueRequest.addAttributesEntry("FifoQueue", "true");
        createDLQQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createDLQQueueRequest.withQueueName(topicName + "_DLQ_" + ".fifo");
        CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDLQQueueRequest);

        //getting ARN value of dead-letter queue
        GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
                        .withAttributeNames("QueueArn"));
        String deleteQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        //creating sqs queue
        CreateQueueRequest createQueueRequest = new CreateQueueRequest();
        createQueueRequest.addAttributesEntry("FifoQueue", "true");
        createQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createQueueRequest.withQueueName(topicName + ".fifo");
        String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
                + deleteQueueArn + "\"}";
        createQueueRequest.addAttributesEntry("RedrivePolicy", reDrivePolicy);
        CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
        String queueUrl = createQueueResult.getQueueUrl();

        //getting ARN value of queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("QueueArn"));
        String queueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        //Subscribe FIFO queue to FIFO Topic
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.withProtocol("sqs")
                .withTopicArn(topicArn)
                .withEndpoint(queueArn);
        SubscribeResult subscribeResult = amazonSNS.subscribe(subscribeRequest);
        Assertions.assertNotNull(subscribeResult.getSubscriptionArn());

        //Publishing 4 sample message to FIFO SNS Topic
        for (int i = 0; i < 5; i++) {
            PublishRequest publishRequest = new PublishRequest()
                    .withTopicArn(topicArn)
                    .withMessage("Test Message" + i)
                    .withMessageGroupId(topicName)
                    .withMessageDeduplicationId(UUID.randomUUID().toString());
            PublishResult publishResult = amazonSNS.publish(publishRequest);
            Assertions.assertNotNull(publishResult.getMessageId());
        }

        //Getting ApproximateNumberOfMessages no of messages from the FIFO Queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("All"));
        String approximateNumberOfMessages = getQueueAttributesResult.getAttributes()
                                                     .get("ApproximateNumberOfMessages");

        //My expectation here is SNS FIFO topic should have fanout the 4 published message to SQS FIFO Queue
        Assertions.assertEquals(4, Integer.valueOf(approximateNumberOfMessages));
    }
}

SNS访问策略(权限)

代码语言:javascript
复制
{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:us-west-1:<account>:<topicName>.fifo",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<account>"
        }
      }
    }
  ]
}

访问策略(权限)

代码语言:javascript
复制
{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}

我遗漏了什么?为什么消息不存在于SQS队列中。对于SQS队列权限我应该做些什么,如下所示?

代码语言:javascript
复制
{
  "Id": "Policy1611770719125",
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1611770707743",
      "Action": [
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueueTags",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:SendMessageBatch",
        "sqs:SetQueueAttributes"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-01-29 13:25:33

与后人分享我的答案,由于怀疑实际问题与Access Policy有关,当我们创建FIFO队列并使用AWS V1订阅SQS队列时,默认访问策略如下所示

代码语言:javascript
复制
{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}

即使我尝试使用AWS v2 链接创建SQS队列,上述访问策略也将是相同的。因此,当我按下面的方式手动更改访问策略时,问题已经解决,并且FIFO SNS主题退出已按规定发生:

代码语言:javascript
复制
{
  "Statement": [
    {
      "Action": [
        "sqs:*"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}

代码块,为每个FIFO队列添加上述Access policy

代码语言:javascript
复制
Policy policy = new Policy().withStatements(
        new Statement(Statement.Effect.Allow)
                .withPrincipals(Principal.AllUsers)
                .withResources(new Resource(queueArn))
                .withActions(SQSActions.AllSQSActions));

Map<String, String> policyQueueAttributes = new HashMap<>();
policyQueueAttributes.put(QueueAttributeName.Policy.toString(), policy.toJson());
amazonSQS.setQueueAttributes(new SetQueueAttributesRequest(queueUrl, policyQueueAttributes));

在创建SQS队列后添加了上述代码块,最终解决了这个问题。

票数 2
EN

Stack Overflow用户

发布于 2021-11-24 23:18:11

如果您正在寻找非编程解决方案,并且希望使用AWS控制台,请查看AWS正式文档中的“如何使用SNS FIFO Topics”部分(下面的链接),其中他们提到您需要向FIFO SQS队列的访问策略中添加一条语句。这个新添加的语句授予FIFO主题权限来向队列发送消息。

使用它们的示例,如果您有一个名为updates.fifo的FIFO主题,并且试图将消息扇出到两个队列,即customer.fifo和loyalty.fifo,则在订阅该主题的队列后,您将导航到控制台中的customer.fifo队列,并通过添加上述语句编辑访问策略:

代码语言:javascript
复制
{
    "Effect": "Allow",
    "Principal": {
        "Service": "sns.amazonaws.com"
    },
    "Action": "SQS:SendMessage",
    "Resource": "arn:aws:sqs:us-east-2:123412341234:customer.fifo",
    "Condition": {
        "ArnLike": {
            "aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
        }
    }
}

这也适用于loyalty.fifo队列:

代码语言:javascript
复制
{
    "Effect": "Allow",
    "Principal": {
        "Service": "sns.amazonaws.com"
    },
    "Action": "SQS:SendMessage",
    "Resource": "arn:aws:sqs:us-east-2:123412341234:loyalty.fifo",
    "Condition": {
        "ArnLike": {
            "aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
        }
    }
}

资源:介绍Amazon

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65925207

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档