首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在生产环境中,Sidekiq将作业添加到历史中,但不执行它(Sidekiq + Redis + EC2 + Cloud66)

在生产环境中,Sidekiq将作业添加到历史中,但不执行它(Sidekiq + Redis + EC2 + Cloud66)
EN

Stack Overflow用户
提问于 2020-12-09 18:49:26
回答 2查看 324关注 0票数 0

我的应用程序有一个导入函数,它将执行一个Sidekiq Worker并导入一堆CSV行,将它们保存到我的数据库中。当我在本地机器上执行Sidekiq时,这可以很好地工作,但当我将代码部署到生产环境中时,Sidekiq将只正确执行一次作业。当我第二次使用import函数时,作业直接转到Sidekiq中的历史记录堆中,worker内部的逻辑永远不会执行。这真的很奇怪,因为它不会抛出错误,就像作业被正确执行一样。对于暂存,我使用AWS Elastic Cache中的Redis。

代码语言:javascript
复制
redis_version: 5.0.6

rails, "5.0.7"
sidekiq, "6.0.5"
sidekiq-failures, "1.0.0"
sidekiq-history, "0.0.11"
sidekiq-limit_fetch, "3.4.0"
sidekiq-pro, "5.0.1"
sidekiq-unique-jobs, "6.0.15"

我将感谢任何与您之前遇到的类似问题相关的提示,或者我可以做的任何其他调试这个问题的技巧。我已经跑了

代码语言:javascript
复制
Sidekiq.redis { |conn| conn.ping }
=> "PONG"

所以看起来Redis已经连接好了。

项目工作者

代码语言:javascript
复制
# frozen_string_literal: true

class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

  def perform(import_id)
    import = Import.find_by(id: import_id)
    return if import.blank?

    path = import.file.expiring_url(10)
    file = open(path)

    csv = CSV.parse(file.read, headers: true)
    import.update!( number_of_lines_in_csv: csv.size,
                    import_started_at: DateTime.now)

    created_transactions = []
    csv.each do |row|
      guid = row["TransactionUniqueId"]
      next if guid.blank?

      existing_transaction = Transaction.find_by(transaction_unique_id: guid)
      next if existing_transaction.present?

      attributes = Transaction.convert_attributes(import, row).merge(imported_at: Time.now)

      transaction = Transaction.create!(attributes)
      created_transactions << [transaction.id, guid]
      Rails.logger.info "Transaction #{row["TransactionUniqueId"]} created."
    end

    import.update!(import_finished_at: DateTime.now,
                   imported:           true)
    send_mail(import_id, created_transactions)
  end

  def send_mail(import_id, created_transactions)
    ["email0@example.com", "email1@example.com"].each do |email|
      ImportTransactionsMailer.import_processed(import_id, email, created_transactions).deliver
    end
  end
end

编辑1:对不起,我忘了说我正在使用Cloud66部署我的应用程序,如果这对我有任何帮助的话。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-12-11 17:27:35

我找到了问题所在。因此,我在导入模型中的after_create钩子中触发了我的ImportWorker,如下所示。

代码语言:javascript
复制
# frozen_string_literal: true

class Import < ApplicationRecord
  has_many :transactions
  belongs_to :admin_user

  has_attached_file :file, s3_protocol: :https
  validates_attachment_content_type :file, content_type: ["text/plain",
                                                          "text/csv",
                                                          "application/vnd.ms-excel",
                                                          "application/octet-stream"]
  validates :file, attachment_presence: true
  has_paper_trail
  after_create :run_import_in_background

  def run_import_in_background
    ImportWorker.perform_async(id)
  end
end

但是当Worker执行第一行代码来查找Import时

代码语言:javascript
复制
class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

  def perform(import_id)
    import = Import.find_by(id: import_id)
    return if import.blank?
...

如果导入是nil,它应该返回。问题是,我假设导入永远不会是nil,因为这是从after_create钩子触发的,但它实际上是作为nil来的。当我将返回线更改为raise StandardError.new("Empty import object.") if import.blank?时,worker开始失败。

因此,我还将worker sidekiq_optionsretry: false更改为retry: 3,并在第二次尝试中worker executed ok,因为它现在可以找到具有指定id的导入。所以我认为这是after_create钩子和Sidekiq之间的某种同步问题。这可能也与在此设置中使用S3 gem有关。将文件保存在S3中可能会导致在数据库中保存对象时出现一些延迟。

您可以在下面看到最终的Worker代码。

代码语言:javascript
复制
# frozen_string_literal: true

class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: 3

  def perform(import_id)
    import = Import.find_by(id: import_id)
    raise StandardError.new("Empty import object.") if import.blank?
...
票数 0
EN

Stack Overflow用户

发布于 2020-12-10 01:10:53

代码语言:javascript
复制
sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

如果发生错误会发生什么?该作业是否将被丢弃,但唯一性锁将保留,从而阻止进一步的作业入队?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65215302

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档