首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >pySpark:将Kafka流放入parquet中,并从远程会话读取parquet

pySpark:将Kafka流放入parquet中,并从远程会话读取parquet
EN

Stack Overflow用户
提问于 2021-04-29 16:26:02
回答 1查看 263关注 0票数 0

我有一个bitnami spark docker基础设施(一个主机和一个工人)。

Spark读取Kafka流。

代码语言:javascript
复制
stream_df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "kafka1:19091")\
    .option("subscribe", "Aleca")\
    .option("startingOffsets", "earliest")\
    .load()

使用select、filter修改stream_df .

并将stream_df写入拼图文件。

代码语言:javascript
复制
   df_edge.writeStream\
        .format("parquet")\
        .option("checkpointLocation", "/tmp/edge/check")\
        .option("path", "/tmp/edge/data")\
        .trigger(processingTime='10 seconds')\
        .start()\
        .awaitTermination()

我使用Kafka发送数据,当我检查目录"/tmp/edge/ data“时,我只有一个目录_spark_metadata。

在这个目录中,我有一个json文件,文件路径很快。但是snappy不会被创建。

从不同的docker容器中,我尝试读取拼图文件。

代码语言:javascript
复制
spark = SparkSession.builder\
    .appName('Flask_gunicorn') \
    .master('spark://0.0.0.0:7077') \
    .config('spark.jars.packages', 'graphframes:graphframes:0.8.1-spark3.0-s_2.12') \
    .config('spark.submit.deployMode', 'client') \
    .config('spark.executor.memory', '1g') \
    .config('spark.cores.max', '1') \
    .config('spark.jars.ivy', '/opt/bitnami/spark/ivy') \
    .config('spark.jars', '/opt/bitnami/spark/jars') \
    .getOrCreate()

edge_df = spark.read.csv(edge_location)

读取器返回一个错误:

代码语言:javascript
复制
Traceback (most recent call last):
  File "/usr/src/app/apao-flask-gunicorn/graph_generator.py", line 22, in <module>
    vertex_df = spark.read.parquet(edge_location)
  File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 353, in parquet
  File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
  File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: Path does not exist: file:/tmp/edge/parquet;

如果我尝试使用spark shell读取:我有一个错误文件.snappy.parquet不存在。

但是如果提交一个读取spark容器上拼花文件的应用程序,我就可以访问数据...

我已经尝试了csv文件,我有类似的错误。

什么是有快文件的方法,当一个put流?

谢谢塞巴斯蒂安

EN

回答 1

Stack Overflow用户

发布于 2021-05-01 14:45:49

最后一条新闻

我的基础设施:

容器烧瓶(无数据) ->容器火花主机(json元数据) ->容器火花工人(拼图文件)

当在Flask容器上启动我的python脚本(与Spark master上的远程会话)时,元数据的读取在Flask容器上本地完成,而不是在Spark master中。

如果我从flask容器调用spark-submit到master spark (使用本地会话),我有相同的pb。

如果我从主spark容器调用spark-submit到主spark (使用本地会话),它可以正常工作,但我更喜欢有单独的docker文件。

感谢@OneCricketeer的解释。

docker-compose.yml:

代码语言:javascript
复制
version: '2'

networks:
  default:
    driver: bridge
    name: aleca_network

services:
  spark:
    build: .
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_LOCAL_DIRS=/tmp
    ports:
      - '8080:8080'
      - '7077:7077'
    volumes:
      - ./master-ivy-jars:/opt/bitnami/spark/ivy:z
      #- ./vertex_data:/opt/bitnami/spark/data/vertex
      #- ./edge_data:/opt/bitnami/spark/data/edge

  spark-worker-1:
    build: .
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_INSTANCES=2
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=4
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_LOCAL_DIRS=/tmp
      - SPARK_WORKER_PORT=7078
    volumes:
      - ./worker1-ivy-jars:/opt/bitnami/spark/ivy:z
      #- ./vertex_data:/opt/bitnami/spark/data/vertex
      #- ./edge_data:/opt/bitnami/spark/data/edge

Dockerfile:

代码语言:javascript
复制
FROM docker.io/bitnami/spark:3-debian-10

USER root
RUN pip install numpy

USER 1001
RUN mkdir /tmp/vertex
RUN mkdir /tmp/vertex/data
RUN mkdir /tmp/vertex/check

RUN mkdir /tmp/edge
RUN mkdir /tmp/edge/data
RUN mkdir /tmp/edge/check


RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.1/spark-sql-kafka-0-10_2.12-3.0.1.jar --output /opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.12-3.0.1.jar
RUN curl https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar --output /opt/bitnami/spark/jars/commons-pool2-2.6.2.jar
RUN curl https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.4.1/kafka-clients-2.4.1.jar --output /opt/bitnami/spark/jars/kafka-clients-2.4.1.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.0.1/spark-token-provider-kafka-0-10_2.12-3.0.1.jar --output /opt/bitnami/spark/jars/spark-token-provider-kafka-0-10_2.12-3.0.1.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.0.1/spark-tags_2.12-3.0.1.jar --output /opt/bitnami/spark/jars/spark-tags_2.12-3.0.1.jar
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67313685

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档