I am trying to use the HDFS sensor in Airflow, but I am getting the error below:
[2020-06-30 07:22:57,636] {dagbag.py:246} ERROR - Failed to import: /airflow/dags/test_s3_file.py
Traceback (most recent call last):
File "/airflow/airflow-staging/lib/python3.6/site-packages/airflow/models/dagbag.py", line 243, in process_file
m = imp.load_source(mod_name, filepath)
File "/airflow/airflow-staging/lib64/python3.6/imp.py", line 172, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 684, in _load
File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/airflow/dags/test_s3_file.py", line 3, in <module>
from S3HDFSObjectSensor import S3HDFSObjectSensor
File "/airflow/plugins/S3HDFSObjectSensor.py", line 6, in <module>
from airflow.hooks.hdfs_hook import HDFSHook
File "/airflow/airflow-staging/lib/python3.6/site-packages/airflow/hooks/hdfs_hook.py", line 24, in <module>
from snakebite.client import Client, HAClient, Namenode, AutoConfigClient
File "/airflow/airflow-staging/lib/python3.6/site-packages/snakebite/client.py", line 1473
baseTime = min(time * (1L << retries), cap);
^
SyntaxError: invalid syntax

From googling, I found that this problem occurs when using Python 3. Could you let me know whether there is any workaround other than switching to Python 2?
from airflow.plugins.S3HDFSObjectSensor import S3HDFSObjectSensor
t1 = S3HDFSObjectSensor(
task_id='t1',
poke_interval=sensor_poke_interval,
timeout=sensor_timeout,
object_path=check_file_in_hdfs,
trigger_rule='one_failed',
    dag=dag)

Below is my AIRFLOW_HOME directory structure:
airflow
+-- dags
+-- plugins
+-- S3HDFSObjectSensor.py

Thanks.
Posted on 2020-06-30 17:54:42
It worked after installing the Python 3 port of snakebite: "pip3 install snakebite-py3".
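If installing the `snakebite-py3` fork is not an option, another way to avoid the Python 2-only snakebite code is to skip `HDFSHook` entirely and poke HDFS through the `hdfs` command-line client instead. The sketch below is not from the original thread: the helper name, the `cmd_prefix` parameter, and the reliance on the `hdfs dfs -test -e` exit code are all assumptions for illustration.

```python
import subprocess

def hdfs_path_exists(path, cmd_prefix=("hdfs", "dfs", "-test", "-e")):
    """Return True when `path` exists on HDFS.

    `hdfs dfs -test -e <path>` exits with status 0 if the path exists
    and non-zero otherwise, so the exit code is the whole answer.
    `cmd_prefix` is a hypothetical knob that lets tests substitute a
    local command for the real hdfs CLI.
    """
    result = subprocess.run(
        [*cmd_prefix, path],
        stdout=subprocess.DEVNULL,  # the CLI's output is irrelevant here
        stderr=subprocess.DEVNULL,
    )
    return result.returncode == 0
```

A callable like this can then drive a `PythonSensor` (its import path differs across Airflow versions) via `python_callable=lambda: hdfs_path_exists(check_file_in_hdfs)`, so the snakebite import is never triggered.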
https://stackoverflow.com/questions/62634706