I have a Bitnami Spark Docker infrastructure (one master and one worker).
Spark reads a Kafka stream:
stream_df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "kafka1:19091")\
    .option("subscribe", "Aleca")\
    .option("startingOffsets", "earliest")\
    .load()

stream_df is then modified with select and filter.
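The exact transformation is not important for the question; roughly, df_edge is derived along these lines (the schema and column names below are placeholders, not my real ones):

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical payload schema -- the real select/filter logic is not shown here.
edge_schema = StructType([
    StructField("src", StringType()),
    StructField("dst", StringType()),
    StructField("relation", StringType()),
])

# Kafka delivers key/value as binary: cast the value to string, parse the
# JSON payload, then keep only complete edge rows.
df_edge = (stream_df
    .select(from_json(col("value").cast("string"), edge_schema).alias("msg"))
    .select("msg.*")
    .filter(col("relation").isNotNull()))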
The resulting df_edge is written out as parquet files:
df_edge.writeStream\
    .format("parquet")\
    .option("checkpointLocation", "/tmp/edge/check")\
    .option("path", "/tmp/edge/data")\
    .trigger(processingTime='10 seconds')\
    .start()\
    .awaitTermination()

I send data through Kafka, but when I check the directory /tmp/edge/data, I only find a single _spark_metadata directory.
Inside that directory there is a JSON file that references a .snappy.parquet file path, but the snappy file itself is never created.
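One way to see whether micro-batches are actually completing (a diagnostic sketch, replacing awaitTermination() with a polling loop; not in my original code):

import time

query = df_edge.writeStream\
    .format("parquet")\
    .option("checkpointLocation", "/tmp/edge/check")\
    .option("path", "/tmp/edge/data")\
    .trigger(processingTime='10 seconds')\
    .start()

# Poll the query handle a few times: status shows whether a trigger is
# active, and lastProgress stays None until the first batch commits
# (afterwards it reports numInputRows per batch).
for _ in range(6):
    time.sleep(10)
    print(query.status)
    print(query.lastProgress)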
From a different Docker container, I try to read the parquet files:
spark = SparkSession.builder\
    .appName('Flask_gunicorn') \
    .master('spark://0.0.0.0:7077') \
    .config('spark.jars.packages', 'graphframes:graphframes:0.8.1-spark3.0-s_2.12') \
    .config('spark.submit.deployMode', 'client') \
    .config('spark.executor.memory', '1g') \
    .config('spark.cores.max', '1') \
    .config('spark.jars.ivy', '/opt/bitnami/spark/ivy') \
    .config('spark.jars', '/opt/bitnami/spark/jars') \
    .getOrCreate()
edge_df = spark.read.parquet(edge_location)

The reader returns an error:
Traceback (most recent call last):
File "/usr/src/app/apao-flask-gunicorn/graph_generator.py", line 22, in <module>
vertex_df = spark.read.parquet(edge_location)
File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 353, in parquet
File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: Path does not exist: file:/tmp/edge/parquet;

If I try to read with the spark-shell instead, I get an error saying the .snappy.parquet file does not exist.
But if I submit an application that reads the parquet files from inside the Spark container, I can access the data...

I have also tried with CSV files and get similar errors.

How can I get the .snappy.parquet files actually written when streaming?
Thanks, Sebastien

Posted on 2021-05-01 14:45:49
Latest news

My infrastructure:

Flask container (no data) -> Spark master container (JSON metadata) -> Spark worker container (parquet files)
When I launch my Python script on the Flask container (with a remote session to the Spark master), the metadata is read locally on the Flask container, not on the Spark master: with a file:/ path, the driver resolves the path on its own filesystem, while the parquet parts were written on the worker's filesystem.
If I call spark-submit from the Flask container against the Spark master (with a local session), I have the same problem.
If I call spark-submit from the Spark master container against the Spark master (with a local session), it works fine, but I would prefer to keep separate Docker containers.
Thanks to @OneCricketeer for the explanation.
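Presumably the fix, while keeping separate containers, is to bind-mount the same host directories into the master, the worker, and the Flask container, so every process sees the same files (compare the commented-out volumes in my compose file below). A minimal sketch:

# Sketch: share the streaming output across services via common bind mounts.
# The same two volume lines would go under the Flask service as well.
services:
  spark:
    volumes:
      - ./edge_data:/tmp/edge/data
      - ./vertex_data:/tmp/vertex/data
  spark-worker-1:
    volumes:
      - ./edge_data:/tmp/edge/data
      - ./vertex_data:/tmp/vertex/data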
docker-compose.yml:
version: '2'
networks:
  default:
    driver: bridge
    name: aleca_network
services:
  spark:
    build: .
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_LOCAL_DIRS=/tmp
    ports:
      - '8080:8080'
      - '7077:7077'
    volumes:
      - ./master-ivy-jars:/opt/bitnami/spark/ivy:z
      #- ./vertex_data:/opt/bitnami/spark/data/vertex
      #- ./edge_data:/opt/bitnami/spark/data/edge
  spark-worker-1:
    build: .
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_INSTANCES=2
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=4
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_LOCAL_DIRS=/tmp
      - SPARK_WORKER_PORT=7078
    volumes:
      - ./worker1-ivy-jars:/opt/bitnami/spark/ivy:z
      #- ./vertex_data:/opt/bitnami/spark/data/vertex
      #- ./edge_data:/opt/bitnami/spark/data/edge

Dockerfile:
FROM docker.io/bitnami/spark:3-debian-10
USER root
RUN pip install numpy
USER 1001
RUN mkdir /tmp/vertex
RUN mkdir /tmp/vertex/data
RUN mkdir /tmp/vertex/check
RUN mkdir /tmp/edge
RUN mkdir /tmp/edge/data
RUN mkdir /tmp/edge/check
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.1/spark-sql-kafka-0-10_2.12-3.0.1.jar --output /opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.12-3.0.1.jar
RUN curl https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar --output /opt/bitnami/spark/jars/commons-pool2-2.6.2.jar
RUN curl https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.4.1/kafka-clients-2.4.1.jar --output /opt/bitnami/spark/jars/kafka-clients-2.4.1.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.0.1/spark-token-provider-kafka-0-10_2.12-3.0.1.jar --output /opt/bitnami/spark/jars/spark-token-provider-kafka-0-10_2.12-3.0.1.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.0.1/spark-tags_2.12-3.0.1.jar --output /opt/bitnami/spark/jars/spark-tags_2.12-3.0.1.jar

https://stackoverflow.com/questions/67313685