摘要:
我试图使用KNative事件通过一个卡夫卡主题公开一个简单的web应用程序。服务器应该能够同时处理多个请求,但不幸的是,当我通过Kafka发送请求时,它似乎是顺序地处理它们。但是,当直接向服务发出简单的HTTP请求时,并发性很好。
设置:
安装程序只使用指向我的KafkaSource KNative Service的Service,并且使用使用bitnami/kafka舵图部署的卡夫卡实例。
我使用的版本是v1.7.1,用于KNative服务和事件,v1.7.0用于卡夫卡事件集成(来自knative-sandbox/eventing-kafka)。
代码:
我正在尝试部署的服务是一个python FastAPI应用程序,在接收到一个请求(带有ID的类型)后,记录接收的请求,休眠5秒,然后返回一个虚拟消息:
import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
import logging
logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
level=logging.DEBUG, datefmt="%Y-%m-%d %H:%M:%S",
)
app = FastAPI()
class Item(BaseModel):
id: str
@app.post("/")
async def root(item: Item):
logging.debug(f"Request received with ID: {item.id}")
await asyncio.sleep(5)
logging.debug(f"Request complete for ID: {item.id}")
return {"message": "Hello World"}该应用程序使用uvicorn提供:
FROM python:3.9-slim
RUN pip install fastapi uvicorn
ADD main.py .
ENTRYPOINT uvicorn --host 0.0.0.0 --port 8877 main:app服务部署规范显示,我设置的containerConcurrency值大于1
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: concurrency-test
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/class: "kpa.autoscaling.knative.dev"
autoscaling.knative.dev/metric: "concurrency"
autoscaling.knative.dev/target: "5"
spec:
containerConcurrency: 5
containers:
- name: app
image: dev.local/concurrency-test:latest
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8877
---
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: concurrency-test
spec:
consumerGroup: concurrency-test-group
bootstrapServers:
- kafka.default.svc.cluster.local:9092
topics:
- concurrency-test-requests
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: concurrency-test注意:我也尝试过在
KafkaSource中使用spec.consumers: 2,但是行为是一样的。
日志:
当使用HTTP直接向服务发送两个并发请求时,日志如下所示(两个请求在6秒内完成,因此并发性生效):
2022-09-12 02:14:36 DEBUG Request received with ID: abc
2022-09-12 02:14:37 DEBUG Request received with ID: def
2022-09-12 02:14:41 DEBUG Request complete for ID: abc
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:14:42 DEBUG Request complete for ID: def
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK但是,当通过Kafka发送请求时,日志看起来如下(请求正在一个接一个地被处理):
2022-09-12 02:14:55 DEBUG Request received with ID: 111
2022-09-12 02:15:00 DEBUG Request complete for ID: 111
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:15:00 DEBUG Request received with ID: 222
2022-09-12 02:15:05 DEBUG Request complete for ID: 222
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK请让我知道这种顺序的请求处理是否是使用仅与KafkaSource的事件时的预期行为,我希望在这个设置中有启用并发的方法。
发布于 2022-09-13 20:23:22
Kafka提供分区内的排序(实现是一个分布式日志)。您可能需要更改Kafka主题上的分区数量,以实现更高的并行性;您还可以使用spec.consumers value来提高吞吐量(未经测试)。
我还鼓励将问题in the eventing-kafka repo与您的问题和任何额外的旋钮,如果有其他行为,您正在寻找。
https://stackoverflow.com/questions/73683932
复制相似问题