首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用pg8000的Postgres复制流(错误:无法确定参数$1的数据类型)

使用pg8000的Postgres复制流(错误:无法确定参数$1的数据类型)
EN

Stack Overflow用户
提问于 2021-10-09 00:15:42
回答 1查看 624关注 0票数 0

我正在尝试实现一个副本声明,将熊猫的数据推送到一个CloudSQL Postgres数据库中的气流数据数据。我有一个限制:我只能使用pg8000驱动程序。我使用它作为一个参考https://github.com/tlocke/pg8000#copy-from-and-to-a-file (我在这个线程https://news.ycombinator.com/item?id=25402430中找到)

这是我的密码

代码语言:javascript
复制
    def getconn() -> pg8000.native.Connection:
        conn: pg8000.native.Connection = connector.connect(
            PG_CONFIG["host"],
            "pg8000",
            user=PG_CONFIG["user"],
            password=PG_CONFIG["password"],
            db=PG_CONFIG["database"]
        )
        return conn
    engine = sqlalchemy.create_engine("postgresql+pg8000://",creator=getconn)
    engine.dialect.description_encoding = None

    stream_in = StringIO()
    csv_writer = csv.writer(stream_in)
    csv_writer.writerow([1, "electron"])
    csv_writer.writerow([2, "muon"])
    csv_writer.writerow([3, "tau"])
    stream_in.seek(0)

    conn = engine.connect()
    conn.execute("CREATE TABLE IF NOT EXISTS temp_table (user_id numeric, user_name text)")    
    conn.execute("COPY temp_table FROM STDIN WITH (FORMAT CSV)", stream=stream_in)

我已经尝试了我能想到的一切(使用DELEMITER选项,传递文本而不是csv.)但我仍然得到以下错误:“无法确定参数$1的数据类型”

代码语言:javascript
复制
[SQL: COPY winappsx.aa FROM STDIN WITH (FORMAT CSV)]
[parameters: {'stream': <_io.StringIO object at 0x7f86a58d7dc8>}]
(Background on this error at: http://sqlalche.me/e/13/4xp6)
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/python3.6/lib/python3.6/site-packages/pg8000/dbapi.py", line 454, in execute
    statement, vals=vals, input_oids=self._input_oids, stream=stream
  File "/opt/python3.6/lib/python3.6/site-packages/pg8000/core.py", line 632, in execute_unnamed
    self.handle_messages(context)
  File "/opt/python3.6/lib/python3.6/site-packages/pg8000/core.py", line 769, in handle_messages
    raise self.error
pg8000.exceptions.DatabaseError: {'S': 'ERROR', 'V': 'ERROR', 'C': '42P18', 'M': 'could not determine data type of parameter $1', 'F': 'postgres.c', 'L': '1363', 'R': 'exec_parse_message'}

我知道连接可以工作,因为表的创建是正确的。该错误发生在复制语句上。

我怀疑流参数的提供方式存在问题,但无法找到正确的语法。这可能对https://www.kite.com/python/docs/pg8000.Cursor.execute有帮助

谢谢你的帮助!

EN

回答 1

Stack Overflow用户

发布于 2021-10-09 10:13:46

一个朋友找到了答案-)

我们使用的不是普通的SQLAlchemy连接,而是使用pg8000 API的连接。这是来自https://docs.sqlalchemy.org/en/13/core/connections.html#working-with-raw-dbapi-connections

现在我们有了pg8000连接,我看了pg8000示例的这一部分:https://github.com/tlocke/pg8000#copy-from-and-to-a-file-1从at 8000-conn生成一个游标,然后使用cursor.execute函数。这个connPG8K.cursor.execute()在第120行使用pg8000,然后可以在函数中使用流输入。sqlAlchemy conn.execute没有流输入选项,因此可能会失败。

以下是代码:

代码语言:javascript
复制
stream_in = StringIO()
    csv_writer = csv.writer(stream_in)
    csv_writer.writerow([1, "electron"])
    csv_writer.writerow([2, "muon"])
    csv_writer.writerow([3, "tau"])
    csv_writer.writerow([4, "sean is the best"])
    stream_in.seek(0)
    
   # Creates a connection with sqlalchemy methods
    conn = engine.connect()
    # Get the connection from pg8000 library
    connPG8K = engine.raw_connection()
    # Get cursor from pg8000 to be able to run commands
    cursor = connPG8K.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS temp_table (user_id numeric, user_name text)")        
    cursor.execute("COPY temp_table FROM STDIN WITH (FORMAT csv, DELIMITER)", stream=stream_in)
    connPG8K.commit()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69502814

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档