
After deploying Dify locally (with DeepSeek as the model provider), we find that 10 containers have been started. Let's begin exploring the Dify source code by looking at what each of these 10 containers does.
[+] Running 10/10
⠿ Container docker-sandbox-1 Started 16.7s
⠿ Container docker-ssrf_proxy-1 Started 16.9s
⠿ Container docker-web-1 Started 16.7s
⠿ Container docker-worker-1 Started 50.2s
⠿ Container docker-plugin_daemon-1 Started 49.7s
⠿ Container docker-api-1 Started 50.2s
⠿ Container docker-nginx-1 Started 48.1s
⠿ Container docker-redis-1 Started 2.0s
⠿ Container docker-db-1 Healthy 33.8s
⠿ Container docker-weaviate-1 Started

First, the two security components. Reading docker-compose.yaml, we see that docker-sandbox-1 uses the image langgenius/dify-sandbox:0.2.12. User-uploaded Python scripts, custom tools and the like need to run in isolation inside this sandbox.
docker-ssrf_proxy-1 is an SSRF (server-side request forgery) proxy used to handle outbound network requests safely; its configuration lives in dify/docker/ssrf_proxy/squid.conf.template and it uses the image ubuntu/squid:latest.
Next come the storage components: docker-redis-1 provides caching and the message queue, used to improve performance and for task scheduling; docker-db-1 is the database service (typically PostgreSQL), which stores application data; docker-weaviate-1 is the vector database, which stores and retrieves embeddings to support semantic search in AI applications.
docker-nginx-1 is the web server, responsible for reverse proxying and load balancing.
That leaves the four containers defined by Dify itself:
1. docker-web-1: Dify's frontend web UI, which provides the pages users operate.
2. docker-worker-1: the background worker process, which handles asynchronous tasks (such as model inference and data processing).
3. docker-plugin_daemon-1: the plugin daemon, which manages the lifecycle and execution of plugins.
4. docker-api-1: Dify's backend API service, which exposes the RESTful interface.
The web frontend's source lives under dify/web/; it is built with the Next.js framework on top of React, and it implements the pages we interact with.
The rest of the backend is written in Python, with the source under dify/api/.
The API service and the Celery worker can be started with the following commands:

uv run flask run --host 0.0.0.0 --port=5001 --debug
uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention

We start reading from the entry file dify/api/app.py, which provides the HTTP service on top of the Flask framework:
from app_factory import create_app
app = create_app()
celery = app.extensions["celery"]

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5001)

The Flask application is initialized in dify/api/app_factory.py:
def create_app() -> DifyApp:
    start_time = time.perf_counter()
    app = create_flask_app_with_configs()
    initialize_extensions(app)
    end_time = time.perf_counter()
    if dify_config.DEBUG:
        logger.info("Finished create_app (%s ms)", round((end_time - start_time) * 1000, 2))
    return app

def create_flask_app_with_configs() -> DifyApp:
    """
    create a raw flask app
    with configs loaded from .env file
    """
    dify_app = DifyApp(__name__)

DifyApp itself is just a thin subclass of Flask:

from flask import Flask

class DifyApp(Flask):
    pass

With that, server initialization is complete. The API service module follows a classic three-layer architecture:
• Controller layer (controllers/): receives HTTP requests and builds responses
• Core layer (core/): encapsulates the system's core capabilities, such as agent, RAG and workflow
• Service layer (services/): the middle layer bridging the two, encapsulating the more complex business flows
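To make the layering concrete, here is a toy sketch of how a request flows controller → service → core. This is my own illustration, not code from the Dify repository:

# Toy illustration of the three-layer layout described above (not actual Dify code)

class CoreEngine:                              # core/: the actual capability (LLM call, RAG, workflow)
    def run(self, query: str) -> str:
        return f"answer for: {query}"

class AppService:                              # services/: orchestrates business logic around core
    def __init__(self, engine: CoreEngine):
        self.engine = engine

    def generate(self, query: str) -> str:
        return self.engine.run(query)

def completion_controller(payload: dict) -> dict:   # controllers/: parse the request, shape the response
    service = AppService(CoreEngine())
    return {"answer": service.generate(payload["query"])}

print(completion_controller({"query": "hello"}))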
In dify/api/controllers/service_api/__init__.py, a namespace is defined with flask_restx, and the various endpoints are then registered under this namespace:
from flask import Blueprint
from flask_restx import Namespace
from libs.external_api import ExternalApi
bp = Blueprint("service_api", __name__, url_prefix="/v1")
api = ExternalApi(
bp,
version="1.0",
title="Service API",
description="API for application services",
)
service_api_ns = Namespace("service_api", description="Service operations", path="/")
api.add_namespace(service_api_ns)

For example, the conversation (chat) endpoints are defined in dify/api/controllers/service_api/app/conversation.py.
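Before looking at the handler, here is a rough sketch of what calling this endpoint from the outside looks like. The base URL, the Bearer-style app API key and the query parameters are assumptions about a typical local deployment, not something taken from this file:

import requests  # third-party HTTP client, assumed to be installed

# Hypothetical values for a local deployment; not taken from the source.
BASE_URL = "http://localhost/v1"
API_KEY = "app-xxxxxxxx"  # a Dify app API key

resp = requests.get(
    f"{BASE_URL}/conversations",
    headers={"Authorization": f"Bearer {API_KEY}"},   # assumed Bearer-token auth for the Service API
    params={"user": "end-user-1", "limit": 20},       # query args roughly matching the handler below
)
print(resp.status_code, resp.json())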
@service_api_ns.route("/conversations")
class ConversationApi(Resource):
try:
with Session(db.engine) as session:
return ConversationService.pagination_by_last_id(
session=session,
app_model=app_model,
user=end_user,
last_id=last_id,
limit=query_args.limit,
invoke_from=InvokeFrom.SERVICE_API,
sort_by=query_args.sort_by,
)对话补全 dify/api/controllers/service_api/app/completion.py
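The handler excerpted next supports both blocking and streaming modes. As an aside, here is a hedged sketch of how a client might consume the streaming mode; the request field names are assumptions based on common Dify usage, not taken from this file:

import json

import requests  # assumed to be installed

# Hypothetical values for a local deployment.
BASE_URL = "http://localhost/v1"
API_KEY = "app-xxxxxxxx"

with requests.post(
    f"{BASE_URL}/completion-messages",
    headers={"Authorization": f"Bearer {API_KEY}"},
    json={"inputs": {"query": "Say hello"}, "response_mode": "streaming", "user": "end-user-1"},
    stream=True,
) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line.startswith("data:"):                      # SSE frames look like "data: {...}"
            print(json.loads(line[len("data:"):].strip()))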
@service_api_ns.route("/completion-messages")
class CompletionApi(Resource):
@validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
def post(self, app_model: App, end_user: EndUser):
"""Create a completion for the given prompt.
This endpoint generates a completion based on the provided inputs and query.
Supports both blocking and streaming response modes.
"""
try:
response = AppGenerateService.generate(
app_model=app_model,
user=end_user,
args=args,
invoke_from=InvokeFrom.SERVICE_API,
streaming=streaming,
)
return helper.compact_generate_response(response)工作流dify/api/controllers/service_api/app/workflow.py
@service_api_ns.route("/workflows/run/<string:workflow_run_id>")
class WorkflowRunDetailApi(Resource):
@service_api_ns.marshal_with(build_workflow_run_model(service_api_ns))
def get(self, app_model: App, workflow_run_id: str):
"""Get a workflow task running detail.
Returns detailed information about a specific workflow run.
"""
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
workflow_run = workflow_run_repo.get_workflow_run_by_id(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
run_id=workflow_run_id,
)
return workflow_run@service_api_ns.route("/workflows/run")
class WorkflowRunApi(Resource):
@validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
def post(self, app_model: App, end_user: EndUser):
"""Execute a workflow.
Runs a workflow with the provided inputs and returns the results.
Supports both blocking and streaming response modes.
"""
try:
response = AppGenerateService.generate(
app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=streaming
)
return helper.compact_generate_response(response)可以看到,都是调用了AppGenerateService.generate来完成答案的生成。对应代码目录在dify/api/services/app_generate_service.py
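One thing to notice in generate() below is that every branch is wrapped by rate_limit.enter / rate_limit.generate / rate_limit.exit. As a rough illustration of that kind of concurrency guard (my own sketch, not Dify's RateLimit class):

import threading
import uuid

class SimpleRateLimit:
    """Toy concurrency limiter: at most max_active requests in flight at once."""

    def __init__(self, max_active: int):
        self._sem = threading.BoundedSemaphore(max_active)

    def enter(self) -> str:
        if not self._sem.acquire(blocking=False):
            raise RuntimeError("too many concurrent requests")
        return str(uuid.uuid4())                  # request id used to pair enter/exit

    def exit(self, request_id: str) -> None:
        self._sem.release()

    def generate(self, stream, request_id: str):
        # Wrap a generator so the slot is released once the stream is fully consumed.
        try:
            yield from stream
        finally:
            self.exit(request_id)

limiter = SimpleRateLimit(max_active=2)
rid = limiter.enter()
print(list(limiter.generate(iter(["chunk-1", "chunk-2"]), rid)))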
class AppGenerateService:
@classmethod
@trace_span(AppGenerateHandler)
def generate(
cls,
app_model: App,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
root_node_id: str | None = None,
):
"""
App Content Generate
:param app_model: app model
:param user: user
:param args: args
:param invoke_from: invoke from
:param streaming: streaming
:return:
"""
try:
request_id = rate_limit.enter(request_id)
if app_model.mode == AppMode.COMPLETION:
return rate_limit.generate(
CompletionAppGenerator.convert_to_event_stream(
CompletionAppGenerator().generate(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
),
),
request_id=request_id,
)
elif app_model.mode == AppMode.AGENT_CHAT or app_model.is_agent:
return rate_limit.generate(
AgentChatAppGenerator.convert_to_event_stream(
AgentChatAppGenerator().generate(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
),
),
request_id,
)
elif app_model.mode == AppMode.CHAT:
return rate_limit.generate(
ChatAppGenerator.convert_to_event_stream(
ChatAppGenerator().generate(
app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
),
),
request_id=request_id,
)
elif app_model.mode == AppMode.ADVANCED_CHAT:
workflow_id = args.get("workflow_id")
workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
return rate_limit.generate(
AdvancedChatAppGenerator.convert_to_event_stream(
AdvancedChatAppGenerator().generate(
app_model=app_model,
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
),
),
request_id=request_id,
)
elif app_model.mode == AppMode.WORKFLOW:
workflow_id = args.get("workflow_id")
workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
return rate_limit.generate(
WorkflowAppGenerator.convert_to_event_stream(
WorkflowAppGenerator().generate(
app_model=app_model,
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
root_node_id=root_node_id,
call_depth=0,
),
),
request_id,
)
else:
raise ValueError(f"Invalid app mode {app_model.mode}")
except Exception:
quota_charge.refund()
rate_limit.exit(request_id)
raise
finally:
if not streaming:
rate_limit.exit(request_id)

Depending on the app mode, generate dispatches the request into core. The completion implementation, for example, is in dify/api/core/app/apps/completion/app_generator.py.
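The generator excerpted next uses a pattern worth calling out: the real work runs in a separate thread under the Flask app context, events are pushed through a queue manager, and the request thread turns that queue into a blocking or streaming response. A minimal sketch of the same producer/consumer idea (not Dify's AppQueueManager):

import queue
import threading

def producer(q: queue.Queue) -> None:
    # The worker thread: emits chunks as they are generated, then a sentinel to mark completion.
    for token in ["Hello", ", ", "world", "!"]:
        q.put(token)
    q.put(None)

def stream_response(q: queue.Queue):
    # The request thread: turns the queue into a generator the HTTP layer can stream.
    while True:
        item = q.get()
        if item is None:
            break
        yield item

q = queue.Queue()
threading.Thread(target=producer, args=(q,), daemon=True).start()
print("".join(stream_response(q)))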
class CompletionAppGenerator(MessageBasedAppGenerator):
@overload
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[True],
) -> Generator[str | Mapping[str, Any], None, None]: ...
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
"""
Generate App response.
:param app_model: App
:param user: account or end user
:param args: request args
:param invoke_from: invoke from source
:param streaming: is stream
"""
# init application generate entity
application_generate_entity = CompletionAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras={},
trace_manager=trace_manager,
)
# new thread with request context
@copy_current_request_context
def worker_with_context():
return self._generate_worker(
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
message_id=message.id,
)
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start()
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
def _generate_worker(
self,
flask_app: Flask,
application_generate_entity: CompletionAppGenerateEntity,
queue_manager: AppQueueManager,
message_id: str,
):
"""
Generate worker in a new thread.
:param flask_app: Flask app
:param application_generate_entity: application generate entity
:param queue_manager: queue manager
:param message_id: message ID
:return:
"""
with flask_app.app_context():
try:
# get message
message = self._get_message(message_id)
# chatbot app
runner = CompletionAppRunner()
runner.run(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
message=message,
)

Let's follow the call into dify/api/core/app/apps/completion/app_runner.py.
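The runner below first passes the inputs through moderation (sensitive-word avoidance) and only then assembles the prompt and calls the model. A simplified sketch of that gate, assuming a plain block list rather than Dify's actual moderation module:

BANNED_WORDS = {"forbidden"}          # hypothetical block list

def moderation_for_inputs(query: str) -> str:
    # Reject (or rewrite) the query before it ever reaches the model.
    if any(word in query for word in BANNED_WORDS):
        raise ValueError("query rejected by input moderation")
    return query

def run(query: str) -> str:
    safe_query = moderation_for_inputs(query)
    # ...organize prompt messages here, then call model_instance.invoke_llm(...)
    return f"(model answer for: {safe_query})"

print(run("hello"))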
class CompletionAppRunner(AppRunner):
"""
Completion Application Runner
"""
def run(
self, application_generate_entity: CompletionAppGenerateEntity, queue_manager: AppQueueManager, message: Message
):
"""
Run application
:param application_generate_entity: application generate entity
:param queue_manager: application queue manager
:param message: message
:return:
"""
try:
# process sensitive_word_avoidance
_, inputs, query = self.moderation_for_inputs(
app_id=app_record.id,
tenant_id=app_config.tenant_id,
app_generate_entity=application_generate_entity,
inputs=inputs,
query=query or "",
message_id=message.id,
)
invoke_result = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters=application_generate_entity.model_conf.parameters,
stop=stop,
stream=application_generate_entity.stream,
user=application_generate_entity.user_id,
)

We finally reach the place where the LLM is actually invoked, dify/api/core/model_manager.py.
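Note that invoke_llm below routes the call through _round_robin_invoke: when a provider has several credential sets configured for load balancing, calls rotate between them. A toy version of that rotation, not the actual implementation:

import itertools

class RoundRobinInvoker:
    """Toy load balancer: rotate through credential configs on each call."""

    def __init__(self, credential_configs: list[dict]):
        self._cycle = itertools.cycle(credential_configs)

    def invoke(self, function, **kwargs):
        credentials = next(self._cycle)          # pick the next credential set
        return function(credentials=credentials, **kwargs)

def fake_llm_call(credentials: dict, prompt: str) -> str:
    return f"[{credentials['name']}] answer to: {prompt}"

invoker = RoundRobinInvoker([{"name": "key-a"}, {"name": "key-b"}])
print(invoker.invoke(fake_llm_call, prompt="hi"))
print(invoker.invoke(fake_llm_call, prompt="hi again"))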
class ModelInstance:
"""
Model instance class
"""
@overload
def invoke_llm(
self,
prompt_messages: Sequence[PromptMessage],
model_parameters: dict | None = None,
tools: Sequence[PromptMessageTool] | None = None,
stop: list[str] | None = None,
stream: Literal[True] = True,
user: str | None = None,
callbacks: list[Callback] | None = None,
) -> Generator: ...
def invoke_llm(
self,
prompt_messages: Sequence[PromptMessage],
model_parameters: dict | None = None,
tools: Sequence[PromptMessageTool] | None = None,
stop: Sequence[str] | None = None,
stream: bool = True,
user: str | None = None,
callbacks: list[Callback] | None = None,
) -> Union[LLMResult, Generator]:
"""
Invoke large language model
:param prompt_messages: prompt messages
:param model_parameters: model parameters
:param tools: tools for tool calling
:param stop: stop words
:param stream: is stream response
:param user: unique user id
:param callbacks: callbacks
:return: full response or stream response chunk generator result
"""
return cast(
Union[LLMResult, Generator],
self._round_robin_invoke(
function=self.model_type_instance.invoke,
model=self.model,
credentials=self.credentials,
prompt_messages=prompt_messages,
model_parameters=model_parameters,
tools=tools,
stop=stop,
stream=stream,
user=user,
callbacks=callbacks,
),
)

Drilling down layer by layer, we find that this ends up in dify/api/core/model_runtime/model_providers/__base/large_language_model.py.
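A detail of the invoke() method below: registered callbacks are triggered before the actual call (and after it in the full source), which is how the debug-time LoggingCallback hooks in. A simplified sketch of that callback dispatch, not the real AIModel base class:

from typing import Callable

class CallbackDrivenModel:
    def __init__(self) -> None:
        self.callbacks: list[Callable[[str, dict], None]] = []

    def invoke(self, model: str, params: dict) -> str:
        for cb in self.callbacks:                     # before-invoke hooks
            cb("before_invoke", {"model": model, "params": params})
        result = f"{model} result"                    # the real call would go to the provider here
        for cb in self.callbacks:                     # after-invoke hooks
            cb("after_invoke", {"model": model, "result": result})
        return result

def logging_callback(event: str, payload: dict) -> None:
    print(event, payload)

m = CallbackDrivenModel()
m.callbacks.append(logging_callback)
m.invoke("deepseek-chat", {"temperature": 0.7})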
class LargeLanguageModel(AIModel):
"""
Model class for large language model.
"""
model_type: ModelType = ModelType.LLM
# pydantic configs
model_config = ConfigDict(protected_namespaces=())
def invoke(
self,
model: str,
credentials: dict,
prompt_messages: list[PromptMessage],
model_parameters: dict | None = None,
tools: list[PromptMessageTool] | None = None,
stop: list[str] | None = None,
stream: bool = True,
user: str | None = None,
callbacks: list[Callback] | None = None,
) -> Union[LLMResult, Generator[LLMResultChunk, None, None]]:
"""
Invoke large language model
:param model: model name
:param credentials: model credentials
:param prompt_messages: prompt messages
:param model_parameters: model parameters
:param tools: tools for tool calling
:param stop: stop words
:param stream: is stream response
:param user: unique user id
:param callbacks: callbacks
:return: full response or stream response chunk generator result
"""
# validate and filter model parameters
if model_parameters is None:
model_parameters = {}
self.started_at = time.perf_counter()
callbacks = callbacks or []
if dify_config.DEBUG:
callbacks.append(LoggingCallback())
# trigger before invoke callbacks
self._trigger_before_invoke_callbacks(
model=model,
credentials=credentials,
prompt_messages=prompt_messages,
model_parameters=model_parameters,
tools=tools,
stop=stop,
stream=stream,
user=user,
callbacks=callbacks,
)
result: Union[LLMResult, Generator[LLMResultChunk, None, None]]
try:
from core.plugin.impl.model import PluginModelClient
plugin_model_manager = PluginModelClient()
result = plugin_model_manager.invoke_llm(
tenant_id=self.tenant_id,
user_id=user or "unknown",
plugin_id=self.plugin_id,
provider=self.provider_name,
model=model,
credentials=credentials,
model_parameters=model_parameters,
prompt_messages=prompt_messages,
tools=tools,
stop=list(stop) if stop else None,
stream=stream,
)

This, in turn, calls the model plugin client in dify/api/core/plugin/impl/model.py.
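PluginModelClient does not run the model itself: it POSTs the invocation to the plugin daemon (the docker-plugin_daemon-1 container) and yields the streamed chunks back to the caller. A hedged sketch of that forward-and-restream idea; the daemon URL, payload shape and line-delimited chunk framing are my assumptions, not the daemon's real protocol:

import json
from collections.abc import Generator

import requests  # assumed to be installed

def invoke_llm_via_daemon(base_url: str, tenant_id: str, payload: dict) -> Generator[dict, None, None]:
    # Forward the invocation to the plugin daemon and re-yield each streamed chunk.
    url = f"{base_url}/plugin/{tenant_id}/dispatch/llm/invoke"
    with requests.post(url, json=payload, stream=True) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines(decode_unicode=True):
            if line:
                yield json.loads(line)   # assuming one JSON chunk per line

# Usage (hypothetical values):
# for chunk in invoke_llm_via_daemon("http://plugin_daemon:5002", "tenant-1", {...}):
#     print(chunk)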
class PluginModelClient(BasePluginClient):
def invoke_llm(
self,
tenant_id: str,
user_id: str,
plugin_id: str,
provider: str,
model: str,
credentials: dict,
prompt_messages: list[PromptMessage],
model_parameters: dict | None = None,
tools: list[PromptMessageTool] | None = None,
stop: list[str] | None = None,
stream: bool = True,
) -> Generator[LLMResultChunk, None, None]:
"""
Invoke llm
"""
response = self._request_with_plugin_daemon_response_stream(
method="POST",
path=f"plugin/{tenant_id}/dispatch/llm/invoke",
type_=LLMResultChunk,
data=jsonable_encoder(
{
"user_id": user_id,
"data": {
"provider": provider,
"model_type": "llm",
"model": model,
"credentials": credentials,
"prompt_messages": prompt_messages,
"model_parameters": model_parameters,
"tools": tools,
"stop": stop,
"stream": stream,
},
}
),
headers={
"X-Plugin-ID": plugin_id,
"Content-Type": "application/json",
},
)
try:
yield from response