如何将此同步sqlalchemy查询逻辑转换为异步sqlalchemy。
def update_job_by_id(id:int, job: JobCreate,db: Session,owner_id):
existing_job = db.query(Job).filter(Job.id == id)
if not existing_job.first():
return 0
job.__dict__.update(owner_id=owner_id) #update dictionary with new key value of owner_id
existing_job.update(job.__dict__)
db.commit()
return 1我试图将此逻辑转换为异步sqlalchemy,但没有luck.Here是上述代码的异步版本:
async def update_job_by_id(self, id: int, job: PydanticValidationModel, owner_id):
job_query = await self.db_session.get(db_table, id)
if not job_query:
return 0
updated_job = update(db_table).where(db_table.id == id).values(job.__dict__.update(owner_id = owner_id)).execution_options(synchronize_session="fetch")
await self.db_session.execute(updated_job)
return 1它会产生以下错误:
AttributeError: 'NoneType' object has no attribute 'items'范围:
使用FastAPI和异步SQLAlchemy制作一个简单的作业板API,我在函数async def update_job_by_id中很难完成API的更新功能,首先检查作业的ID (如果ID是True ),然后将PydanticModel导出到dict对象,用owner_id更新PydanticModel,最后更新Model并更新作业,但不幸的是,当我试图更新PydanticModel update(Job).values(**job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch")时,我得到了None而不是更新的dict。
models/jobs.py
from sqlalchemy import Column, Integer, String, Boolean, Date, ForeignKey
from sqlalchemy.orm import relationship
from db.base_class import Base
class Job(Base):
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
company_name = Column(String, nullable=False)
company_url = Column(String)
location = Column(String, nullable=False)
description = Column(String)
date_posted = Column(Date)
is_active = Column(Boolean, default=True)
owner_id = Column(Integer, ForeignKey('user.id'))
owner = relationship("User", back_populates="jobs")schemas/jobs.py
from typing import Optional
from pydantic import BaseModel
from datetime import date, datetime
class JobBase(BaseModel):
title: Optional[str] = None
company_name: Optional[str] = None
company_url: Optional[str] = None
location: Optional[str] = "remote"
description: Optional[str] = None
date_posted: Optional[date] = datetime.now().date()
class JobCreate(JobBase):
title: str
company_name: str
location: str
description: str
class ShowJob(JobBase):
title: str
company_name: str
company_url: Optional[str]
location: str
date_posted: date
description: str
class Config():
orm_mode = Trueroutes/route_jobs.py
from typing import List
from fastapi import APIRouter, HTTPException, status
from fastapi import Depends
from db.repository.job_board_dal import job_board
from schemas.jobs import JobCreate, ShowJob
from db.repository.job_board_dal import Job
from depends import get_db
router = APIRouter()
@router.post("/create-job",response_model=ShowJob)
async def create_user(Job: JobCreate, jobs: Job = Depends(get_db)):
owner_id = 1
return await jobs.create_new_job(Job, owner_id)
@router.get("/get/{id}", response_model=ShowJob)
async def retrieve_job_by_id(id:int, id_job: job_board = Depends(get_db)):
job_id = await job_board.retrieve_job(id_job, id=id)
if not job_id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job with id {id} does not exist")
return job_id
@router.get("/all", response_model=List[ShowJob])
async def retrieve_all_jobs(all_jobs: job_board = Depends(get_db)):
return await all_jobs.get_all_jobs()
@router.put("/update/{id}")
async def update_job(id: int, job: JobCreate, job_update: job_board = Depends(get_db)):
current_user = 1
response = await job_update.update_job_by_id(id = id, job = job, owner_id = current_user)
if not response:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job with id {id} does not exist")
return {"response": "Successfully updated the Job."}db/repository/job_board_dal.py
from typing import List
from sqlalchemy import update
from sqlalchemy.engine import result
from sqlalchemy.orm import Session
from sqlalchemy.future import select
from schemas.users import UserCreate
from schemas.jobs import JobCreate
from db.models.users import User
from db.models.jobs import Job
from core.hashing import Hasher
class job_board():
def __init__(self, db_session: Session):
self.db_session = db_session
async def register_user(self, user: UserCreate):
new_user = User(username=user.username,
email=user.email,
hashed_password=Hasher.get_password_hash(user.password),
is_active = False,
is_superuser=False
)
self.db_session.add(new_user)
await self.db_session.flush()
return new_user
async def create_new_job(self, job: JobCreate, owner_id: int):
new_job = Job(**job.dict(), owner_id = owner_id)
self.db_session.add(new_job)
await self.db_session.flush()
return new_job
async def retrieve_job(self, id:int):
item = await self.db_session.get(Job, id)
return item
async def get_all_jobs(self) -> List[Job]:
query = await self.db_session.execute(select(Job).order_by(Job.is_active == True))
return query.scalars().all()
async def update_job_by_id(self, id: int, job: JobCreate, owner_id):
_job = await self.db_session.execute(select(Job).where(Job.id==id))
(result, ) = _job.one()
job_dict = job.dict()
print(job_dict.update({"owner_id": owner_id}))
#print(job_update)
if not result:
return 0
job_update = update(Job).values(job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch")
return await self.db_session.execute(job_update)如果有人能指出我在这里错过了什么,我将不胜感激。
发布于 2021-07-14 07:46:43
Sqlalchemy直到1.4版本才支持异步操作,请参阅这。
https://stackoverflow.com/questions/68336093
复制相似问题