首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >FastAPI同步SQLAlchemy到异步SQLAlchemy

FastAPI同步SQLAlchemy到异步SQLAlchemy
EN

Stack Overflow用户
提问于 2021-07-11 12:10:34
回答 1查看 3.7K关注 0票数 1

如何将此同步sqlalchemy查询逻辑转换为异步sqlalchemy。

代码语言:javascript
复制
def update_job_by_id(id:int, job: JobCreate,db: Session,owner_id):
    existing_job = db.query(Job).filter(Job.id == id)
    if not existing_job.first():
        return 0
    job.__dict__.update(owner_id=owner_id)  #update dictionary with new key value of owner_id
    existing_job.update(job.__dict__)
    db.commit()
    return 1

我试图将此逻辑转换为异步sqlalchemy,但没有luck.Here是上述代码的异步版本:

代码语言:javascript
复制
async def update_job_by_id(self, id: int, job: PydanticValidationModel, owner_id):
        job_query = await self.db_session.get(db_table, id)
        if not job_query:
            return 0
        updated_job = update(db_table).where(db_table.id == id).values(job.__dict__.update(owner_id = owner_id)).execution_options(synchronize_session="fetch")
        await self.db_session.execute(updated_job)
        return 1

它会产生以下错误:

代码语言:javascript
复制
AttributeError: 'NoneType' object has no attribute 'items'

范围:

使用FastAPI和异步SQLAlchemy制作一个简单的作业板API,我在函数async def update_job_by_id中很难完成API的更新功能,首先检查作业的ID (如果IDTrue ),然后将PydanticModel导出到dict对象,用owner_id更新PydanticModel,最后更新Model并更新作业,但不幸的是,当我试图更新PydanticModel update(Job).values(**job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch")时,我得到了None而不是更新的dict。

models/jobs.py

代码语言:javascript
复制
from sqlalchemy import Column, Integer, String, Boolean, Date, ForeignKey
from sqlalchemy.orm import relationship

from db.base_class import Base



class Job(Base):
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    company_name = Column(String, nullable=False)
    company_url = Column(String)
    location = Column(String, nullable=False)
    description = Column(String)
    date_posted = Column(Date)
    is_active = Column(Boolean, default=True)
    owner_id = Column(Integer, ForeignKey('user.id'))
    owner = relationship("User", back_populates="jobs")

schemas/jobs.py

代码语言:javascript
复制
from typing import Optional
from pydantic import BaseModel
from datetime import date, datetime


class JobBase(BaseModel):
    title: Optional[str] = None
    company_name: Optional[str] = None
    company_url: Optional[str] = None
    location: Optional[str] = "remote"
    description: Optional[str] = None
    date_posted: Optional[date] = datetime.now().date()

class JobCreate(JobBase):
    title: str
    company_name: str
    location: str
    description: str

class ShowJob(JobBase):
    title: str
    company_name: str
    company_url: Optional[str]
    location: str
    date_posted: date
    description: str

    class Config():
        orm_mode = True

routes/route_jobs.py

代码语言:javascript
复制
from typing import List
from fastapi import APIRouter, HTTPException, status
from fastapi import Depends

from db.repository.job_board_dal import job_board
from schemas.jobs import JobCreate, ShowJob
from db.repository.job_board_dal import Job
from depends import get_db

router = APIRouter()

@router.post("/create-job",response_model=ShowJob)
async def create_user(Job: JobCreate, jobs: Job = Depends(get_db)):
    owner_id = 1
    return await jobs.create_new_job(Job, owner_id)

@router.get("/get/{id}", response_model=ShowJob)
async def retrieve_job_by_id(id:int, id_job: job_board = Depends(get_db)):
    job_id = await job_board.retrieve_job(id_job, id=id)
    if not job_id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Job with id {id} does not exist")
    return job_id

@router.get("/all", response_model=List[ShowJob])
async def retrieve_all_jobs(all_jobs: job_board = Depends(get_db)):
    return await all_jobs.get_all_jobs()

@router.put("/update/{id}")
async def update_job(id: int, job: JobCreate, job_update: job_board = Depends(get_db)):
    current_user = 1
    response = await job_update.update_job_by_id(id = id, job = job, owner_id = current_user)
    if not response:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Job with id {id} does not exist")
    return {"response": "Successfully updated the Job."}

db/repository/job_board_dal.py

代码语言:javascript
复制
from typing import List


from sqlalchemy import update
from sqlalchemy.engine import result
from sqlalchemy.orm import Session
from sqlalchemy.future import select

from schemas.users import UserCreate
from schemas.jobs import JobCreate
from db.models.users import User
from db.models.jobs import Job
from core.hashing import Hasher



class job_board():
    def __init__(self, db_session: Session):
        self.db_session = db_session

    async def register_user(self, user: UserCreate):
        new_user = User(username=user.username,
        email=user.email,
        hashed_password=Hasher.get_password_hash(user.password),
        is_active = False,
        is_superuser=False
        )
        self.db_session.add(new_user)
        await self.db_session.flush()
        return new_user

    
    async def create_new_job(self, job: JobCreate, owner_id: int):
        new_job = Job(**job.dict(), owner_id = owner_id)
        self.db_session.add(new_job)
        await self.db_session.flush()
        return new_job

    async def retrieve_job(self, id:int):
        item = await self.db_session.get(Job, id)
        return item

    async def get_all_jobs(self) -> List[Job]:
        query = await self.db_session.execute(select(Job).order_by(Job.is_active == True))
        return query.scalars().all()

    async def update_job_by_id(self, id: int, job: JobCreate, owner_id):
        _job = await self.db_session.execute(select(Job).where(Job.id==id))
        (result, ) = _job.one()
        job_dict = job.dict()
        print(job_dict.update({"owner_id": owner_id}))
        
        #print(job_update)
        if not result:
            return 0
        job_update = update(Job).values(job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch")
        return await self.db_session.execute(job_update)

如果有人能指出我在这里错过了什么,我将不胜感激。

EN

回答 1

Stack Overflow用户

发布于 2021-07-14 07:46:43

Sqlalchemy直到1.4版本才支持异步操作,请参阅

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68336093

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档