我正忙着用Nameko在Python中编写一个微服务后端。在寻找一个良好的依赖注入包时,我遇到了一个Injector。我真的很喜欢它,我想和Nameko一起使用它。然后,我注意到了一个小问题: Nameko实例化了工作人员,并且不直接使用依赖项注入包。在尝试使用文档让它工作之后,我偶然发现了包nameko喷射器。我喜欢这个概念,并试图实现它,但我得到了一个错误:
参数“绑定”未填充
使用git存储库中的示例代码(如下所示),在初始化NamekoInjector类时出现了问题。
微型服务工作者阶层:
from nameko.rpc import rpc
from services.order_service import OrderService
from nameko_injector.core import NamekoInjector
INJECTOR = NamekoInjector()
@INJECTOR.decorate_service
class OrderWorker:
# Mandatory field for service discovery
name = "order_worker"
def __init__(self, service: OrderService):
self.service = service
@rpc
def get_orders(self):
return self.service.orders()OrderService类:
from models.order.order import Order
class OrderService(object):
def __init__(self):
self.orders = [Order(1), Order(2)]
def get_users(self):
return self.orders订单类:
class Order:
def __init__(self, orderid):
self.orderid = orderid
def __str__(self):
return str(self.orderid)在查看NamekoInjector类时,我无法找到绑定的确切功能以及使用它的时间。首先,我甚至不需要它,但是当我删除绑定字段和NamekoInjector类中的其他用法时,它仍然不能工作。有人能帮帮我吗?谢谢!
发布于 2022-01-06 18:53:00
我意识到现在已经很晚了,但也许还很重要。你离解决方案很近。
缺失的部分:
configure的NamekoInjector函数。它告诉库如何创建依赖项、它们的作用域等。我修改了您的代码,并将其放在一个模块中,但您可以随意组织代码。我这么做是为了简单。
下面的代码应该按原样工作。我还包括了一些帮助我运行这个项目的信息。
import injector
from nameko.rpc import rpc
from nameko_injector.core import NamekoInjector
class Order:
def __init__(self, orderid):
self.orderid = orderid
def __str__(self):
return str(self.orderid)
class OrderService(object):
def __init__(self):
self.orders = [Order(1), Order(2)]
def get_orders(self):
return self.orders
class Calculator:
# class to demonstrate transitive dependency
def __init__(self, randomiser):
self.__randomiser = randomiser
def calculate(self):
return self.__randomiser()
# I usually keep the providers in the same place with the provided
@injector.provider
def provide_calculator() -> Calculator:
# simple function that initialises one dependency
return Calculator(randomiser=lambda: 42)
class ComplexInitService:
"""Service with own dependencies."""
# the service to demonstrate providers.
def __init__(self, calculator: Calculator):
self.__calculator = calculator
def calculate(self):
return self.__calculator.calculate()
@injector.provider
def provide_complex_service(calculator: Calculator) -> ComplexInitService:
return ComplexInitService(calculator)
def configure(binder):
# function that tells the framework how to create a dependency based on the requested type
# in the RPC/HTTP endpoint
binder.bind(Calculator, to=provide_calculator)
binder.bind(ComplexInitService, to=provide_calculator)
INJECTOR = NamekoInjector(configure)
@INJECTOR.decorate_service
class OrderWorker:
# Mandatory field for service discovery
name = "order_worker"
@rpc
def get_orders(self, service: OrderService):
return [order.orderid for order in service.get_orders()]
@rpc
def calcualte_complex_stuff(self, service: ComplexInitService):
return service.calculate()我使用了以下代码片段来插入该项目
rm pyproject.toml poetry.lock
poetry init \
--name nameko-injector-demo \
--no-interaction \
--dependency 'nameko-injector:1.1.2'
# poetry installNameko配置示例
AMQP_URI: 'amqps://user:password@vhots.rmq.cloudamqp.com/vhost'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10运行服务
poetry run nameko run --config config.yml service:OrderWorker用nameko shell进行测试
poetry run nameko shell --config config.yml
Nameko Python 3.8.9 (default, May 13 2021, 00:13:06)
[Clang 7.1.0 (tags/RELEASE_710/final)] shell on darwin
Broker: amqps://user:password@crow.rmq.cloudamqp.com/vhost (from --config)
>>> n.rpc.order_worker.get_orders()
[1, 2]
>>> n.rpc.order_worker.calcualte_complex_stuff()
42https://stackoverflow.com/questions/60565168
复制相似问题