首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >获取嵌套(连接)表以在FastAPI和SQLModel提供的SQLModel接口中显示

获取嵌套(连接)表以在FastAPI和SQLModel提供的SQLModel接口中显示
EN

Stack Overflow用户
提问于 2022-07-05 13:51:04
回答 1查看 254关注 0票数 1

我很难理解如何使用FastAPI和SQLModel在一对多的关系中显示子数据。我使用的是Python3.10.3、FastAPI版本0.78.0和SQLModel版本0.0.6。下面是父/子数据库模型的简化版本:

代码语言:javascript
复制
from datetime import datetime
from email.policy import default
from sqlalchemy import UniqueConstraint
from sqlmodel import Field, SQLModel, Relationship

class CustomerBase(SQLModel):
    __table_args__ = (UniqueConstraint("email"),)

    first_name: str
    last_name: str
    email: str
    active: bool | None = True

class Customer(CustomerBase, table=True):
    id: int | None =Field(primary_key=True, default=None)

class CustomerCreate(CustomerBase):
    pass

class CustomerRead(CustomerBase):
    id: int

class CustomerReadWithCalls(CustomerRead):
    calls: list["CallRead"] = []

class CallBase(SQLModel):
    duration: int
    cost_per_minute: int | None = None
    customer_id: int | None = Field(default=None, foreign_key="customer.id")
    created: datetime = Field(nullable=False, default=datetime.now().date())

class Call(CallBase, table=True):
    id: int | None = Field(primary_key=True)

class CallCreate(CallBase):
    pass

class CallRead(CallBase):
    id: int

class CallReadWithCustomer(CallRead):
    customer: CustomerRead | None

下面是API路由:

代码语言:javascript
复制
from fastapi import APIRouter, HTTPException, Depends, Query
from rbi_app.crud.customer import (
    get_customers,
    get_customer,
)
from rbi_app.models import (
    CustomerRead,
    CustomerReadWithCalls,
)
from rbi_app.database import Session, get_session

router = APIRouter()

@router.get("/customers/", status_code=200, response_model=list[CustomerRead])
def read_customers(
    email: str = "",
    offset: int = 0,
    limit: int = Query(default=100, lte=100),
    db: Session = Depends(get_session)
):
    return get_customers(db, email, offset=offset, limit=limit)

@router.get("/customers/{customer_id}", status_code=200, response_model=CustomerReadWithCalls)
def read_customer(id: int, db: Session = Depends(get_session)):
    customer = get_customer(db, id)
    if customer is None:
        raise HTTPException(status_code=404, detail=f"Customer not found for {id=}")
    return customer

下面是对API路由端点所做的数据库的查询:

代码语言:javascript
复制
from sqlmodel import select
from rbi_app.database import Session
from rbi_app.models import (
    Customer,
    CustomerCreate,
)
# from rbi_app.schemas.customer import CustomerCreate
    
def get_customer(db: Session, id: int):
    return db.get(Customer, id)
    
def get_customers(db: Session, email: str = "", offset: int = 0, limit: int = 100):
    if email:
        return db.exec(select(Customer).where(Customer.email == email)).first()
    return db.exec(select(Customer).offset(offset).limit(limit).order_by(Customer.id)).all()

当我导航到一条获得所有客户的路径时,我的查询运行,我得到一个客户,但是在客户中没有“调用”列表属性。OpenAPI显示显示一个"calls“属性,但它是空的。

我做错了什么?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-08-16 11:02:40

这里的问题似乎是您没有在Customer模型(或Call模块)上定义关系。因为您使用Customer模型查询数据库,而且它没有calls属性,所以get_customer函数返回的对象中没有这些数据。

尽管路由将CustomerReadWithCalls定义为响应模型,但在调用它时,该类的对象只能使用路由处理程序函数返回的数据实例化,在本例中是Customer实例。因为这甚至没有calls属性(更不用说数据),所以CustomerReadWithCalls对象本质上是用您为calls字段定义的默认值--空列表创建的。

添加

代码语言:javascript
复制
    calls: list["Call"] = Relationship(back_populates="customer")

对于您的Customer模型,应该足够了。

(但顺便提一句,对于我来说,只有在CustomerReadWithCalls定义之后显式更新CallRead模型上的引用时,路由文档才能正常工作。)

下面是一个完整的工作示例。

models.py

代码语言:javascript
复制
from datetime import datetime

from sqlalchemy import UniqueConstraint
from sqlmodel import Field, Relationship, SQLModel


class CustomerBase(SQLModel):
    __table_args__ = (UniqueConstraint("email"),)

    first_name: str
    last_name: str
    email: str
    active: bool | None = True


class Customer(CustomerBase, table=True):
    id: int | None = Field(primary_key=True, default=None)

    calls: list["Call"] = Relationship(back_populates="customer")


class CustomerCreate(CustomerBase):
    pass


class CustomerRead(CustomerBase):
    id: int


class CustomerReadWithCalls(CustomerRead):
    calls: list["CallRead"] = []


class CallBase(SQLModel):
    duration: int
    cost_per_minute: int | None = None
    customer_id: int | None = Field(default=None, foreign_key="customer.id")
    created: datetime = Field(nullable=False, default=datetime.now().date())


class Call(CallBase, table=True):
    id: int | None = Field(primary_key=True, default=None)

    customer: Customer | None = Relationship(back_populates="calls")


class CallCreate(CallBase):
    pass


class CallRead(CallBase):
    id: int


# After the definition of `CallRead`, update the forward reference to it:
CustomerReadWithCalls.update_forward_refs()


class CallReadWithCustomer(CallRead):
    customer: CustomerRead | None

routes.py

代码语言:javascript
复制
from fastapi import FastAPI, HTTPException, Depends
from sqlmodel import Session, SQLModel, create_engine

from .models import CustomerReadWithCalls, Customer, Call


api = FastAPI()

sqlite_file_name = 'database.db'
sqlite_url = f'sqlite:///{sqlite_file_name}'
engine = create_engine(sqlite_url, echo=True)


@api.on_event('startup')
def initialize_db():
    SQLModel.metadata.drop_all(engine)
    SQLModel.metadata.create_all(engine)

    # For testing:
    with Session(engine) as session:
        customer = Customer(first_name="Foo", last_name="Bar", email="foo@bar.com")
        call1 = Call(duration=123)
        call2 = Call(duration=456)
        customer.calls.extend([call1, call2])
        session.add(customer)
        session.commit()


def get_session() -> Session:
    session = Session(engine)
    try:
        yield session
    finally:
        session.close()


def get_customer(db: Session, id: int):
    return db.get(Customer, id)


@api.get("/customers/{customer_id}", status_code=200, response_model=CustomerReadWithCalls)
def read_customer(customer_id: int, db: Session = Depends(get_session)):
    customer = get_customer(db, customer_id)
    if customer is None:
        raise HTTPException(status_code=404, detail=f"Customer not found for {customer_id=}")
    return customer

启动API服务器并将GET发送给http://127.0.0.1:8000/customers/1

代码语言:javascript
复制
{
  "first_name": "Foo",
  "last_name": "Bar",
  "email": "foo@bar.com",
  "active": true,
  "id": 1,
  "calls": [
    {
      "duration": 123,
      "cost_per_minute": null,
      "customer_id": 1,
      "created": "2022-08-16T00:00:00",
      "id": 1
    },
    {
      "duration": 456,
      "cost_per_minute": null,
      "customer_id": 1,
      "created": "2022-08-16T00:00:00",
      "id": 2
    }
  ]
}

希望这能有所帮助。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72870598

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档