
SaaS Enterprise CRM Source Code Architecture | Complete Customer Relationship Management System Source Code (PHP/Java/Python)

用户12281965
Published 2026-03-17 10:06:04

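Introduction: The Technical Evolution of Modern Enterprise CRM Systems

As companies digitize, customer relationship management (CRM) systems have become core enterprise infrastructure. They have grown from simple contact management into comprehensive platforms covering sales automation, marketing management, and customer service, and delivering them as SaaS raises substantial challenges in both architecture design and implementation. This article walks through the architecture of a multi-tenant SaaS CRM system and presents core implementations in three mainstream stacks: PHP, Java, and Python.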

Source code and demo: c.xsymz.icu

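I. SaaS CRM System Architecture Design

1.1 Multi-Tenant Architecture Patterns

The core of an enterprise SaaS CRM system is its multi-tenant architecture. There are three main patterns:

1. Separate database per tenant

  • Each tenant has its own database instance
  • Strongest data isolation and security
  • High operating cost; suited to large enterprise customers

2. Shared database, separate schema per tenant

  • All tenants share one database instance
  • Each tenant has its own schema
  • Balances isolation against resource utilization

3. Shared database, shared schema

  • All tenants share both the database and the schema
  • Rows are isolated by a tenant_id column
  • Highest resource utilization, but requires strict isolation logic in every query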
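
The shared-database, shared-schema pattern can be illustrated with a minimal Python sketch. It uses the standard-library sqlite3 module and an in-memory table; the table layout, the tenant names, and the helper names make_demo_db and list_accounts are assumptions made for illustration and do not come from the article's source code. The point it shows is that every query must carry the tenant_id filter.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Build an in-memory table shared by all tenants (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE accounts ("
        "id INTEGER PRIMARY KEY, tenant_id TEXT NOT NULL, name TEXT NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO accounts (tenant_id, name) VALUES (?, ?)",
        [("acme", "Alpha Ltd"), ("acme", "Beta Inc"), ("globex", "Gamma LLC")],
    )
    return conn


def list_accounts(conn: sqlite3.Connection, tenant_id: str) -> list[str]:
    """Return account names for one tenant; the tenant_id filter is mandatory."""
    if not tenant_id:
        raise ValueError("tenant_id is required for every query")
    rows = conn.execute(
        "SELECT name FROM accounts WHERE tenant_id = ? ORDER BY name",
        (tenant_id,),
    )
    return [name for (name,) in rows]
```

Calling list_accounts(conn, "acme") only ever sees rows written for that tenant. In a real system this filter is usually enforced centrally (an ORM scope or a database row-level policy) rather than repeated by hand in each query.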

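1.2 Microservice Architecture Design

Modern SaaS CRM systems typically adopt a microservice architecture. The core services are divided as follows:

CRM microservice architecture:

├── Tenant Service (tenant management)
├── Auth Service (user authentication)
├── Account Service (customer management)
├── Sales Pipeline Service (sales pipeline)
├── Marketing Automation Service (marketing automation)
├── Customer Support Service (customer support)
├── Analytics Service (reporting and analytics)
└── Notification Service (message notifications)

II. Core Module Design and Implementation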

2.1 Multi-Tenant Data Isolation (Java + Spring Boot)

Code language: java
// TenantContext.java - holds the tenant (and user) bound to the current request thread
@Component
public class TenantContext {
    private static final ThreadLocal<String> CURRENT_TENANT = new ThreadLocal<>();
    private static final ThreadLocal<User> CURRENT_USER = new ThreadLocal<>();
    
    public static void setCurrentTenant(String tenantId) {
        CURRENT_TENANT.set(tenantId);
    }
    
    public static String getCurrentTenant() {
        return CURRENT_TENANT.get();
    }
    
    public static void clear() {
        CURRENT_TENANT.remove();
        CURRENT_USER.remove();
    }
}

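// TenantAwareDataSource.java - routes each connection to the current tenant's data source
@Component
public class TenantAwareDataSource extends AbstractRoutingDataSource {
    
    // Fix: the original used tenantConfigRepository without declaring it.
    // The type name TenantConfigRepository is assumed from the variable name.
    private final TenantConfigRepository tenantConfigRepository;
    
    public TenantAwareDataSource(TenantConfigRepository tenantConfigRepository) {
        this.tenantConfigRepository = tenantConfigRepository;
    }
    
    @Override
    protected Object determineCurrentLookupKey() {
        return TenantContext.getCurrentTenant();
    }
    
    @Override
    public void afterPropertiesSet() {
        // Build one pooled data source per tenant
        Map<Object, Object> targetDataSources = new HashMap<>();
        
        // Load the connection settings of every tenant from configuration
        List<TenantConfig> tenants = tenantConfigRepository.findAll();
        for (TenantConfig tenant : tenants) {
            DataSource dataSource = buildDataSource(tenant);
            targetDataSources.put(tenant.getTenantId(), dataSource);
        }
        
        setTargetDataSources(targetDataSources);
        // Assumes a tenant config with the ID "default" exists; otherwise no fallback is set
        setDefaultTargetDataSource(targetDataSources.get("default"));
        super.afterPropertiesSet();
    }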
    
    private DataSource buildDataSource(TenantConfig tenant) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(tenant.getJdbcUrl());
        config.setUsername(tenant.getDbUsername());
        config.setPassword(tenant.getDbPassword());
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        config.setMaxLifetime(1800000);
        
        return new HikariDataSource(config);
    }
}

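// TenantInterceptor.java - resolves and validates the tenant for each request
@Component
public class TenantInterceptor implements HandlerInterceptor {
    
    // Fix: the original referenced tenantService and jwtUtil without declaring them.
    // The type names TenantService and JwtUtil are assumed from the variable names.
    private final TenantService tenantService;
    private final JwtUtil jwtUtil;
    
    public TenantInterceptor(TenantService tenantService, JwtUtil jwtUtil) {
        this.tenantService = tenantService;
        this.jwtUtil = jwtUtil;
    }
    
    @Override
    public boolean preHandle(HttpServletRequest request, 
                           HttpServletResponse response, 
                           Object handler) {
        // Resolve the tenant ID from the subdomain, a request header, or the JWT
        String tenantId = extractTenantId(request);
        
        if (tenantId == null) {
            throw new TenantNotFoundException("Tenant information not found");
        }
        
        // Reject tenants that have been disabled
        if (!tenantService.isTenantActive(tenantId)) {
            throw new TenantInactiveException("Tenant has been disabled");
        }
        
        TenantContext.setCurrentTenant(tenantId);
        return true;
    }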
    
    @Override
    public void afterCompletion(HttpServletRequest request, 
                              HttpServletResponse response, 
                              Object handler, Exception ex) {
        TenantContext.clear();
    }
    
    private String extractTenantId(HttpServletRequest request) {
        // 1. From the subdomain.
        //    Note: a bare domain such as example.com also matches here and yields "example".
        String host = request.getServerName();
        if (host.contains(".")) {
            String subdomain = host.split("\\.")[0];
            if (!subdomain.equals("www") && !subdomain.equals("app")) {
                return subdomain;
            }
        }
        
        // 2. From the request header.
        //    Note: X-Tenant-ID is client-controlled and is not verified against the user here.
        String headerTenant = request.getHeader("X-Tenant-ID");
        if (StringUtils.hasText(headerTenant)) {
            return headerTenant;
        }
        
        // 3. From the JWT bearer token
        String authHeader = request.getHeader("Authorization");
        if (StringUtils.hasText(authHeader) && authHeader.startsWith("Bearer ")) {
            String token = authHeader.substring(7);
            return jwtUtil.extractTenantId(token);
        }
        
        return null;
    }
}
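
The set-then-clear life cycle that TenantContext and TenantInterceptor implement with ThreadLocal can be sketched in Python with the standard-library contextvars module. This is an illustrative analogue under stated assumptions, not the article's code: the names tenant_scope and _current_tenant are hypothetical, and this get_current_tenant is not claimed to be the helper the Django listings in section 2.2 call.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

# Holds the tenant bound to the current execution context (hypothetical name).
_current_tenant: ContextVar[Optional[str]] = ContextVar("current_tenant", default=None)


def get_current_tenant() -> Optional[str]:
    """Return the tenant bound to the current context, or None if unset."""
    return _current_tenant.get()


@contextmanager
def tenant_scope(tenant_id: str) -> Iterator[None]:
    """Bind tenant_id for the duration of the block, then restore the previous value."""
    token = _current_tenant.set(tenant_id)
    try:
        yield
    finally:
        # Mirrors TenantContext.clear() in afterCompletion: always undo the binding,
        # even when the request handler raises.
        _current_tenant.reset(token)
```

Wrapping request handling in `with tenant_scope("acme"):` guarantees the binding is removed afterwards, which matters for the same reason the Java interceptor clears its ThreadLocal: worker threads are reused, and a stale tenant ID would leak into the next request.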

2.2 Customer Management Module (Python + Django)

Code language: python
# models.py - customer data models
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import EmailValidator, RegexValidator
from django.utils import timezone
from tenant_schemas.models import TenantMixin
from tenant_schemas.postgresql_backend.base import _check_schema_name

User = get_user_model()

class Tenant(TenantMixin):
    """Tenant model"""
    name = models.CharField(max_length=100, unique=True)
    schema_name = models.CharField(max_length=63, unique=True, 
                                   validators=[_check_schema_name])
    created_on = models.DateField(auto_now_add=True)
    is_active = models.BooleanField(default=True)
    trial_ends = models.DateTimeField(null=True, blank=True)
    plan_type = models.CharField(max_length=20, 
                                choices=[
                                    ('free', 'Free'),
                                    ('basic', 'Basic'),
                                    ('professional', 'Professional'),
                                    ('enterprise', 'Enterprise')
                                ],
                                default='free')
    
    auto_create_schema = True
    
    class Meta:
        verbose_name = "Tenant"
        verbose_name_plural = "Tenants"

class Account(models.Model):
    """Customer/account model"""
    ACCOUNT_TYPES = (
        ('customer', 'Customer'),
        ('partner', 'Partner'),
        ('competitor', 'Competitor'),
        ('prospect', 'Prospect'),
    )
    
    INDUSTRIES = (
        ('technology', 'Technology'),
        ('finance', 'Finance'),
        ('healthcare', 'Healthcare'),
        ('manufacturing', 'Manufacturing'),
        ('retail', 'Retail'),
        ('education', 'Education'),
        ('other', 'Other'),
    )
    
    tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, 
                               related_name='accounts')
    name = models.CharField(max_length=255, verbose_name="Account name")
    account_type = models.CharField(max_length=20, choices=ACCOUNT_TYPES, 
                                   default='customer')
    industry = models.CharField(max_length=50, choices=INDUSTRIES, 
                               null=True, blank=True)
    website = models.URLField(max_length=255, null=True, blank=True)
    phone = models.CharField(max_length=50, null=True, blank=True)
    email = models.EmailField(null=True, blank=True)
    
    # Address information
    billing_street = models.TextField(null=True, blank=True, verbose_name="Billing street")
    billing_city = models.CharField(max_length=100, null=True, blank=True, verbose_name="Billing city")
    billing_state = models.CharField(max_length=100, null=True, blank=True, verbose_name="Billing state/province")
    billing_postal_code = models.CharField(max_length=20, null=True, blank=True, verbose_name="Billing postal code")
    billing_country = models.CharField(max_length=100, null=True, blank=True, verbose_name="Billing country")
    
    shipping_street = models.TextField(null=True, blank=True, verbose_name="Shipping street")
    shipping_city = models.CharField(max_length=100, null=True, blank=True, verbose_name="Shipping city")
    shipping_state = models.CharField(max_length=100, null=True, blank=True, verbose_name="Shipping state/province")
    shipping_postal_code = models.CharField(max_length=20, null=True, blank=True, verbose_name="Shipping postal code")
    shipping_country = models.CharField(max_length=100, null=True, blank=True, verbose_name="Shipping country")
    
    # Relationship fields
    owner = models.ForeignKey(User, on_delete=models.SET_NULL, 
                             null=True, related_name='owned_accounts')
    parent_account = models.ForeignKey('self', on_delete=models.SET_NULL, 
                                      null=True, blank=True, 
                                      related_name='child_accounts')
    
    # Description and tags
    description = models.TextField(null=True, blank=True)
    tags = models.ManyToManyField('Tag', blank=True, related_name='accounts')
    
    # Metadata
    annual_revenue = models.DecimalField(max_digits=15, decimal_places=2, 
                                        null=True, blank=True)
    employee_count = models.IntegerField(null=True, blank=True)
    rating = models.DecimalField(max_digits=3, decimal_places=2, default=0.0)
    
    # System fields
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, 
                                  null=True, related_name='created_accounts')
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)
    
    class Meta:
        verbose_name = "Account"
        verbose_name_plural = "Accounts"
        unique_together = ['tenant', 'name']
        indexes = [
            models.Index(fields=['tenant', 'name']),
            models.Index(fields=['tenant', 'account_type']),
            models.Index(fields=['tenant', 'industry']),
            models.Index(fields=['tenant', 'created_at']),
        ]
    
    def __str__(self):
        return f"{self.name} ({self.get_account_type_display()})"
    
    def get_primary_contact(self):
        """Return the primary contact, or None if there is none"""
        return self.contacts.filter(is_primary=True).first()
    
    def get_open_opportunities(self):
        """Return the opportunities that are still in progress"""
        from sales.models import Opportunity
        # Fix: 'closed_won' is a closed stage, so it is excluded from the open set
        return Opportunity.objects.filter(
            tenant=self.tenant,
            account=self,
            stage__in=['qualification', 'proposal', 'negotiation']
        )
    
    def get_total_revenue(self):
        """Sum the amounts of this account's won opportunities"""
        from sales.models import Opportunity
        won_opportunities = Opportunity.objects.filter(
            tenant=self.tenant,
            account=self,
            stage='closed_won',
            close_date__isnull=False
        )
        return won_opportunities.aggregate(
            total=models.Sum('amount')
        )['total'] or 0

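# serializers.py - account serializer
from rest_framework import serializers
from .models import Account, Contact
from django.db import transaction
# Note: get_current_tenant, get_client_ip and ActivityLog are used below but never
# imported in the original; they are assumed to be project-level helpers.

class AccountSerializer(serializers.ModelSerializer):
    """Account serializer"""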
    owner_name = serializers.CharField(source='owner.get_full_name', 
                                      read_only=True)
    contact_count = serializers.SerializerMethodField()
    opportunity_count = serializers.SerializerMethodField()
    total_revenue = serializers.SerializerMethodField()
    
    class Meta:
        model = Account
        fields = [
            'id', 'name', 'account_type', 'industry', 'website', 
            'phone', 'email', 'billing_street', 'billing_city',
            'billing_state', 'billing_postal_code', 'billing_country',
            'shipping_street', 'shipping_city', 'shipping_state',
            'shipping_postal_code', 'shipping_country', 'owner',
            'owner_name', 'description', 'annual_revenue',
            'employee_count', 'rating', 'contact_count',
            'opportunity_count', 'total_revenue', 'created_at',
            'updated_at', 'is_active'
        ]
        read_only_fields = ['created_at', 'updated_at']
    
    def get_contact_count(self, obj):
        return obj.contacts.count()
    
    def get_opportunity_count(self, obj):
        return obj.opportunities.count()
    
    def get_total_revenue(self, obj):
        return obj.get_total_revenue()
    
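    def create(self, validated_data):
        """Create an account record"""
        request = self.context.get('request')
        # Fix: the original dereferenced request.user in the log call even when request was None
        user = request.user if request else None
        tenant = get_current_tenant()
        
        with transaction.atomic():
            # Set the tenant and the creator
            validated_data['tenant'] = tenant
            validated_data['created_by'] = user
            
            # Pull tags out before creating the row. 'tags' is not listed in
            # Meta.fields, so this is always empty unless the field is added there.
            tags_data = validated_data.pop('tags', [])
            
            account = Account.objects.create(**validated_data)
            
            # Attach tags
            if tags_data:
                account.tags.set(tags_data)
            
            # Write the audit log entry
            ActivityLog.objects.create(
                tenant=tenant,
                user=user,
                action_type='account_created',
                object_type='account',
                object_id=account.id,
                description=f'Account created: {account.name}',
                ip_address=get_client_ip(request) if request else None
            )
            
            return account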

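# views.py - account views
from django.db import transaction  # Fix: used by convert_to_lead but missing in the original
from rest_framework import viewsets, filters, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from .models import Account, Contact
from .serializers import AccountSerializer, ContactSerializer
from .filters import AccountFilter
from .permissions import AccountPermissions
# Note: ActivityLog and get_current_tenant are also used below without an import;
# they are assumed to be project-level helpers.

class AccountViewSet(viewsets.ModelViewSet):
    """Account view set"""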
    queryset = Account.objects.all()
    serializer_class = AccountSerializer
    permission_classes = [AccountPermissions]
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_class = AccountFilter
    search_fields = ['name', 'email', 'phone', 'description']
    ordering_fields = ['name', 'created_at', 'annual_revenue', 'rating']
    ordering = ['-created_at']
    
    def get_queryset(self):
        """Restrict the queryset to the current tenant"""
        tenant = get_current_tenant()
        queryset = super().get_queryset().filter(
            tenant=tenant, 
            is_active=True
        )
        
        # Load related objects up front to avoid N+1 queries
        queryset = queryset.select_related('owner', 'parent_account')
        queryset = queryset.prefetch_related('tags', 'contacts')
        
        return queryset
    
    @action(detail=True, methods=['get'])
    def contacts(self, request, pk=None):
        """List all contacts of an account"""
        account = self.get_object()
        contacts = account.contacts.filter(is_active=True)
        page = self.paginate_queryset(contacts)
        
        if page is not None:
            serializer = ContactSerializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        
        serializer = ContactSerializer(contacts, many=True)
        return Response(serializer.data)
    
    @action(detail=True, methods=['get'])
    def opportunities(self, request, pk=None):
        """List all opportunities of an account"""
        from sales.serializers import OpportunitySerializer
        account = self.get_object()
        opportunities = account.opportunities.all()
        page = self.paginate_queryset(opportunities)
        
        if page is not None:
            serializer = OpportunitySerializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        
        serializer = OpportunitySerializer(opportunities, many=True)
        return Response(serializer.data)
    
    @action(detail=True, methods=['post'])
    def convert_to_lead(self, request, pk=None):
        """Convert an account into a sales lead"""
        account = self.get_object()
        
        from leads.models import Lead
        from leads.serializers import LeadSerializer
        
        with transaction.atomic():
            # Create the sales lead
            lead = Lead.objects.create(
                tenant=account.tenant,
                first_name=account.name,
                company=account.name,
                email=account.email,
                phone=account.phone,
                source='account_conversion',
                status='new',
                assigned_to=account.owner,
                account=account
            )
            
            # Write the activity log entry
            ActivityLog.objects.create(
                tenant=account.tenant,
                user=request.user,
                action_type='account_converted_to_lead',
                object_type='account',
                object_id=account.id,
                description=f'Account converted to sales lead: {account.name}',
                metadata={
                    'lead_id': lead.id,
                    'account_id': account.id
                }
            )
        
        serializer = LeadSerializer(lead)
        return Response(serializer.data, status=status.HTTP_201_CREATED)
    
    @action(detail=False, methods=['get'])
    def dashboard_stats(self, request):
        """Return statistics for the account dashboard"""
        tenant = get_current_tenant()
        
        from django.db.models import Count, Sum, Avg
        from django.utils import timezone
        from datetime import timedelta
        
        # Compute the reporting window
        thirty_days_ago = timezone.now() - timedelta(days=30)
        
        stats = {
            'total_accounts': self.get_queryset().count(),
            'total_contacts': Contact.objects.filter(
                tenant=tenant, is_active=True
            ).count(),
            'new_accounts_last_30_days': self.get_queryset().filter(
                created_at__gte=thirty_days_ago
            ).count(),
            'accounts_by_type': dict(
                self.get_queryset().values_list('account_type')
                .annotate(count=Count('id'))
                .values_list('account_type', 'count')
            ),
            'accounts_by_industry': dict(
                self.get_queryset().exclude(industry__isnull=True)
                .values_list('industry')
                .annotate(count=Count('id'))
                .values_list('industry', 'count')
            ),
            'average_rating': self.get_queryset().aggregate(
                avg_rating=Avg('rating')
            )['avg_rating'] or 0,
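            # Note: this sums every related opportunity regardless of stage.
            # values() is applied before the slice so that the slice is the final operation.
            'top_accounts_by_revenue': list(
                self.get_queryset().annotate(
                    revenue=Sum('opportunities__amount')
                ).exclude(revenue__isnull=True)
                .values('id', 'name', 'revenue')
                .order_by('-revenue')[:10]
            )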
        }
        
        return Response(stats)

2.3 Sales Pipeline Management (PHP + Laravel)

Code language: php
<?php
// app/Models/SalesPipeline.php
namespace App\Models;

use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
use App\Traits\TenantScoped;

class SalesPipeline extends Model
{
    use SoftDeletes, TenantScoped;
    
    protected $fillable = [
        'tenant_id',
        'name',
        'description',
        'stages',
        'is_default',
        'is_active',
        'created_by',
        'updated_by'
    ];
    
    protected $casts = [
        'stages' => 'array',
        'is_default' => 'boolean',
        'is_active' => 'boolean',
        'created_at' => 'datetime',
        'updated_at' => 'datetime'
    ];
    
    protected $appends = ['stage_count', 'opportunity_count'];
    
    /**
     * Number of stages in this pipeline
     */
    public function getStageCountAttribute()
    {
        return count($this->stages ?? []);
    }
    
    /**
     * Number of opportunities in this pipeline
     */
    public function getOpportunityCountAttribute()
    {
        return $this->opportunities()->count();
    }
    
    /**
     * Related opportunities
     */
    public function opportunities()
    {
        return $this->hasMany(Opportunity::class, 'pipeline_id');
    }
    
    /**
     * Stage configuration, falling back to the default stages
     */
    public function getPipelineStages(): array
    {
        $defaultStages = [
            [
                'id' => 'lead',
                'name' => 'Lead',
                'probability' => 10,
                'color' => '#FF6B6B',
                'order' => 1
            ],
            [
                'id' => 'qualification',
                'name' => 'Qualification',
                'probability' => 20,
                'color' => '#4ECDC4',
                'order' => 2
            ],
            [
                'id' => 'proposal',
                'name' => 'Proposal',
                'probability' => 50,
                'color' => '#45B7D1',
                'order' => 3
            ],
            [
                'id' => 'negotiation',
                'name' => 'Negotiation',
                'probability' => 80,
                'color' => '#96CEB4',
                'order' => 4
            ],
            [
                'id' => 'closed_won',
                'name' => 'Closed Won',
                'probability' => 100,
                'color' => '#FECA57',
                'order' => 5
            ],
            [
                'id' => 'closed_lost',
                'name' => 'Closed Lost',
                'probability' => 0,
                'color' => '#FF9FF3',
                'order' => 6
            ]
        ];
        
        return $this->stages ?: $defaultStages;
    }
    
    /**
     * Stage that follows the given stage, or null if it is the last one
     */
    public function getNextStage(string $currentStageId): ?array
    {
        $stages = $this->getPipelineStages();
        $currentIndex = array_search($currentStageId, array_column($stages, 'id'));
        
        if ($currentIndex !== false && isset($stages[$currentIndex + 1])) {
            return $stages[$currentIndex + 1];
        }
        
        return null;
    }
    
    /**
     * Per-stage counts, totals, and probability-weighted totals
     */
    public function getPipelineStats(): array
    {
        $stages = $this->getPipelineStages();
        $stats = [];
        
        foreach ($stages as $stage) {
            $count = $this->opportunities()
                ->where('stage', $stage['id'])
                ->count();
            
            $totalAmount = $this->opportunities()
                ->where('stage', $stage['id'])
                ->sum('amount');
            
            $stats[] = [
                'stage' => $stage,
                'count' => $count,
                'total_amount' => $totalAmount,
                'weighted_amount' => $totalAmount * ($stage['probability'] / 100)
            ];
        }
        
        return $stats;
    }
}

// app/Models/Opportunity.php
namespace App\Models;

use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
use App\Traits\TenantScoped;

class Opportunity extends Model
{
    use SoftDeletes, TenantScoped;
    
    protected $fillable = [
        'tenant_id',
        'name',
        'account_id',
        'contact_id',
        'pipeline_id',
        'stage',
        'amount',
        'currency',
        'probability',
        'close_date',
        'description',
        'source',
        'owner_id',
        'lost_reason',
        'won_reason',
        'is_closed',
        'closed_at',
        'created_by',
        'updated_by'
    ];
    
    protected $casts = [
        'amount' => 'decimal:2',
        'probability' => 'integer',
        'close_date' => 'date',
        'is_closed' => 'boolean',
        'closed_at' => 'datetime',
        // Fix: cast stage_updated_at so getDaysInStageAttribute() can call diffInDays() on it
        'stage_updated_at' => 'datetime',
        'created_at' => 'datetime',
        'updated_at' => 'datetime'
    ];
    
    protected $appends = ['expected_amount', 'days_in_stage'];
    
    /**
     * Expected amount: amount weighted by win probability
     */
    public function getExpectedAmountAttribute()
    {
        return $this->amount * ($this->probability / 100);
    }
    
    /**
     * Number of days spent in the current stage
     */
    public function getDaysInStageAttribute()
    {
        if (!$this->stage_updated_at) {
            return null;
        }
        
        return $this->stage_updated_at->diffInDays(now());
    }
    
    /**
     * Related account
     */
    public function account()
    {
        return $this->belongsTo(Account::class);
    }
    
    /**
     * Related contact
     */
    public function contact()
    {
        return $this->belongsTo(Contact::class);
    }
    
    /**
     * Related sales pipeline
     */
    public function pipeline()
    {
        return $this->belongsTo(SalesPipeline::class);
    }
    
    /**
     * Related owner
     */
    public function owner()
    {
        return $this->belongsTo(User::class, 'owner_id');
    }
    
    /**
     * Related activity records
     */
    public function activities()
    {
        return $this->morphMany(Activity::class, 'activityable');
    }
    
    /**
     * Related product line items
     */
    public function products()
    {
        return $this->hasMany(OpportunityProduct::class);
    }
    
    /**
     * Move the opportunity to a new stage
     */
    public function updateStage(string $newStage, ?string $reason = null): bool
    {
        $oldStage = $this->stage;
        
        $this->stage = $newStage;
        $this->stage_updated_at = now();
        
        // Update the probability from the stage configuration
        $stageConfig = collect($this->pipeline->getPipelineStages())
            ->where('id', $newStage)
            ->first();
            
        if ($stageConfig) {
            $this->probability = $stageConfig['probability'];
        }
        
        // Mark the opportunity as closed when it reaches a closing stage
        if (in_array($newStage, ['closed_won', 'closed_lost'])) {
            $this->is_closed = true;
            $this->closed_at = now();
            
            if ($newStage === 'closed_won') {
                $this->won_reason = $reason;
            } else {
                $this->lost_reason = $reason;
            }
        }
        
        $saved = $this->save();
        
        if ($saved) {
            // Record the stage change history
            StageHistory::create([
                'tenant_id' => $this->tenant_id,
                'opportunity_id' => $this->id,
                'old_stage' => $oldStage,
                'new_stage' => $newStage,
                'changed_by' => auth()->id(),
                'change_reason' => $reason
            ]);
            
            // Fire the stage-changed event
            event(new OpportunityStageChanged($this, $oldStage, $newStage));
        }
        
        return $saved;
    }
    
    /**
     * Compute the sales forecast for a period
     */
    public static function getSalesForecast(int $tenantId, string $period = 'quarter'): array
    {
        $query = self::where('tenant_id', $tenantId)
            ->where('is_closed', false)
            ->whereNotNull('close_date');
        
        // Filter by period
        if ($period === 'quarter') {
            $query->whereBetween('close_date', [
                now()->startOfQuarter(),
                now()->endOfQuarter()
            ]);
        } elseif ($period === 'month') {
            $query->whereBetween('close_date', [
                now()->startOfMonth(),
                now()->endOfMonth()
            ]);
        } elseif ($period === 'year') {
            $query->whereBetween('close_date', [
                now()->startOfYear(),
                now()->endOfYear()
            ]);
        }
        
        $opportunities = $query->get();
        
        $totalAmount = $opportunities->sum('amount');
        $expectedAmount = $opportunities->sum('expected_amount');
        
        $stages = $opportunities->groupBy('stage')->map(function ($items, $stage) {
            return [
                'count' => $items->count(),
                'total_amount' => $items->sum('amount'),
                'expected_amount' => $items->sum('expected_amount')
            ];
        });
        
        $owners = $opportunities->groupBy('owner_id')->map(function ($items) {
            $owner = $items->first()->owner;
            return [
                'owner' => $owner ? $owner->name : 'Unassigned',
                'count' => $items->count(),
                'total_amount' => $items->sum('amount'),
                'expected_amount' => $items->sum('expected_amount')
            ];
        });
        
        return [
            'period' => $period,
            'total_opportunities' => $opportunities->count(),
            'total_amount' => $totalAmount,
            'expected_amount' => $expectedAmount,
            'by_stage' => $stages,
            'by_owner' => $owners,
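            // Fix: $this is not available in a static method. calculatePipelineHealth() is
            // not shown in this article and is assumed to be a static helper on this model.
            'pipeline_health' => self::calculatePipelineHealth($opportunities)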
        ];
    }
}
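
The weighting used by getExpectedAmountAttribute and getSalesForecast (amount multiplied by probability / 100, then summed per stage) can be checked with a short Python sketch. The dataclass and the function names below are hypothetical stand-ins for illustration, not a port of the Laravel models, and Decimal is used here as one way to avoid floating-point rounding on money.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Opportunity:
    """Hypothetical stand-in for an open opportunity row."""
    stage: str
    amount: Decimal
    probability: int  # win probability in percent, 0-100


def expected_amount(opp: Opportunity) -> Decimal:
    """Amount weighted by win probability: amount * probability / 100."""
    return opp.amount * Decimal(opp.probability) / Decimal(100)


def forecast_by_stage(opps: list[Opportunity]) -> dict[str, dict]:
    """Group opportunities by stage and sum raw and weighted amounts."""
    summary: dict[str, dict] = {}
    for opp in opps:
        bucket = summary.setdefault(
            opp.stage,
            {"count": 0, "total_amount": Decimal("0"), "expected_amount": Decimal("0")},
        )
        bucket["count"] += 1
        bucket["total_amount"] += opp.amount
        bucket["expected_amount"] += expected_amount(opp)
    return summary
```

For example, two proposals of 1000 and 500 at 50% contribute 1500 to the raw total but 750 to the expected amount, which is why the weighted figure is the one usually compared against a quota.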

2.4 Marketing Automation Service (Python + Celery + Redis)

Code language: python
# marketing/tasks.py - marketing automation tasks
from celery import shared_task
from datetime import datetime, timedelta
from django.db import transaction
from django.db.models import Q, Count
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.utils import timezone
import logging
import json

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3)
def execute_campaign_workflow(self, campaign_id, contact_ids=None):
    """Run the workflow of a marketing campaign"""
    from .models import Campaign, Contact, CampaignLog
    
    try:
        campaign = Campaign.objects.get(id=campaign_id)
        tenant = campaign.tenant
        
        logger.info(f"Starting campaign: {campaign.name} (tenant: {tenant.id})")
        
        # Resolve the target contacts
        if contact_ids:
            contacts = Contact.objects.filter(
                tenant=tenant,
                id__in=contact_ids,
                is_active=True,
                unsubscribed=False
            )
        else:
            contacts = campaign.get_target_contacts()
        
        total_contacts = contacts.count()
        logger.info(f"Target contacts: {total_contacts}")
        
        if total_contacts == 0:
            logger.warning("No target contacts found")
            return {"status": "completed", "message": "No target contacts"}
        
        # Run the workflow
        workflow = campaign.workflow
        executed_count = 0
        error_count = 0
        
        for contact in contacts:
            try:
                with transaction.atomic():
                    # Create the workflow execution record
                    execution = CampaignLog.objects.create(
                        tenant=tenant,
                        campaign=campaign,
                        contact=contact,
                        status='pending',
                        workflow_data=workflow
                    )
                    
                    # Run the workflow steps
                    execute_workflow_steps(execution, workflow, contact)
                    
                    execution.status = 'completed'
                    execution.completed_at = timezone.now()
                    execution.save()
                    
                    executed_count += 1
                    
            except Exception as e:
                logger.error(f"Campaign execution failed (contact: {contact.id}): {str(e)}")
                error_count += 1
                
                # Record the failure
                CampaignLog.objects.create(
                    tenant=tenant,
                    campaign=campaign,
                    contact=contact,
                    status='failed',
                    error_message=str(e)
                )
        
        # Update the campaign status
        campaign.last_executed = timezone.now()
        campaign.execution_count += 1
        campaign.save()
        
        logger.info(f"Campaign finished: {executed_count} succeeded, {error_count} failed")
        
        return {
            "status": "completed",
            "executed": executed_count,
            "failed": error_count,
            "total": total_contacts
        }
        
    except Exception as e:
        logger.error(f"Campaign execution failed: {str(e)}")
        raise self.retry(exc=e, countdown=60)

def execute_workflow_steps(execution, workflow, contact):
    """Run the workflow steps in order"""
    steps = workflow.get('steps', [])
    execution_data = execution.execution_data or {}
    
    for step_index, step in enumerate(steps):
        try:
            step_type = step.get('type')
            step_config = step.get('config', {})
            
            # Record the start of the step
            execution_data[f'step_{step_index}'] = {
                'type': step_type,
                'started_at': timezone.now().isoformat(),
                'status': 'processing'
            }
            execution.execution_data = execution_data
            execution.save()
            
            # 执行步骤
            if step_type == 'send_email':
                send_workflow_email(contact, step_config)
            elif step_type == 'wait':
                schedule_wait_step(execution.id, step_index, step_config)
            elif step_type == 'condition':
                handle_condition_step(contact, step_config, workflow, step_index)
            elif step_type == 'update_contact':
                update_contact_fields(contact, step_config)
            elif step_type == 'add_to_list':
                add_contact_to_list(contact, step_config)
            elif step_type == 'webhook':
                trigger_webhook(contact, step_config)
            
            # 记录步骤完成
            execution_data[f'step_{step_index}']['status'] = 'completed'
            execution_data[f'step_{step_index}']['completed_at'] = timezone.now().isoformat()
            
        except Exception as e:
            logger.error(f"工作流步骤执行失败: {str(e)}")
            execution_data[f'step_{step_index}']['status'] = 'failed'
            execution_data[f'step_{step_index}']['error'] = str(e)
            raise
    
    execution.execution_data = execution_data
    execution.save()

@shared_task
def send_workflow_email(contact, config):
    """Send a workflow email to a contact.
    
    Called synchronously from execute_workflow_steps. If it is ever dispatched
    with .delay(), pass IDs rather than model instances, which are not
    JSON-serializable task arguments.
    """
    from django.conf import settings
    from django.core.mail import send_mail
    from django.utils.html import escape, strip_tags
    
    template_id = config.get('template_id')
    subject = config.get('subject', '')
    from_email = config.get('from_email', settings.DEFAULT_FROM_EMAIL)
    
    # Load the email template, falling back to the inline content
    if template_id:
        from .models import EmailTemplate
        try:
            template = EmailTemplate.objects.get(id=template_id, tenant=contact.tenant)
            subject = template.subject
            html_content = template.content
        except EmailTemplate.DoesNotExist:
            html_content = config.get('content', '')
    else:
        html_content = config.get('content', '')
    
    # Personalization placeholders
    personalization_vars = {
        '{{contact.first_name}}': contact.first_name or '',
        '{{contact.last_name}}': contact.last_name or '',
        '{{contact.email}}': contact.email or '',
        '{{contact.company}}': contact.account.name if contact.account else '',
        '{{unsubscribe_url}}': f"{settings.SITE_URL}/unsubscribe/{contact.unsubscribe_token}"
    }
    
    for var, value in personalization_vars.items():
        # Escape values in the HTML body so contact data cannot inject markup
        html_content = html_content.replace(var, escape(value))
        subject = subject.replace(var, value)
    
    # Send the email
    send_mail(
        subject=subject,
        message=strip_tags(html_content),  # plain-text fallback
        html_message=html_content,
        from_email=from_email,
        recipient_list=[contact.email],
        fail_silently=False
    )
    
    # Log the send
    from .models import EmailLog
    EmailLog.objects.create(
        tenant=contact.tenant,
        contact=contact,
        campaign=config.get('campaign'),
        subject=subject,
        status='sent',
        sent_at=timezone.now()
    )

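# marketing/models.py - marketing automation models
from django.db import models
# django.contrib.postgres.fields.JSONField was removed in Django 4.0;
# the database-agnostic models.JSONField is available since Django 3.1
from django.db.models import JSONField
from django.contrib.postgres.fields import ArrayField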
from django.utils import timezone
import uuid

class Campaign(models.Model):
    """Marketing campaign model."""
    CAMPAIGN_TYPES = (
        ('email', 'Email marketing'),
        ('sms', 'SMS marketing'),
        ('social', 'Social media'),
        ('event', 'Event marketing'),
        ('other', 'Other')
    )
    
    CAMPAIGN_STATUS = (
        ('draft', 'Draft'),
        ('active', 'Active'),
        ('paused', 'Paused'),
        ('completed', 'Completed'),
        ('archived', 'Archived')
    )
    
    tenant = models.ForeignKey('tenants.Tenant', on_delete=models.CASCADE)
    name = models.CharField(max_length=255, verbose_name="Campaign name")
    campaign_type = models.CharField(max_length=20, choices=CAMPAIGN_TYPES, default='email')
    status = models.CharField(max_length=20, choices=CAMPAIGN_STATUS, default='draft')
    
    # Target audience
    target_list = models.ForeignKey('ContactList', on_delete=models.SET_NULL, 
                                   null=True, blank=True)
    target_segment = JSONField(default=dict, blank=True, 
                              verbose_name="Target audience segment conditions")
    
    # Workflow configuration
    workflow = JSONField(default=dict, verbose_name="Workflow configuration")
    trigger_type = models.CharField(max_length=50, default='manual',
                                   choices=[
                                       ('manual', 'Manual'),
                                       ('scheduled', 'Scheduled'),
                                       ('event', 'Event-triggered'),
                                       ('api', 'API-triggered')
                                   ])
    trigger_config = JSONField(default=dict, blank=True, verbose_name="Trigger configuration")
    
    # Statistics
    total_contacts = models.IntegerField(default=0, verbose_name="Total contacts")
    sent_count = models.IntegerField(default=0, verbose_name="Sent")
    opened_count = models.IntegerField(default=0, verbose_name="Opened")
    clicked_count = models.IntegerField(default=0, verbose_name="Clicked")
    converted_count = models.IntegerField(default=0, verbose_name="Converted")
    bounce_count = models.IntegerField(default=0, verbose_name="Bounced")
    
    # Timing
    scheduled_for = models.DateTimeField(null=True, blank=True, verbose_name="Scheduled for")
    started_at = models.DateTimeField(null=True, blank=True, verbose_name="Started at")
    completed_at = models.DateTimeField(null=True, blank=True, verbose_name="Completed at")
    last_executed = models.DateTimeField(null=True, blank=True, verbose_name="Last executed at")
    execution_count = models.IntegerField(default=0, verbose_name="Execution count")
    
    # Metadata
    created_by = models.ForeignKey('users.User', on_delete=models.SET_NULL,
                                  null=True, related_name='created_campaigns')
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)
    
    class Meta:
        verbose_name = "Marketing campaign"
        verbose_name_plural = "Marketing campaigns"
        indexes = [
            models.Index(fields=['tenant', 'status']),
            models.Index(fields=['tenant', 'campaign_type']),
            models.Index(fields=['tenant', 'scheduled_for']),
            models.Index(fields=['tenant', 'created_at']),
        ]
        ordering = ['-created_at']
    
    def __str__(self):
        return f"{self.name} ({self.get_campaign_type_display()})"
    
    def get_target_contacts(self):
        """Return the queryset of contacts targeted by this campaign."""
        from contacts.models import Contact
        
        queryset = Contact.objects.filter(
            tenant=self.tenant,
            is_active=True,
            unsubscribed=False
        )
        
        # Restrict to the contact list, if one is set
        if self.target_list:
            queryset = queryset.filter(lists=self.target_list)
        
        # Apply the segment conditions
        if self.target_segment:
            queryset = self.apply_segment_filters(queryset)
        
        return queryset
    
    def apply_segment_filters(self, queryset):
        """Apply the segment conditions to the queryset (conditions are ANDed)."""
        segment = self.target_segment
        
        if segment.get('conditions'):
            conditions = segment['conditions']
            
            for condition in conditions:
                field = condition.get('field')
                operator = condition.get('operator')
                value = condition.get('value')
                
                if not all([field, operator]):
                    continue
                
                # Build the query for this condition
                query = self.build_condition_query(field, operator, value)
                if query:
                    queryset = queryset.filter(query)
        
        return queryset
    
    def build_condition_query(self, field, operator, value):
        """Build a Q object for one condition, or return None if it is unsupported."""
        from django.db.models import Q
        
        field_mapping = {
            'first_name': 'first_name',
            'last_name': 'last_name',
            'email': 'email',
            'company': 'account__name',
            'industry': 'account__industry',
            'country': 'account__billing_country',
            'created_at': 'created_at',
            'last_activity': 'last_activity_at'
        }
        
        db_field = field_mapping.get(field)
        if not db_field:
            return None
        
        if operator == 'equals':
            return Q(**{f"{db_field}__iexact": value})
        elif operator == 'not_equals':
            return ~Q(**{f"{db_field}__iexact": value})
        elif operator == 'contains':
            return Q(**{f"{db_field}__icontains": value})
        elif operator == 'not_contains':
            return ~Q(**{f"{db_field}__icontains": value})
        elif operator == 'starts_with':
            return Q(**{f"{db_field}__istartswith": value})
        elif operator == 'ends_with':
            return Q(**{f"{db_field}__iendswith": value})
        elif operator == 'greater_than':
            return Q(**{f"{db_field}__gt": value})
        elif operator == 'less_than':
            return Q(**{f"{db_field}__lt": value})
        elif operator == 'is_empty':
            return Q(**{f"{db_field}__isnull": True}) | Q(**{f"{db_field}": ''})
        elif operator == 'is_not_empty':
            return ~Q(**{f"{db_field}__isnull": True}) & ~Q(**{f"{db_field}": ''})
        
        return None
    
    def calculate_metrics(self):
        """Compute campaign rates as percentages of total contacts."""
        total = self.total_contacts
        if total == 0:
            return {}
        
        def pct(count):
            return round((count / total) * 100, 2)
        
        return {
            'open_rate': pct(self.opened_count),
            'click_rate': pct(self.clicked_count),
            'conversion_rate': pct(self.converted_count),
            'bounce_rate': pct(self.bounce_count),
            'delivery_rate': pct(total - self.bounce_count)
        }
    
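    def start_campaign(self):
        """Start the campaign."""
        # Imported here to avoid a circular import between models and tasks;
        # assumes the task is defined in marketing/tasks.py
        from .tasks import execute_campaign_workflow
        
        if self.status != 'draft':
            raise ValueError("Only campaigns in draft status can be started")
        
        self.status = 'active'
        self.started_at = timezone.now()
        self.save()
        
        # Run the campaign asynchronously
        execute_campaign_workflow.delay(self.id)
        
        return True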
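
The `target_segment` field holds a list of conditions that `build_condition_query` turns into `Q` objects. The sketch below mirrors a few of those operators in plain Python against in-memory dictionaries, only to illustrate the expected shape of the JSON and how conditions combine with AND. The sample data and the `matches_segment` helper are hypothetical and not part of the original code.

```python
from typing import Any, Callable, Dict

# Case-insensitive operator semantics, loosely mirroring the Django lookups
# (iexact, icontains, istartswith) used in build_condition_query.
OPERATORS: Dict[str, Callable[[str, str], bool]] = {
    "equals": lambda actual, expected: actual.lower() == expected.lower(),
    "contains": lambda actual, expected: expected.lower() in actual.lower(),
    "starts_with": lambda actual, expected: actual.lower().startswith(expected.lower()),
}


def matches_segment(contact: Dict[str, Any], segment: Dict[str, Any]) -> bool:
    """Return True if the contact satisfies every condition (AND semantics)."""
    for condition in segment.get("conditions", []):
        field = condition.get("field")
        operator = condition.get("operator")
        if not field or not operator:
            continue  # incomplete conditions are skipped, as in apply_segment_filters
        check = OPERATORS.get(operator)
        if check is None:
            continue  # unsupported operators are ignored in this sketch
        actual = str(contact.get(field) or "")
        if not check(actual, str(condition.get("value", ""))):
            return False
    return True


# Hypothetical segment definition and contacts
segment = {
    "conditions": [
        {"field": "industry", "operator": "equals", "value": "software"},
        {"field": "email", "operator": "contains", "value": "@example.com"},
    ]
}
contacts = [
    {"email": "ann@example.com", "industry": "Software"},
    {"email": "bob@other.org", "industry": "Software"},
]
selected = [c["email"] for c in contacts if matches_segment(c, segment)]
print(selected)
```

Only the first contact passes both conditions. An empty segment matches every contact, which corresponds to `get_target_contacts` skipping the filter when `target_segment` is empty.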

3. System Deployment and Operations

3.1 Docker Containerized Deployment

Code language: yaml
# docker-compose.yml
version: '3.8'

services:
  # Main application service
  crm-api:
    build:
      context: .
      dockerfile: Dockerfile.api
    container_name: crm-api
    restart: unless-stopped
    env_file:
      - .env
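    environment:
      - DB_HOST=postgres
      - REDIS_HOST=redis
      # Redis is started with --requirepass, so the broker URL must carry the password
      - CELERY_BROKER_URL=redis://:${REDIS_PASSWORD}@redis:6379/0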
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
    volumes:
      - ./media:/app/media
      - ./logs:/app/logs
    networks:
      - crm-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  # Celery Worker
  celery-worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    container_name: crm-celery-worker
    restart: unless-stopped
    env_file:
      - .env
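    environment:
      # Redis is started with --requirepass, so both URLs must carry the password
      - CELERY_BROKER_URL=redis://:${REDIS_PASSWORD}@redis:6379/0
      - CELERY_RESULT_BACKEND=redis://:${REDIS_PASSWORD}@redis:6379/1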
    depends_on:
      - redis
      - postgres
    volumes:
      - ./media:/app/media
      - ./logs:/app/logs
    command: celery -A config.celery worker --loglevel=info --concurrency=4
    networks:
      - crm-network

  # Celery Beat
  celery-beat:
    build:
      context: .
      dockerfile: Dockerfile.beat
    container_name: crm-celery-beat
    restart: unless-stopped
    env_file:
      - .env
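    environment:
      # Redis is started with --requirepass, so the broker URL must carry the password
      - CELERY_BROKER_URL=redis://:${REDIS_PASSWORD}@redis:6379/0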
    depends_on:
      - redis
      - postgres
    volumes:
      - ./media:/app/media
      - ./logs:/app/logs
    command: celery -A config.celery beat --loglevel=info
    networks:
      - crm-network

  # PostgreSQL database
  postgres:
    image: postgres:14-alpine
    container_name: crm-postgres
    restart: unless-stopped
    environment:
      - POSTGRES_DB=${DB_NAME}
      - POSTGRES_USER=${DB_USER}
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init-scripts:/docker-entrypoint-initdb.d
    ports:
      - "5432:5432"
    networks:
      - crm-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${DB_USER}"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis cache and Celery broker
  redis:
    image: redis:7-alpine
    container_name: crm-redis
    restart: unless-stopped
    command: redis-server --requirepass ${REDIS_PASSWORD}
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"
    networks:
      - crm-network
    healthcheck:
      # Authenticate and expect PONG; an unauthenticated command only gets a NOAUTH error
      test: ["CMD-SHELL", "redis-cli -a \"${REDIS_PASSWORD}\" --no-auth-warning ping | grep -q PONG"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Nginx reverse proxy
  nginx:
    image: nginx:1.23-alpine
    container_name: crm-nginx
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/conf.d:/etc/nginx/conf.d
      - ./ssl:/etc/nginx/ssl
      - ./media:/app/media
      - ./logs/nginx:/var/log/nginx
    depends_on:
      - crm-api
    networks:
      - crm-network

  # Monitoring
  prometheus:
    image: prom/prometheus:latest
    container_name: crm-prometheus
    restart: unless-stopped
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    ports:
      - "9090:9090"
    networks:
      - crm-network

  grafana:
    image: grafana/grafana:latest
    container_name: crm-grafana
    restart: unless-stopped
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring/dashboards:/etc/grafana/provisioning/dashboards
    ports:
      - "3000:3000"
    networks:
      - crm-network

networks:
  crm-network:
    driver: bridge

volumes:
  postgres_data:
  redis_data:
  prometheus_data:
  grafana_data:
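
Because the `redis` service is started with `--requirepass`, every client URL has to carry the password, and a password containing characters such as `@` or `/` must be percent-encoded to remain a valid URL. A minimal sketch of building such a URL from environment variables follows. The `build_redis_url` helper and the `REDIS_PORT` variable are assumptions for illustration, not part of the original project.

```python
import os
from urllib.parse import quote


def build_redis_url(host: str, port: int, db: int, password: str = "") -> str:
    """Build a redis:// URL, percent-encoding the password if one is set."""
    if password:
        # safe="" also encodes "/" and "@", which would otherwise break the URL
        return f"redis://:{quote(password, safe='')}@{host}:{port}/{db}"
    return f"redis://{host}:{port}/{db}"


# Hypothetical settings snippet: read the variables the compose file provides
broker_url = build_redis_url(
    host=os.environ.get("REDIS_HOST", "redis"),
    port=int(os.environ.get("REDIS_PORT", "6379")),
    db=0,
    password=os.environ.get("REDIS_PASSWORD", ""),
)
```

Building the URL in application code keeps the raw password out of the compose file's URL strings and avoids breakage when the password contains reserved characters.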

3.2 Monitoring and Alerting Configuration

Code language: yaml
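# monitoring/prometheus.yml
# Note: alertmanager, postgres-exporter, redis-exporter and nginx-exporter are
# referenced below but are not defined in the docker-compose.yml above; they must
# be added as services. The alerts/ rule directory also needs to be mounted into
# the Prometheus container, and the crm-api job assumes the app exposes /metrics.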
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

rule_files:
  - "alerts/*.yml"

scrape_configs:
  - job_name: 'crm-api'
    static_configs:
      - targets: ['crm-api:8000']
    metrics_path: '/metrics'
    scrape_interval: 10s

  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

  - job_name: 'nginx'
    static_configs:
      - targets: ['nginx-exporter:9113']
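
The `crm-api` job expects the application to serve metrics at `/metrics` in the Prometheus text exposition format. In practice a client library would normally produce this output; the stdlib-only sketch below just illustrates what the scraped payload looks like. The metric name `crm_campaign_emails_sent_total` and the `render_counter` helper are hypothetical.

```python
from typing import Dict, Tuple

# A label set (as key/value pairs) mapped to the counter value
Samples = Dict[Tuple[Tuple[str, str], ...], float]


def render_counter(name: str, help_text: str, samples: Samples) -> str:
    """Render one counter metric in the Prometheus text exposition format.

    Label values are assumed not to contain quotes, backslashes or newlines,
    which would need escaping in a real exporter.
    """
    lines = [f"# HELP {name} {help_text}", f"# TYPE {name} counter"]
    for labels, value in samples.items():
        if labels:
            label_str = ",".join(f'{key}="{val}"' for key, val in labels)
            lines.append(f"{name}{{{label_str}}} {value}")
        else:
            lines.append(f"{name} {value}")
    return "\n".join(lines) + "\n"


payload = render_counter(
    "crm_campaign_emails_sent_total",
    "Emails sent by campaign workflows.",
    {(("status", "sent"),): 120.0, (("status", "failed"),): 3.0},
)
print(payload)
```

Each sample line carries the metric name, an optional label set in braces, and the current value; Prometheus derives rates from successive scrapes of these counters.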

4. Security and Performance Optimization

4.1 Security Measures

Code language: python
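# security/middleware.py
import logging
import time

from django.conf import settings
from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from ipware import get_client_ip

logger = logging.getLogger(__name__)


class SecurityMiddleware(MiddlewareMixin):
    """Security middleware: per-IP rate limiting plus security response headers.
    
    The name shadows django.middleware.security.SecurityMiddleware, so refer to
    this class by its full dotted path in MIDDLEWARE.
    """
    
    SECURITY_HEADERS = {
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        # Legacy header, ignored by current browsers; kept for older clients
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        # 'unsafe-inline' and 'unsafe-eval' weaken the policy; tighten when possible
        'Content-Security-Policy': "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline';",
        'Referrer-Policy': 'strict-origin-when-cross-origin',
        'Permissions-Policy': 'geolocation=(), microphone=(), camera=()'
    }
    
    def process_request(self, request):
        # Per-IP rate limit check; returning a response short-circuits the view
        if not self.check_rate_limit(request):
            return self.rate_limit_response()
        
        # SQL injection and XSS are not handled here: rely on the ORM's
        # parameterized queries and on template auto-escaping rather than
        # rewriting request input in middleware.
        return None
    
    def process_response(self, request, response):
        """Attach the security headers to every outgoing response."""
        # Headers must be set here; calling get_response() from process_request
        # would run the view twice and discard the first response.
        for header, value in self.SECURITY_HEADERS.items():
            response[header] = value
        return response
    
    def rate_limit_response(self):
        """Return a 429 Too Many Requests response."""
        return JsonResponse({'detail': 'Too many requests'}, status=429)
    
    def check_rate_limit(self, request):
        """Return True if the request is within the per-minute limit."""
        client_ip, _ = get_client_ip(request)
        
        if not client_ip:
            return True
        
        # Fixed-window counter in Redis: one key per client IP per minute
        redis_key = f"rate_limit:{client_ip}:{int(time.time() // 60)}"
        
        try:
            from django_redis import get_redis_connection
            redis = get_redis_connection("default")
            
            # Number of requests in the current minute
            current = redis.incr(redis_key)
            
            if current == 1:
                # Expire the key shortly after the window ends
                redis.expire(redis_key, 61)
            
            # Reject once the limit is exceeded
            if current > settings.RATE_LIMIT_PER_MINUTE:
                return False
                
        except Exception as e:
            # Fail open when Redis is unavailable, but leave a trace in the logs
            logger.warning("Rate limiting skipped: %s", e)
        
        return True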
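
The Redis counter above resets at each minute boundary, which makes it a fixed-window limiter: a client can send up to twice the limit in a short burst that straddles two windows. For comparison, a sliding-window log keeps per-client timestamps and counts only those inside the last N seconds. The following is an in-memory sketch of that alternative; the `SlidingWindowLimiter` class is hypothetical and, unlike the Redis version, is not shared across processes.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `limit` requests per client within the last `window` seconds."""

    def __init__(self, limit: int, window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock  # injectable so the behaviour can be tested deterministically
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_ip: str) -> bool:
        now = self.clock()
        hits = self._hits[client_ip]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


# Usage with a fake clock: 2 requests allowed per 60 seconds
fake_now = [0.0]
limiter = SlidingWindowLimiter(limit=2, window=60.0, clock=lambda: fake_now[0])
results = [limiter.allow("10.0.0.1") for _ in range(3)]
fake_now[0] = 61.0  # the earlier requests are now outside the window
results.append(limiter.allow("10.0.0.1"))
print(results)
```

The trade-off is memory: the log stores one timestamp per accepted request, whereas the fixed-window counter stores a single integer per client per minute.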

4.2 Database Performance Optimization

Code language: sql
-- Index optimization
-- Indexes on the accounts table
CREATE INDEX idx_accounts_tenant_created ON accounts(tenant_id, created_at DESC);
CREATE INDEX idx_accounts_industry ON accounts(tenant_id, industry) WHERE industry IS NOT NULL;
CREATE INDEX idx_accounts_owner ON accounts(tenant_id, owner_id) WHERE owner_id IS NOT NULL;

-- Indexes on the contacts table
CREATE INDEX idx_contacts_email ON contacts(tenant_id, email) WHERE email IS NOT NULL;
CREATE INDEX idx_contacts_account ON contacts(tenant_id, account_id) WHERE account_id IS NOT NULL;
CREATE INDEX idx_contacts_last_activity ON contacts(tenant_id, last_activity_at DESC) WHERE last_activity_at IS NOT NULL;

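-- Partitions of the opportunities table
-- (assumes opportunities was created with PARTITION BY RANGE on a date column)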
CREATE TABLE opportunities_2023 PARTITION OF opportunities
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
    
CREATE TABLE opportunities_2024 PARTITION OF opportunities
    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');

-- Materialized view for reporting
CREATE MATERIALIZED VIEW mv_sales_performance AS
SELECT 
    o.tenant_id,
    DATE_TRUNC('month', o.close_date) as month,
    o.owner_id,
    u.name as owner_name,
    COUNT(CASE WHEN o.stage = 'closed_won' THEN 1 END) as won_count,
    COUNT(CASE WHEN o.stage = 'closed_lost' THEN 1 END) as lost_count,
    SUM(CASE WHEN o.stage = 'closed_won' THEN o.amount ELSE 0 END) as won_amount,
    AVG(CASE WHEN o.stage = 'closed_won' THEN o.amount END) as avg_deal_size,
    AVG(EXTRACT(DAY FROM (o.closed_at - o.created_at))) as avg_sales_cycle
FROM opportunities o
JOIN users u ON o.owner_id = u.id
WHERE o.is_closed = true
    AND o.close_date >= DATE_TRUNC('year', CURRENT_DATE - INTERVAL '1 year')
GROUP BY 1, 2, 3, 4;

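-- REFRESH ... CONCURRENTLY requires a unique index on the materialized view
CREATE UNIQUE INDEX idx_mv_sales_performance_key
    ON mv_sales_performance (tenant_id, month, owner_id);

-- Stored procedure that refreshes the materialized view
CREATE OR REPLACE PROCEDURE refresh_sales_performance_mv()
LANGUAGE plpgsql
AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY mv_sales_performance;
END;
$$;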
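
The yearly partitions above have to exist before rows for that year are inserted, so they are usually created ahead of time by a scheduled job. Below is a small sketch of generating the DDL for a given year, following the naming and date ranges used above. The `yearly_partition_ddl` helper is hypothetical, and the statements would still need to be executed through a database connection.

```python
from datetime import date


def yearly_partition_ddl(parent_table: str, year: int) -> str:
    """Return the CREATE TABLE statement for one yearly range partition."""
    if not parent_table.isidentifier():
        # Table names cannot be passed as query parameters, so validate before formatting
        raise ValueError(f"Unsafe table name: {parent_table!r}")
    start = date(year, 1, 1)
    end = date(year + 1, 1, 1)  # the upper bound of a range partition is exclusive
    return (
        f"CREATE TABLE IF NOT EXISTS {parent_table}_{year} PARTITION OF {parent_table}\n"
        f"    FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )


# Generate DDL for two upcoming years
for upcoming_year in (2025, 2026):
    print(yearly_partition_ddl("opportunities", upcoming_year))
```

Using `IF NOT EXISTS` makes the job safe to re-run, which matters when it is triggered on a schedule.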

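Conclusion

Building an enterprise SaaS CRM system is complex, systematic engineering work that has to balance architecture design, data security, system performance, and scalability. Starting from multi-tenant architecture design, this article walked through implementations in three technology stacks (PHP, Java, and Python), covering core modules such as account management, the sales pipeline, and marketing automation.

In real-world development, the following points also need attention:

  1. Data migration strategy: support smooth migration from legacy CRM systems
  2. Third-party integrations: integrate with email systems, social media, payment gateways, and similar services
  3. Mobile support: build mobile apps with React Native or Flutter
  4. Internationalization: support multiple languages, time zones, and currencies
  5. Compliance: meet data protection regulations such as GDPR and CCPA

A successful SaaS CRM system is more than a stack of technologies; it rests on a deep understanding and optimization of business processes. Only sound architecture design combined with continuous technical iteration produces a customer relationship management system that truly meets enterprise needs.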

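Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.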
