目前,我有接受输入并创建数据流的代码。我的目标是将数据上传到snowflake。目前我正在尝试,有没有更简单的方法来解决这个问题。或者,有没有可能将此写入熊猫df,然后将熊猫df上传到snowflake?它以前只适用于结构化流媒体,而没有连接到snowflake。
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
sfconn = {
"sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com",
"sfUser": os.getenv('SNOWFLAKE_USER'),
"sfPassword": os.getenv('SNOWFLAKE_PASSWORD'),
"sfDatabase": "x",
"sfSchema": "x",
"sfWarehouse": "x"
}
spark = SparkSession.builder\
.appName("snowflake-connector")\
.getOrCreate()
df = spark \
.readStream\
.format('json') \
.schema(spark_schemas['x']) \
.load(f"s3a://{x_path}")
out = df \
.writeStream\
.outputMode("append")\
.option("dbtable", "scratch_table")\
.options(sfconn)\
.trigger(processingTime='1 minutes')\
.format("snowflake")\
.start()现在它正在上映
options() takes 1 positional argument but 2 were given和
: java.lang.ClassNotFoundException: Failed to find data source: snowflake. 发布于 2020-08-15 20:40:36
第一个错误如下- options采用指定options的可变数量的对。如果您有选项作为映射,则需要使用**map语法对其进行"unpack“,如下所示:
opts = {'inferSchema': "true", "header": "false"}
df = spark.read.options(**opts)
.format("csv")
.schema("ticker String,date Date, price Float")
.load(".../datasets/dow-quotes.csv")对于第二个错误-你只需要指定正确的连接器名称- net.snowflake.spark.snowflake而不是snowflake,并确保在提交作业时指定了雪花火花连接器。有关更多详细信息,请参见雪花documentation。
https://stackoverflow.com/questions/63382790
复制相似问题