首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KNative服务在使用KafkaSource事件时不并发处理请求

KNative服务在使用KafkaSource事件时不并发处理请求
EN

Stack Overflow用户
提问于 2022-09-12 02:22:51
回答 1查看 113关注 0票数 2

摘要:

我试图使用KNative事件通过一个卡夫卡主题公开一个简单的web应用程序。服务器应该能够同时处理多个请求,但不幸的是,当我通过Kafka发送请求时,它似乎是顺序地处理它们。但是,当直接向服务发出简单的HTTP请求时,并发性很好。

设置:

安装程序只使用指向我的KafkaSource KNative ServiceService,并且使用使用bitnami/kafka舵图部署的卡夫卡实例。

我使用的版本是v1.7.1,用于KNative服务和事件,v1.7.0用于卡夫卡事件集成(来自knative-sandbox/eventing-kafka)。

代码:

我正在尝试部署的服务是一个python FastAPI应用程序,在接收到一个请求(带有ID的类型)后,记录接收的请求,休眠5秒,然后返回一个虚拟消息:

代码语言:javascript
复制
import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
import logging

logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.DEBUG, datefmt="%Y-%m-%d %H:%M:%S",
)

app = FastAPI()

class Item(BaseModel):
    id: str

@app.post("/")
async def root(item: Item):
    logging.debug(f"Request received with ID: {item.id}")
    await asyncio.sleep(5)
    logging.debug(f"Request complete for ID: {item.id}")
    return {"message": "Hello World"}

该应用程序使用uvicorn提供:

代码语言:javascript
复制
FROM python:3.9-slim

RUN pip install fastapi uvicorn

ADD main.py .

ENTRYPOINT uvicorn --host 0.0.0.0 --port 8877 main:app

服务部署规范显示,我设置的containerConcurrency值大于1

代码语言:javascript
复制
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: concurrency-test
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/class: "kpa.autoscaling.knative.dev"
        autoscaling.knative.dev/metric: "concurrency"
        autoscaling.knative.dev/target: "5"
    spec:
      containerConcurrency: 5
      containers:
        - name: app
          image: dev.local/concurrency-test:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8877
---
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: concurrency-test
spec:
  consumerGroup: concurrency-test-group
  bootstrapServers:
  - kafka.default.svc.cluster.local:9092
  topics:
  - concurrency-test-requests
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: concurrency-test

注意:我也尝试过在KafkaSource中使用spec.consumers: 2,但是行为是一样的。

日志:

当使用HTTP直接向服务发送两个并发请求时,日志如下所示(两个请求在6秒内完成,因此并发性生效):

代码语言:javascript
复制
2022-09-12 02:14:36 DEBUG    Request received with ID: abc
2022-09-12 02:14:37 DEBUG    Request received with ID: def
2022-09-12 02:14:41 DEBUG    Request complete for ID: abc
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:14:42 DEBUG    Request complete for ID: def
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK

但是,当通过Kafka发送请求时,日志看起来如下(请求正在一个接一个地被处理):

代码语言:javascript
复制
2022-09-12 02:14:55 DEBUG    Request received with ID: 111
2022-09-12 02:15:00 DEBUG    Request complete for ID: 111
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:15:00 DEBUG    Request received with ID: 222
2022-09-12 02:15:05 DEBUG    Request complete for ID: 222
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK

请让我知道这种顺序的请求处理是否是使用仅与KafkaSource的事件时的预期行为,我希望在这个设置中有启用并发的方法。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-09-13 20:23:22

Kafka提供分区内的排序(实现是一个分布式日志)。您可能需要更改Kafka主题上的分区数量,以实现更高的并行性;您还可以使用spec.consumers value来提高吞吐量(未经测试)。

我还鼓励将问题in the eventing-kafka repo与您的问题和任何额外的旋钮,如果有其他行为,您正在寻找。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73683932

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档