
How do I fix DAGs that stay broken even after they have been fixed, when running Apache Airflow in Docker?

Stack Overflow user
Asked on 2022-02-01 16:33:54
1 answer · 2.7K views · 0 followers · 1 vote

So, in my case, I used to run Airflow directly on my machine, and now I'm trying to run it in containers with Docker while keeping my previous test records. However, I keep running into problems.

A bit of background... When I first used docker-compose to bring up my containers, Airflow was throwing an error saying that the column dag_has_import_errors does not exist. So I went ahead and created it manually, and everything seemed fine.
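For context, rather than hand-creating columns, the usual way to bring an existing metadata DB up to the schema a newer Airflow version expects is the built-in migration command (this is essentially what the edit at the bottom of this question ends up doing). A minimal sketch, assuming the service names from the compose file further down:

Code language: bash

# Apply any pending alembic migrations to the existing metadata DB (non-destructive)
docker-compose exec airflow-webserver airflow db upgrade
# Optionally verify that no migrations are still pending
docker-compose exec airflow-webserver airflow db check-migrations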

However, now my DAGs are broken, and when I modify a line of code that isn't the actual problem, I can see that line update in the brief error message shown at the top of the webserver.

But when I actually fix the problem, the code shown does not change and the DAG stays broken. I'll provide an image of the error and an image of the code.

Also, below is my docker-compose file (I commented out airflow db init, but should I keep it with the db upgrade parameter set to true? See the airflow-init sketch right after the compose file.) My compose file is based on this template:

Code language: yaml
version: '3.1'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    # postgresql+psycopg2://postgres:airflow@localhost:5434/airflowdb
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:airflow@postgres:5434/airflowdb
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://postgres:airflow@postgres:5434/airflowdb
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflowdb
      PGPORT: 5434

    volumes:
      - pipeline-scripts_airflow-docker-db:/var/lib/postgresql/data
      # - postgres-db-volume:/var/lib/postgresql/data
    ports:
      - 5434:5434
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "postgres"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

#below here
  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

# volumes:
#   postgres-db-volume:
volumes: 
    pipeline-scripts_airflow-docker-db:
        external: true
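For reference, the official Airflow compose template this file is based on does not run airflow db init by hand; it ships a dedicated airflow-init service that upgrades the database and creates the web user, and the other services wait for it. A trimmed sketch of that service (taken from the upstream template and simplified, so treat the exact fields as indicative; the 2.2.3 template wraps this in extra shell checks):

Code language: yaml

  airflow-init:
    <<: *airflow-common
    # The image entrypoint reads the _AIRFLOW_* variables below, runs
    # "airflow db upgrade" and creates the initial UI user, then exits.
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

With a service like this in place, the commented-out "airflow-init: condition: service_completed_successfully" blocks in the other services can be re-enabled, so the webserver, scheduler and workers only start once the migrations have finished.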

Also, the logs in my containers are interesting; they look like this:

Code language: text
apache-airflow-airflow-scheduler-1  | Process DagFileProcessor4728-Process:
apache-airflow-airflow-scheduler-1  | Traceback (most recent call last):
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
apache-airflow-airflow-scheduler-1  |     self.run()
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
apache-airflow-airflow-scheduler-1  |     self._target(*self._args, **self._kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 168, in _run_file_processor
apache-airflow-airflow-scheduler-1  |     callback_requests=callback_requests,
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
apache-airflow-airflow-scheduler-1  |     return func(*args, session=session, **kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 663, in process_file
apache-airflow-airflow-scheduler-1  |     dagbag.sync_to_db()
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
apache-airflow-airflow-scheduler-1  |     return func(*args, session=session, **kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 608, in sync_to_db
apache-airflow-airflow-scheduler-1  |     for attempt in run_with_db_retries(logger=self.log):
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 382, in __iter__
apache-airflow-airflow-scheduler-1  |     do = self.iter(retry_state=retry_state)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 349, in iter
apache-airflow-airflow-scheduler-1  |     return fut.result()
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in result
apache-airflow-airflow-scheduler-1  |     return self.__get_result()
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
apache-airflow-airflow-scheduler-1  |     raise self._exception
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 622, in sync_to_db
apache-airflow-airflow-scheduler-1  |     DAG.bulk_write_to_db(self.dags.values(), session=session)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
apache-airflow-airflow-scheduler-1  |     return func(*args, **kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2433, in bulk_write_to_db
apache-airflow-airflow-scheduler-1  |     most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2433, in <dictcomp>
apache-airflow-airflow-scheduler-1  |     most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 100, in instances
apache-airflow-airflow-scheduler-1  |     cursor.close()
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
apache-airflow-airflow-scheduler-1  |     with_traceback=exc_tb,
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
apache-airflow-airflow-scheduler-1  |     raise exception
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 80, in instances
apache-airflow-airflow-scheduler-1  |     rows = [proc(row) for row in fetch]
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 80, in <listcomp>
apache-airflow-airflow-scheduler-1  |     rows = [proc(row) for row in fetch]
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 588, in _instance
apache-airflow-airflow-scheduler-1  |     populators,
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 725, in _populate_full
apache-airflow-airflow-scheduler-1  |     dict_[key] = getter(row)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1723, in process
apache-airflow-airflow-scheduler-1  |     return loads(value)
apache-airflow-airflow-scheduler-1  | ValueError: unsupported pickle protocol: 5

I'd be happy to provide any other information that's needed.

Edit: small update. I went ahead and ran docker exec -it apache-airflow-airflow-webserver-1 bash first and then airflow db upgrade, since after all it is just an alembic migration and shouldn't delete my data.

After doing that, it added the missing column by itself. Now when I look at the postgres database, it shows me dag.has_import_errors as false.
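If it helps, the same check can be done from the host without opening a psql shell by hand; a small sketch using the service and database names from the compose file above:

Code language: bash

# Confirm the migration added the column and see which DAGs are flagged
docker-compose exec postgres psql -U postgres -d airflowdb \
  -c "SELECT dag_id, has_import_errors FROM dag;"
# The parse errors shown in the UI banner live in the import_error table
docker-compose exec postgres psql -U postgres -d airflowdb \
  -c "SELECT filename, timestamp FROM import_error;"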

However, in the import_error table I still have the same problem: the DAGs are not being updated.


1 Answer

Stack Overflow user

Answer accepted

Posted on 2022-02-03 21:40:34

Let's go!

It's finally working :). So the main problem was that I didn't have all the required packages. I first tried just running pip install configparser inside the container, which actually helped with one of the DAGs I had to run. However, that seemed neither persistent nor practical, so I decided to go with the Dockerfile approach and actually extend the image (I believe that's what it's called). Here is my Dockerfile:

Code language: dockerfile
FROM apache/airflow:2.2.3-python3.8

COPY requirements.txt ./

RUN pip install -r requirements.txt

Now, two important things about this Dockerfile. The first is, of course, that I install the dependencies I might need; however, some of my dependencies conflicted with Airflow's, and I simply decided to remove those from my requirements.txt file.
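(As a side note on that first point: instead of removing the conflicting pins, an alternative that Airflow's documentation suggests for extended images is to install the requirements against Airflow's published constraints file, so pip resolves versions known to work with the given Airflow/Python combination. A minimal sketch for 2.2.3 on Python 3.8; the URL below just follows the documented constraints naming convention, adjust the versions to match your image. The same line can be used as the RUN instruction in the Dockerfile above.)

Code language: bash

pip install -r requirements.txt \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.8.txt"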

The second thing is the addition of python3.8, which actually got rid of the ValueError: unsupported pickle protocol: 5 error that was preventing me from viewing the history of my DAGs.
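That error fits the Python version change: pickle protocol 5 was added in Python 3.8, while the default apache/airflow:2.2.3 image runs Python 3.7 (visible in the traceback above), whose highest supported protocol is 4, so a 3.7 interpreter cannot read rows pickled by a 3.8 one. A quick way to check which protocol a container's interpreter supports, assuming the scheduler service name from the compose file:

Code language: bash

# Prints the interpreter version and its highest pickle protocol
# (4 on Python 3.7, 5 on Python 3.8+)
docker-compose exec airflow-scheduler python -c "import sys, pickle; print(sys.version.split()[0], pickle.HIGHEST_PROTOCOL)"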

Other problems I ran into were about finding a way to place files inside the container, such as the key file for the SSH operator, but that's another story :D.

And of course, if you go this route, you will have to edit your docker-compose.yaml file the following way:

Code language: yaml
  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3-python3.8}
  build: .
  environment:
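After switching from image: to build:, the containers have to be rebuilt and recreated so the extended image is actually used; roughly:

Code language: bash

# Build the extended image from the Dockerfile and recreate the services with it
docker-compose build
docker-compose up -d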

That solved most of my problems.

The only thing that still bothers me is that apache-airflow-airflow-webserver-1 shows up in red in docker logs. I'm not sure whether that's normal, but other than that, everything is healthy when I run docker ps.

2 votes
Original question on Stack Overflow: https://stackoverflow.com/questions/70944153
