
Dify Source Code Analysis (1)

golangLeetcode
Published 2026-03-18 18:40:07

After deploying Dify locally with DeepSeek configured as the model provider, we find that ten containers have been started. Let's begin exploring the Dify source code from the role each of these ten containers plays.

[+] Running 10/10
 ⠿ Container docker-sandbox-1        Started                                                                        16.7s
 ⠿ Container docker-ssrf_proxy-1     Started                                                                        16.9s
 ⠿ Container docker-web-1            Started                                                                        16.7s
 ⠿ Container docker-worker-1         Started                                                                        50.2s
 ⠿ Container docker-plugin_daemon-1  Started                                                                        49.7s
 ⠿ Container docker-api-1            Started                                                                        50.2s
 ⠿ Container docker-nginx-1          Started                                                                        48.1s
 ⠿ Container docker-redis-1          Started                                                                         2.0s
 ⠿ Container docker-db-1             Healthy                                                                        33.8s
 ⠿ Container docker-weaviate-1       Started 

First, the two security components. Reading docker-compose.yaml, we find that docker-sandbox-1 uses the image:

langgenius/dify-sandbox:0.2.12

User-uploaded Python scripts, custom tools, and similar code must run isolated inside this sandbox.

docker-ssrf_proxy-1 is the SSRF (Server-Side Request Forgery) proxy, used to handle outbound network requests safely. Its configuration lives at dify/docker/ssrf_proxy/squid.conf.template, and it uses the image:

ubuntu/squid:latest
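
Squid here acts as an egress filter: requests made on behalf of users are forced through the proxy so they cannot reach internal addresses. The core check behind SSRF protection can be sketched with the stdlib `ipaddress` module (a simplified illustration, not Dify's or Squid's actual implementation):

```python
import ipaddress
import socket
from urllib.parse import urlparse

def is_ssrf_safe(url: str) -> bool:
    """Reject URLs that resolve to private/loopback/link-local addresses."""
    host = urlparse(url).hostname
    if host is None:
        return False
    try:
        infos = socket.getaddrinfo(host, None)
    except socket.gaierror:
        return False
    for info in infos:
        ip = ipaddress.ip_address(info[4][0])
        if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
            return False
    return True
```

Resolving the hostname first matters: a DNS name that looks public can still point at 127.0.0.1 or an internal service, which is exactly the attack the proxy guards against.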

Next come the storage components:

• docker-redis-1: cache and message-queue service, used to improve performance and schedule tasks

• docker-db-1: the database service (typically PostgreSQL), storing application data

• docker-weaviate-1: the vector database, storing and retrieving embeddings to power semantic search for AI applications
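
What Weaviate does can be illustrated with a toy in-memory stand-in: store vectors alongside text, then retrieve the nearest items by cosine similarity (a minimal sketch; in Dify the embeddings come from a model and the store is Weaviate itself):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

class ToyVectorStore:
    def __init__(self):
        self.items: list[tuple[str, list[float]]] = []

    def add(self, text: str, embedding: list[float]) -> None:
        self.items.append((text, embedding))

    def search(self, query_embedding: list[float], top_k: int = 1) -> list[str]:
        # rank stored items by similarity to the query vector
        ranked = sorted(self.items, key=lambda it: cosine(it[1], query_embedding), reverse=True)
        return [text for text, _ in ranked[:top_k]]
```

A real vector database adds approximate-nearest-neighbor indexes so this search stays fast at millions of vectors, but the retrieval contract is the same.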

docker-nginx-1 is the web server, responsible for reverse proxying and load balancing.

The remaining four containers are defined by Dify itself:

1. docker-web-1: Dify's frontend web UI, providing the user interface.
2. docker-worker-1: the background worker process, handling asynchronous tasks (such as model inference and data processing).
3. docker-plugin_daemon-1: the plugin daemon, managing plugin lifecycles and execution.
4. docker-api-1: Dify's backend API service, exposing RESTful interfaces.

The frontend source lives under dify/web/ and is built with the React-based Next.js framework; the pages we interact with are implemented there.

The rest of the backend is implemented in Python, under dify/api/.

The API server and the worker can be started with the following commands:

uv run flask run --host 0.0.0.0 --port=5001 --debug
uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention
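
The `-Q` flag subscribes this worker to specific named queues, so different task types (dataset indexing, mail, workflow execution, and so on) can be routed and scaled independently. The routing idea, reduced to stdlib primitives rather than the actual Celery API:

```python
import queue

# one queue per task category, mirroring Celery's -Q routing (illustrative only)
queues = {name: queue.Queue() for name in ("dataset", "mail", "workflow")}

def submit(task_name: str, payload: str, route: str) -> None:
    """Producer side: the API process enqueues a task onto a named queue."""
    queues[route].put((task_name, payload))

def drain(route: str) -> list[tuple[str, str]]:
    """Consumer side: a worker subscribed to `route` consumes only that queue."""
    q = queues[route]
    done = []
    while not q.empty():
        done.append(q.get())
    return done
```

In production the queues live in Redis (the broker started above), and a heavy queue like `dataset` can get its own dedicated worker pool without slowing down `mail`.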

We start reading from the entry file dify/api/app.py, which serves HTTP via the Flask framework:

from app_factory import create_app

app = create_app()
celery = app.extensions["celery"]
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5001)

Flask is initialized in dify/api/app_factory.py:

def create_app() -> DifyApp:
    start_time = time.perf_counter()
    app = create_flask_app_with_configs()
    initialize_extensions(app)
    end_time = time.perf_counter()
    if dify_config.DEBUG:
        logger.info("Finished create_app (%s ms)", round((end_time - start_time) * 1000, 2))
    return app
def create_flask_app_with_configs() -> DifyApp:
    """
    create a raw flask app
    with configs loaded from .env file
    """
    dify_app = DifyApp(__name__)
from flask import Flask
class DifyApp(Flask):
    pass

This completes server initialization. The API service module follows a classic three-layer architecture:

• Interface layer (controllers/): receives HTTP requests and shapes responses

• Core logic layer (core/): encapsulates the system's core capabilities, such as agent, RAG, and workflow

• Business service layer (services/): the middle layer connecting the two, encapsulating complex business flows
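
The call direction between the three layers (controller → service → core) can be sketched as plain functions; the names here are illustrative, not Dify's:

```python
# core/: pure capability, no HTTP or business concerns
def core_generate_answer(prompt: str) -> str:
    return f"answer to: {prompt}"

# services/: orchestrates core capabilities into a business flow
def service_handle_chat(user_id: str, prompt: str) -> dict:
    answer = core_generate_answer(prompt)
    return {"user_id": user_id, "answer": answer}

# controllers/: parses the HTTP request, delegates, shapes the response
def controller_chat_endpoint(request_json: dict) -> dict:
    result = service_handle_chat(request_json["user_id"], request_json["prompt"])
    return {"status": 200, "body": result}
```

Keeping HTTP concerns out of core/ is what lets the same generation logic serve the console API, the service API, and the Celery workers.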

In dify/api/controllers/service_api/__init__.py, a flask_restx namespace is defined, and the various endpoints are then registered under it:

from flask import Blueprint
from flask_restx import Namespace
from libs.external_api import ExternalApi
bp = Blueprint("service_api", __name__, url_prefix="/v1")
api = ExternalApi(
    bp,
    version="1.0",
    title="Service API",
    description="API for application services",
)
service_api_ns = Namespace("service_api", description="Service operations", path="/")
api.add_namespace(service_api_ns)

Take the conversations endpoint of the chat API, defined in dify/api/controllers/service_api/app/conversation.py:

@service_api_ns.route("/conversations")
class ConversationApi(Resource):
    def get(self, app_model: App, end_user: EndUser):  # handler excerpt
        try:
            with Session(db.engine) as session:
                return ConversationService.pagination_by_last_id(
                    session=session,
                    app_model=app_model,
                    user=end_user,
                    last_id=last_id,
                    limit=query_args.limit,
                    invoke_from=InvokeFrom.SERVICE_API,
                    sort_by=query_args.sort_by,
                )

Text completion: dify/api/controllers/service_api/app/completion.py

@service_api_ns.route("/completion-messages")
class CompletionApi(Resource):
    @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
    def post(self, app_model: App, end_user: EndUser):
        """Create a completion for the given prompt.
        This endpoint generates a completion based on the provided inputs and query.
        Supports both blocking and streaming response modes.
        """
        try:
            response = AppGenerateService.generate(
                app_model=app_model,
                user=end_user,
                args=args,
                invoke_from=InvokeFrom.SERVICE_API,
                streaming=streaming,
            )
            return helper.compact_generate_response(response)

Workflows: dify/api/controllers/service_api/app/workflow.py

@service_api_ns.route("/workflows/run/<string:workflow_run_id>")
class WorkflowRunDetailApi(Resource):
    @service_api_ns.marshal_with(build_workflow_run_model(service_api_ns))
    def get(self, app_model: App, workflow_run_id: str):
        """Get a workflow task running detail.
        Returns detailed information about a specific workflow run.
        """
        session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
        workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
        workflow_run = workflow_run_repo.get_workflow_run_by_id(
            tenant_id=app_model.tenant_id,
            app_id=app_model.id,
            run_id=workflow_run_id,
        )
        return workflow_run
@service_api_ns.route("/workflows/run")
class WorkflowRunApi(Resource):
    @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
    def post(self, app_model: App, end_user: EndUser):
        """Execute a workflow.
        Runs a workflow with the provided inputs and returns the results.
        Supports both blocking and streaming response modes.
        """
        
        try:
            response = AppGenerateService.generate(
                app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=streaming
            )
            return helper.compact_generate_response(response)

As we can see, all of these call AppGenerateService.generate to produce the answer. The code lives in dify/api/services/app_generate_service.py:

class AppGenerateService:
    @classmethod
    @trace_span(AppGenerateHandler)
    def generate(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
        root_node_id: str | None = None,
    ):
        """
        App Content Generate
        :param app_model: app model
        :param user: user
        :param args: args
        :param invoke_from: invoke from
        :param streaming: streaming
        :return:
        """
        try:
            request_id = rate_limit.enter(request_id)
            if app_model.mode == AppMode.COMPLETION:
                return rate_limit.generate(
                    CompletionAppGenerator.convert_to_event_stream(
                        CompletionAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.AGENT_CHAT or app_model.is_agent:
                return rate_limit.generate(
                    AgentChatAppGenerator.convert_to_event_stream(
                        AgentChatAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id,
                )
            elif app_model.mode == AppMode.CHAT:
                return rate_limit.generate(
                    ChatAppGenerator.convert_to_event_stream(
                        ChatAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.ADVANCED_CHAT:
                workflow_id = args.get("workflow_id")
                workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
                return rate_limit.generate(
                    AdvancedChatAppGenerator.convert_to_event_stream(
                        AdvancedChatAppGenerator().generate(
                            app_model=app_model,
                            workflow=workflow,
                            user=user,
                            args=args,
                            invoke_from=invoke_from,
                            streaming=streaming,
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.WORKFLOW:
                workflow_id = args.get("workflow_id")
                workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
                return rate_limit.generate(
                    WorkflowAppGenerator.convert_to_event_stream(
                        WorkflowAppGenerator().generate(
                            app_model=app_model,
                            workflow=workflow,
                            user=user,
                            args=args,
                            invoke_from=invoke_from,
                            streaming=streaming,
                            root_node_id=root_node_id,
                            call_depth=0,
                        ),
                    ),
                    request_id,
                )
            else:
                raise ValueError(f"Invalid app mode {app_model.mode}")
        except Exception:
            quota_charge.refund()
            rate_limit.exit(request_id)
            raise
        finally:
            if not streaming:
                rate_limit.exit(request_id)

Depending on the app mode, the request is dispatched into core. For example, the completion generator lives in dify/api/core/app/apps/completion/app_generator.py:
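
The if/elif chain in AppGenerateService.generate is a dispatch-by-mode pattern; it could equally be expressed as a mapping from app mode to generator. A simplified sketch with made-up generator stubs (the real ones are the App*Generator classes above):

```python
def completion_generate(args: dict) -> str:
    return "completion:" + args["query"]

def chat_generate(args: dict) -> str:
    return "chat:" + args["query"]

# mode -> generator, mirroring the if/elif chain in AppGenerateService.generate
GENERATORS = {
    "completion": completion_generate,
    "chat": chat_generate,
}

def generate(mode: str, args: dict) -> str:
    try:
        gen = GENERATORS[mode]
    except KeyError:
        raise ValueError(f"Invalid app mode {mode}") from None
    return gen(args)
```

The unknown-mode branch maps to the `raise ValueError(f"Invalid app mode {app_model.mode}")` fallback in the real code.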

class CompletionAppGenerator(MessageBasedAppGenerator):
    @overload
    def generate(
        self,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: Literal[True],
    ) -> Generator[str | Mapping[str, Any], None, None]: ...
    
    def generate(
        self,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
        """
        Generate App response.
        :param app_model: App
        :param user: account or end user
        :param args: request args
        :param invoke_from: invoke from source
        :param streaming: is stream
        """
        
        # init application generate entity
        application_generate_entity = CompletionAppGenerateEntity(
            task_id=str(uuid.uuid4()),
            app_config=app_config,
            model_conf=ModelConfigConverter.convert(app_config),
            file_upload_config=file_extra_config,
            inputs=self._prepare_user_inputs(
                user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
            ),
            query=query,
            files=list(file_objs),
            user_id=user.id,
            stream=streaming,
            invoke_from=invoke_from,
            extras={},
            trace_manager=trace_manager,
        )
        # new thread with request context
        @copy_current_request_context
        def worker_with_context():
            return self._generate_worker(
                flask_app=current_app._get_current_object(),  # type: ignore
                application_generate_entity=application_generate_entity,
                queue_manager=queue_manager,
                message_id=message.id,
            )
        worker_thread = threading.Thread(target=worker_with_context)
        worker_thread.start()
        # return response or stream generator
        response = self._handle_response(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            conversation=conversation,
            message=message,
            user=user,
            stream=streaming,
        )
        
    def _generate_worker(
        self,
        flask_app: Flask,
        application_generate_entity: CompletionAppGenerateEntity,
        queue_manager: AppQueueManager,
        message_id: str,
    ):
        """
        Generate worker in a new thread.
        :param flask_app: Flask app
        :param application_generate_entity: application generate entity
        :param queue_manager: queue manager
        :param message_id: message ID
        :return:
        """
        with flask_app.app_context():
            try:
                # get message
                message = self._get_message(message_id)
                # chatbot app
                runner = CompletionAppRunner()
                runner.run(
                    application_generate_entity=application_generate_entity,
                    queue_manager=queue_manager,
                    message=message,
                )
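
The pattern above — a worker thread produces chunks while the request thread consumes them as a stream — can be sketched with stdlib `threading` and `queue` (the queue here is a stand-in for Dify's AppQueueManager):

```python
import queue
import threading

_SENTINEL = object()  # marks end of stream

def run_worker(chunks: list[str], q: queue.Queue) -> None:
    """Producer: push generated chunks, then signal completion."""
    for chunk in chunks:
        q.put(chunk)
    q.put(_SENTINEL)

def stream_response(chunks: list[str]):
    """Consumer: yield chunks as the worker produces them."""
    q: queue.Queue = queue.Queue()
    t = threading.Thread(target=run_worker, args=(chunks, q))
    t.start()
    while True:
        item = q.get()
        if item is _SENTINEL:
            break
        yield item
    t.join()
```

This split is what lets the HTTP handler start returning a streaming response immediately while model inference continues in the background thread.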

Following into dify/api/core/app/apps/completion/app_runner.py:

class CompletionAppRunner(AppRunner):
    """
    Completion Application Runner
    """
    def run(
        self, application_generate_entity: CompletionAppGenerateEntity, queue_manager: AppQueueManager, message: Message
    ):
        """
        Run application
        :param application_generate_entity: application generate entity
        :param queue_manager: application queue manager
        :param message: message
        :return:
        """
        
        try:
            # process sensitive_word_avoidance
            _, inputs, query = self.moderation_for_inputs(
                app_id=app_record.id,
                tenant_id=app_config.tenant_id,
                app_generate_entity=application_generate_entity,
                inputs=inputs,
                query=query or "",
                message_id=message.id,
            )
            
        invoke_result = model_instance.invoke_llm(
            prompt_messages=prompt_messages,
            model_parameters=application_generate_entity.model_conf.parameters,
            stop=stop,
            stream=application_generate_entity.stream,
            user=application_generate_entity.user_id,
        )

At last we reach the place where the LLM is invoked, dify/api/core/model_manager.py:

class ModelInstance:
    """
    Model instance class
    """
    
    @overload
    def invoke_llm(
        self,
        prompt_messages: Sequence[PromptMessage],
        model_parameters: dict | None = None,
        tools: Sequence[PromptMessageTool] | None = None,
        stop: list[str] | None = None,
        stream: Literal[True] = True,
        user: str | None = None,
        callbacks: list[Callback] | None = None,
    ) -> Generator: ...
    
    def invoke_llm(
        self,
        prompt_messages: Sequence[PromptMessage],
        model_parameters: dict | None = None,
        tools: Sequence[PromptMessageTool] | None = None,
        stop: Sequence[str] | None = None,
        stream: bool = True,
        user: str | None = None,
        callbacks: list[Callback] | None = None,
    ) -> Union[LLMResult, Generator]:
        """
        Invoke large language model
        :param prompt_messages: prompt messages
        :param model_parameters: model parameters
        :param tools: tools for tool calling
        :param stop: stop words
        :param stream: is stream response
        :param user: unique user id
        :param callbacks: callbacks
        :return: full response or stream response chunk generator result
        """
        
        return cast(
            Union[LLMResult, Generator],
            self._round_robin_invoke(
                function=self.model_type_instance.invoke,
                model=self.model,
                credentials=self.credentials,
                prompt_messages=prompt_messages,
                model_parameters=model_parameters,
                tools=tools,
                stop=stop,
                stream=stream,
                user=user,
                callbacks=callbacks,
            ),
        )
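
The name `_round_robin_invoke` suggests load balancing across multiple provider credentials. The rotation itself can be sketched with `itertools.cycle` (an illustrative guess at the pattern, not the actual implementation):

```python
import itertools

class RoundRobinInvoker:
    """Rotate through a pool of credentials, skipping ones that fail."""

    def __init__(self, credentials: list[str]):
        self._pool = itertools.cycle(credentials)
        self._size = len(credentials)

    def invoke(self, fn):
        last_error = None
        for _ in range(self._size):
            cred = next(self._pool)
            try:
                return fn(cred)
            except RuntimeError as e:  # treat as a retryable provider error
                last_error = e
        raise last_error
```

Spreading calls over several API keys smooths out per-key rate limits and lets one bad credential fail over to the next.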

Following it layer by layer, we find it calls dify/api/core/model_runtime/model_providers/__base/large_language_model.py:

class LargeLanguageModel(AIModel):
    """
    Model class for large language model.
    """
    model_type: ModelType = ModelType.LLM
    # pydantic configs
    model_config = ConfigDict(protected_namespaces=())
    def invoke(
        self,
        model: str,
        credentials: dict,
        prompt_messages: list[PromptMessage],
        model_parameters: dict | None = None,
        tools: list[PromptMessageTool] | None = None,
        stop: list[str] | None = None,
        stream: bool = True,
        user: str | None = None,
        callbacks: list[Callback] | None = None,
    ) -> Union[LLMResult, Generator[LLMResultChunk, None, None]]:
        """
        Invoke large language model
        :param model: model name
        :param credentials: model credentials
        :param prompt_messages: prompt messages
        :param model_parameters: model parameters
        :param tools: tools for tool calling
        :param stop: stop words
        :param stream: is stream response
        :param user: unique user id
        :param callbacks: callbacks
        :return: full response or stream response chunk generator result
        """
        # validate and filter model parameters
        if model_parameters is None:
            model_parameters = {}
        self.started_at = time.perf_counter()
        callbacks = callbacks or []
        if dify_config.DEBUG:
            callbacks.append(LoggingCallback())
        # trigger before invoke callbacks
        self._trigger_before_invoke_callbacks(
            model=model,
            credentials=credentials,
            prompt_messages=prompt_messages,
            model_parameters=model_parameters,
            tools=tools,
            stop=stop,
            stream=stream,
            user=user,
            callbacks=callbacks,
        )
        result: Union[LLMResult, Generator[LLMResultChunk, None, None]]
        try:
            from core.plugin.impl.model import PluginModelClient
            plugin_model_manager = PluginModelClient()
            result = plugin_model_manager.invoke_llm(
                tenant_id=self.tenant_id,
                user_id=user or "unknown",
                plugin_id=self.plugin_id,
                provider=self.provider_name,
                model=model,
                credentials=credentials,
                model_parameters=model_parameters,
                prompt_messages=prompt_messages,
                tools=tools,
                stop=list(stop) if stop else None,
                stream=stream,
            )

This in turn calls the model plugin client, dify/api/core/plugin/impl/model.py:

class PluginModelClient(BasePluginClient):
    def invoke_llm(
        self,
        tenant_id: str,
        user_id: str,
        plugin_id: str,
        provider: str,
        model: str,
        credentials: dict,
        prompt_messages: list[PromptMessage],
        model_parameters: dict | None = None,
        tools: list[PromptMessageTool] | None = None,
        stop: list[str] | None = None,
        stream: bool = True,
    ) -> Generator[LLMResultChunk, None, None]:
        """
        Invoke llm
        """
        response = self._request_with_plugin_daemon_response_stream(
            method="POST",
            path=f"plugin/{tenant_id}/dispatch/llm/invoke",
            type_=LLMResultChunk,
            data=jsonable_encoder(
                {
                    "user_id": user_id,
                    "data": {
                        "provider": provider,
                        "model_type": "llm",
                        "model": model,
                        "credentials": credentials,
                        "prompt_messages": prompt_messages,
                        "model_parameters": model_parameters,
                        "tools": tools,
                        "stop": stop,
                        "stream": stream,
                    },
                }
            ),
            headers={
                "X-Plugin-ID": plugin_id,
                "Content-Type": "application/json",
            },
        )
        try:
            yield from response 
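
The plugin daemon returns the LLM result as a stream over HTTP, and the client yields chunks as they arrive. Decoding such a newline-delimited JSON stream can be sketched with stdlib `json` (a simplified stand-in for `_request_with_plugin_daemon_response_stream`):

```python
import io
import json
from typing import Iterator

def iter_stream_chunks(raw: io.BytesIO) -> Iterator[dict]:
    """Decode a newline-delimited JSON byte stream chunk by chunk."""
    for line in raw:
        line = line.strip()
        if not line:
            continue
        yield json.loads(line)
```

Because each chunk is yielded as soon as its line arrives, tokens reach the caller (and ultimately the browser) without waiting for the full completion.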
This article was shared from the WeChat official account golang算法架构leetcode技术php via the Tencent Cloud self-media sync program. Originally published 2026-01-10.
