首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何用纯gRPC客户端解码Arrow Arrow` `FlightData`‘

如何用纯gRPC客户端解码Arrow Arrow` `FlightData`‘
EN

Stack Overflow用户
提问于 2022-01-08 02:21:53
回答 1查看 508关注 0票数 0

我遇到了一种情况,我们需要使用普通的gRPC客户机(通过grpc.aio API)与箭飞 gRPC服务器对话。

DoGet调用确实到达了服务器,我们收到了一个FlightData作为响应。如果我们对定义的理解是正确的,那么响应包含一条flatbuffers消息,该消息可以以某种方式被解码为RecordBatch

下面是客户端代码,

代码语言:javascript
复制
import asyncio
import pathlib

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import flight_pb2, flight_pb2_grpc

async def main():
    ticket = pf.Ticket("tick")
    sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
    async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
        stub = flight_pb2_grpc.FlightServiceStub(channel)
        async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
            assert type(data) is flight_pb2.FlightData
            print(data)
            # How to convert data into a RecordBatch?

asyncio.run(main())

目前,我们还停留在解码FlightData响应的最后一步。

问题是两方面的,

  1. 是否有一些现有的pyarrow.flight工具可以用来解码FlightData类型的python grpc对象;
  2. 如果#1是不可能的,那么还有哪些其他选项可以解码FlightData的内容并从头开始重构RecordBatch呢?

这里的主要兴趣是使用普通的AsyncIO gRPC客户机。据推测,这是不可行的,目前版本的箭飞行gRPC客户端。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-01-08 02:49:14

在pyarrow.flight中确实没有公开这方面的实用程序。

除其他外,ArrowData包含Arrow头和主体。因此,您可以使用pyarrow.ipc来解码它。下面是一个例子:

代码语言:javascript
复制
import asyncio
import pathlib
import struct

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import Flight_pb2, Flight_pb2_grpc

async def main():
    ticket = pf.Ticket("tick")
    async with grpc.aio.insecure_channel("localhost:1234") as channel:
        stub = Flight_pb2_grpc.FlightServiceStub(channel)
        schema = None
        async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
            # 4 bytes: Need IPC continuation token
            token = b'\xff\xff\xff\xff'
            # 4 bytes: message length (little-endian)
            length = struct.pack('<I', len(data.data_header))
            buf = pa.py_buffer(token + length + data.data_header + data.data_body)
            message = pa.ipc.read_message(buf)
            print(message)
            if schema is None:
                # This should work but is unimplemented
                # print(pa.ipc.read_schema(message))
                schema = pa.ipc.read_schema(buf)
                print(schema)
            else:
                batch = pa.ipc.read_record_batch(message, schema)
                print(batch)
                print(batch.to_pydict())

asyncio.run(main())

服务器:

代码语言:javascript
复制
import pyarrow.flight as flight
import pyarrow as pa

class TestServer(flight.FlightServerBase):
    def do_get(self, context, ticket):
        table = pa.table([[1,2,3,4]], names=["a"])
        return flight.RecordBatchStream(table)

TestServer("grpc://localhost:1234").serve()

有一些关于异步飞行API的讨论,如果您想加入dev@邮件列表,请加入它。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70629168

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档