在数字化转型浪潮中,客户关系管理系统已成为企业的核心基础设施。从最初简单的联系人管理,到如今涵盖销售自动化、营销管理、客户服务的综合性平台,SaaS模式的企业级CRM系统在架构设计、技术实现上面临着前所未有的挑战。本文将深入探讨多租户SaaS CRM系统的完整架构设计,并提供PHP、Java、Python三种主流技术栈的核心实现方案。
源码及演示:c.xsymz.icu
企业级SaaS CRM系统的核心在于多租户架构设计,按数据隔离程度主要有三种模式:
1. 独立数据库模式
2. 共享数据库,独立Schema模式
3. 共享数据库,共享Schema模式

现代SaaS CRM系统通常采用微服务架构,核心服务划分如下:
CRM微服务架构:
├── 租户管理服务 (Tenant Service)
├── 用户认证服务 (Auth Service)
├── 客户管理服务 (Account Service)
├── 销售管道服务 (Sales Pipeline Service)
├── 营销自动化服务 (Marketing Automation Service)
├── 客户服务支持 (Customer Support Service)
├── 报表分析服务 (Analytics Service)
└── 消息通知服务 (Notification Service)
// TenantContext.java - tenant context management
/**
 * Per-thread holder for the tenant id and user associated with the work
 * currently running on this thread.
 *
 * <p>Both values live in {@link ThreadLocal}s, so whoever sets them must call
 * {@link #clear()} when the unit of work ends; otherwise the values stay
 * attached to the thread.
 */
@Component
public class TenantContext {
    private static final ThreadLocal<String> CURRENT_TENANT = new ThreadLocal<>();
    private static final ThreadLocal<User> CURRENT_USER = new ThreadLocal<>();

    /**
     * Binds a tenant id to the current thread.
     *
     * @param tenantId tenant identifier to store
     */
    public static void setCurrentTenant(String tenantId) {
        CURRENT_TENANT.set(tenantId);
    }

    /**
     * @return the tenant id bound to the current thread, or {@code null} if none was set
     */
    public static String getCurrentTenant() {
        return CURRENT_TENANT.get();
    }

    /**
     * Binds a user to the current thread.
     *
     * <p>Added because {@code CURRENT_USER} was declared and cleared but could
     * never be written or read.
     *
     * @param user user to store
     */
    public static void setCurrentUser(User user) {
        CURRENT_USER.set(user);
    }

    /**
     * @return the user bound to the current thread, or {@code null} if none was set
     */
    public static User getCurrentUser() {
        return CURRENT_USER.get();
    }

    /** Removes both the tenant id and the user from the current thread. */
    public static void clear() {
        CURRENT_TENANT.remove();
        CURRENT_USER.remove();
    }
}
// TenantAwareDataSource.java - multi-tenant data source routing
/**
 * Routing data source that selects the target pool using the tenant id held
 * in {@link TenantContext}.
 *
 * <p>NOTE(review): {@code tenantConfigRepository} is used in
 * {@link #afterPropertiesSet()} but is not declared anywhere in this class as
 * shown - confirm how it is meant to be injected.
 */
@Component
public class TenantAwareDataSource extends AbstractRoutingDataSource {

    /** Uses the current thread's tenant id as the routing key. */
    @Override
    protected Object determineCurrentLookupKey() {
        return TenantContext.getCurrentTenant();
    }

    /**
     * Builds one connection pool per configured tenant, registers them as the
     * routing targets, and uses the entry keyed {@code "default"} (if any) as
     * the fallback before delegating to the parent initialisation.
     */
    @Override
    public void afterPropertiesSet() {
        Map<Object, Object> poolsByTenant = new HashMap<>();
        // One pool per tenant configuration returned by the repository.
        for (TenantConfig cfg : tenantConfigRepository.findAll()) {
            poolsByTenant.put(cfg.getTenantId(), buildDataSource(cfg));
        }
        setTargetDataSources(poolsByTenant);
        setDefaultTargetDataSource(poolsByTenant.get("default"));
        super.afterPropertiesSet();
    }

    /**
     * Creates a Hikari pool for a single tenant.
     *
     * @param tenant configuration providing JDBC URL and credentials
     * @return a new pooled data source with fixed pool sizing and timeouts
     */
    private DataSource buildDataSource(TenantConfig tenant) {
        HikariConfig poolSettings = new HikariConfig();
        // Connection coordinates.
        poolSettings.setJdbcUrl(tenant.getJdbcUrl());
        poolSettings.setUsername(tenant.getDbUsername());
        poolSettings.setPassword(tenant.getDbPassword());
        // Pool sizing.
        poolSettings.setMaximumPoolSize(20);
        poolSettings.setMinimumIdle(5);
        // Timeouts, in milliseconds.
        poolSettings.setConnectionTimeout(30000);
        poolSettings.setIdleTimeout(600000);
        poolSettings.setMaxLifetime(1800000);
        return new HikariDataSource(poolSettings);
    }
}
// TenantInterceptor.java - tenant interceptor
/**
 * Resolves the tenant for each request, rejects unknown or inactive tenants,
 * and stores the tenant id in {@link TenantContext} for the duration of the
 * request.
 *
 * <p>NOTE(review): {@code tenantService} and {@code jwtUtil} are used below
 * but are not declared in this class as shown - confirm how they are injected.
 *
 * <p>NOTE(review): the {@code X-Tenant-ID} header is consulted before the JWT,
 * so a client-supplied header wins over the signed token - confirm that the
 * header is only trusted behind a gateway that sets it.
 */
@Component
public class TenantInterceptor implements HandlerInterceptor {

    /**
     * Resolves and validates the tenant, then binds it to the current thread.
     *
     * @return always {@code true} when a valid, active tenant was found
     * @throws TenantNotFoundException if no tenant id can be derived from the request
     * @throws TenantInactiveException if the tenant is not active
     */
    @Override
    public boolean preHandle(HttpServletRequest request,
                             HttpServletResponse response,
                             Object handler) {
        // Tenant id comes from the sub-domain, a request header, or the JWT.
        String tenantId = extractTenantId(request);
        if (tenantId == null) {
            throw new TenantNotFoundException("租户信息未找到");
        }
        // Reject tenants that are not active.
        if (!tenantService.isTenantActive(tenantId)) {
            throw new TenantInactiveException("租户已被禁用");
        }
        TenantContext.setCurrentTenant(tenantId);
        return true;
    }

    /** Clears the thread-bound tenant context once the request has completed. */
    @Override
    public void afterCompletion(HttpServletRequest request,
                                HttpServletResponse response,
                                Object handler, Exception ex) {
        TenantContext.clear();
    }

    /**
     * Derives the tenant id, trying in order: sub-domain, {@code X-Tenant-ID}
     * header, then the bearer token.
     *
     * @return the tenant id, or {@code null} when none of the sources provides one
     */
    private String extractTenantId(HttpServletRequest request) {
        // 1. Sub-domain
        String fromHost = tenantFromHost(request.getServerName());
        if (fromHost != null) {
            return fromHost;
        }
        // 2. Request header
        String headerTenant = request.getHeader("X-Tenant-ID");
        if (StringUtils.hasText(headerTenant)) {
            return headerTenant.trim();
        }
        // 3. JWT bearer token
        String authHeader = request.getHeader("Authorization");
        if (StringUtils.hasText(authHeader) && authHeader.startsWith("Bearer ")) {
            String token = authHeader.substring(7);
            return jwtUtil.extractTenantId(token);
        }
        return null;
    }

    /**
     * Returns the first DNS label of {@code host} as the tenant id.
     *
     * <p>Returns {@code null} for hosts without a dot, for IPv4 literals
     * (otherwise {@code 127.0.0.1} would resolve to tenant {@code 127}), for an
     * empty first label, and for the reserved labels {@code www} / {@code app}.
     */
    private static String tenantFromHost(String host) {
        if (host == null || !host.contains(".") || isIpv4Literal(host)) {
            return null;
        }
        String subdomain = host.split("\\.")[0];
        if (subdomain.isEmpty()
                || subdomain.equalsIgnoreCase("www")
                || subdomain.equalsIgnoreCase("app")) {
            return null;
        }
        return subdomain;
    }

    /** True when {@code host} consists solely of digits and dots. */
    private static boolean isIpv4Literal(String host) {
        for (int i = 0; i < host.length(); i++) {
            char c = host.charAt(i);
            if (c != '.' && !Character.isDigit(c)) {
                return false;
            }
        }
        return true;
    }
}
# models.py - customer data models
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import EmailValidator, RegexValidator
from django.utils import timezone
from tenant_schemas.models import TenantMixin
from tenant_schemas.postgresql_backend.base import _check_schema_name
User = get_user_model()
class Tenant(TenantMixin):
    """Tenant record.

    Stores the tenant's display name, its schema name, activation flag,
    optional trial end and subscription plan.
    """

    name = models.CharField(max_length=100, unique=True)
    # 63 characters matches the field length declared here; the validator is
    # the schema-name check imported from tenant_schemas.
    schema_name = models.CharField(max_length=63, unique=True,
                                   validators=[_check_schema_name])
    created_on = models.DateField(auto_now_add=True)
    is_active = models.BooleanField(default=True)
    trial_ends = models.DateTimeField(null=True, blank=True)
    plan_type = models.CharField(
        max_length=20,
        choices=[
            ('free', '免费版'),
            ('basic', '基础版'),
            ('professional', '专业版'),
            ('enterprise', '企业版'),
        ],
        default='free',
    )
    # NOTE(review): presumably read by TenantMixin to create the schema when
    # the tenant is saved - confirm against the tenant_schemas version in use.
    auto_create_schema = True

    class Meta:
        verbose_name = "租户"
        verbose_name_plural = "租户"
class Account(models.Model):
    """Customer / account record belonging to one tenant.

    Account names are unique per tenant (see ``Meta.unique_together``).
    """

    ACCOUNT_TYPES = (
        ('customer', '客户'),
        ('partner', '合作伙伴'),
        ('competitor', '竞争对手'),
        ('prospect', '潜在客户'),
    )
    INDUSTRIES = (
        ('technology', '科技'),
        ('finance', '金融'),
        ('healthcare', '医疗健康'),
        ('manufacturing', '制造业'),
        ('retail', '零售'),
        ('education', '教育'),
        ('other', '其他'),
    )

    tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE,
                               related_name='accounts')
    name = models.CharField(max_length=255, verbose_name="客户名称")
    account_type = models.CharField(max_length=20, choices=ACCOUNT_TYPES,
                                    default='customer')
    industry = models.CharField(max_length=50, choices=INDUSTRIES,
                                null=True, blank=True)
    website = models.URLField(max_length=255, null=True, blank=True)
    phone = models.CharField(max_length=50, null=True, blank=True)
    email = models.EmailField(null=True, blank=True)

    # Address information
    billing_street = models.TextField(null=True, blank=True, verbose_name="账单街道")
    billing_city = models.CharField(max_length=100, null=True, blank=True, verbose_name="账单城市")
    billing_state = models.CharField(max_length=100, null=True, blank=True, verbose_name="账单省/州")
    billing_postal_code = models.CharField(max_length=20, null=True, blank=True, verbose_name="账单邮编")
    billing_country = models.CharField(max_length=100, null=True, blank=True, verbose_name="账单国家")
    shipping_street = models.TextField(null=True, blank=True, verbose_name="收货街道")
    shipping_city = models.CharField(max_length=100, null=True, blank=True, verbose_name="收货城市")
    shipping_state = models.CharField(max_length=100, null=True, blank=True, verbose_name="收货省/州")
    shipping_postal_code = models.CharField(max_length=20, null=True, blank=True, verbose_name="收货邮编")
    shipping_country = models.CharField(max_length=100, null=True, blank=True, verbose_name="收货国家")

    # Relationships
    owner = models.ForeignKey(User, on_delete=models.SET_NULL,
                              null=True, related_name='owned_accounts')
    parent_account = models.ForeignKey('self', on_delete=models.SET_NULL,
                                       null=True, blank=True,
                                       related_name='child_accounts')

    # Description and tags
    description = models.TextField(null=True, blank=True)
    tags = models.ManyToManyField('Tag', blank=True, related_name='accounts')

    # Business metadata
    annual_revenue = models.DecimalField(max_digits=15, decimal_places=2,
                                         null=True, blank=True)
    employee_count = models.IntegerField(null=True, blank=True)
    # max_digits=3 with decimal_places=2 limits the rating to 0.00 - 9.99.
    rating = models.DecimalField(max_digits=3, decimal_places=2, default=0.0)

    # System fields
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL,
                                   null=True, related_name='created_accounts')
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)

    class Meta:
        verbose_name = "客户"
        verbose_name_plural = "客户"
        unique_together = ['tenant', 'name']
        indexes = [
            models.Index(fields=['tenant', 'name']),
            models.Index(fields=['tenant', 'account_type']),
            models.Index(fields=['tenant', 'industry']),
            models.Index(fields=['tenant', 'created_at']),
        ]

    def __str__(self):
        return f"{self.name} ({self.get_account_type_display()})"

    def get_primary_contact(self):
        """Return the first contact flagged ``is_primary``, or ``None``."""
        return self.contacts.filter(is_primary=True).first()

    def get_open_opportunities(self):
        """Return this account's opportunities that are still in progress.

        ``closed_won`` is deliberately not part of the stage list: a won deal
        is closed, and ``get_total_revenue`` already accounts for it.
        """
        # Imported lazily, as in the rest of this class, to avoid a circular
        # import between the accounts and sales apps.
        from sales.models import Opportunity
        return Opportunity.objects.filter(
            tenant=self.tenant,
            account=self,
            stage__in=['qualification', 'proposal', 'negotiation'],
        )

    def get_total_revenue(self):
        """Return the summed amount of won opportunities, or 0 when there are none."""
        from sales.models import Opportunity
        won_opportunities = Opportunity.objects.filter(
            tenant=self.tenant,
            account=self,
            stage='closed_won',
            close_date__isnull=False,
        )
        # aggregate() yields None for an empty set; normalise to 0.
        return won_opportunities.aggregate(
            total=models.Sum('amount')
        )['total'] or 0
# serializers.py - 客户序列化器
from rest_framework import serializers
from .models import Account, Contact
from django.db import transaction
class AccountSerializer(serializers.ModelSerializer):
    """Serializer for ``Account`` with a few computed, read-only extras.

    NOTE(review): ``get_current_tenant``, ``ActivityLog`` and ``get_client_ip``
    are used in ``create`` but are not imported in the lines shown for this
    module - confirm where they come from.
    """

    owner_name = serializers.CharField(source='owner.get_full_name',
                                       read_only=True)
    contact_count = serializers.SerializerMethodField()
    opportunity_count = serializers.SerializerMethodField()
    total_revenue = serializers.SerializerMethodField()

    class Meta:
        model = Account
        fields = [
            'id', 'name', 'account_type', 'industry', 'website',
            'phone', 'email', 'billing_street', 'billing_city',
            'billing_state', 'billing_postal_code', 'billing_country',
            'shipping_street', 'shipping_city', 'shipping_state',
            'shipping_postal_code', 'shipping_country', 'owner',
            'owner_name', 'description', 'annual_revenue',
            'employee_count', 'rating', 'contact_count',
            'opportunity_count', 'total_revenue', 'created_at',
            'updated_at', 'is_active'
        ]
        read_only_fields = ['created_at', 'updated_at']

    def get_contact_count(self, obj):
        """Number of contacts linked to the account (one query per object)."""
        return obj.contacts.count()

    def get_opportunity_count(self, obj):
        """Number of opportunities linked to the account (one query per object)."""
        return obj.opportunities.count()

    def get_total_revenue(self, obj):
        """Delegate to ``Account.get_total_revenue``."""
        return obj.get_total_revenue()

    def create(self, validated_data):
        """Create an account for the current tenant and log the action.

        The account, its tags and the activity-log row are written in one
        transaction. The serializer may be used without a request in its
        context; in that case the creator, the log's user and the IP address
        are all recorded as ``None`` instead of failing on ``request.user``.
        """
        request = self.context.get('request')
        acting_user = request.user if request else None
        tenant = get_current_tenant()

        with transaction.atomic():
            # Attach tenant and creator.
            validated_data['tenant'] = tenant
            validated_data['created_by'] = acting_user

            # 'tags' is not listed in Meta.fields, so this is normally empty;
            # it is popped anyway so create() never receives an M2M value.
            tags_data = validated_data.pop('tags', [])
            account = Account.objects.create(**validated_data)
            if tags_data:
                account.tags.set(tags_data)

            # Audit trail.
            # NOTE(review): assumes ActivityLog.user / ip_address accept None.
            ActivityLog.objects.create(
                tenant=tenant,
                user=acting_user,
                action_type='account_created',
                object_type='account',
                object_id=account.id,
                description=f'创建客户: {account.name}',
                ip_address=get_client_ip(request) if request else None,
            )
        return account
# views.py - 客户视图
from rest_framework import viewsets, filters, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from .models import Account, Contact
from .serializers import AccountSerializer, ContactSerializer
from .filters import AccountFilter
from .permissions import AccountPermissions
class AccountViewSet(viewsets.ModelViewSet):
"""客户视图集"""
queryset = Account.objects.all()
serializer_class = AccountSerializer
permission_classes = [AccountPermissions]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_class = AccountFilter
search_fields = ['name', 'email', 'phone', 'description']
ordering_fields = ['name', 'created_at', 'annual_revenue', 'rating']
ordering = ['-created_at']
def get_queryset(self):
"""按租户过滤查询集"""
tenant = get_current_tenant()
queryset = super().get_queryset().filter(
tenant=tenant,
is_active=True
)
# 添加预取相关对象
queryset = queryset.select_related('owner', 'parent_account')
queryset = queryset.prefetch_related('tags', 'contacts')
return queryset
@action(detail=True, methods=['get'])
def contacts(self, request, pk=None):
"""获取客户的所有联系人"""
account = self.get_object()
contacts = account.contacts.filter(is_active=True)
page = self.paginate_queryset(contacts)
if page is not None:
serializer = ContactSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = ContactSerializer(contacts, many=True)
return Response(serializer.data)
@action(detail=True, methods=['get'])
def opportunities(self, request, pk=None):
"""获取客户的所有商机"""
from sales.serializers import OpportunitySerializer
account = self.get_object()
opportunities = account.opportunities.all()
page = self.paginate_queryset(opportunities)
if page is not None:
serializer = OpportunitySerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = OpportunitySerializer(opportunities, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def convert_to_lead(self, request, pk=None):
"""将客户转换为销售线索"""
account = self.get_object()
from leads.models import Lead
from leads.serializers import LeadSerializer
with transaction.atomic():
# 创建销售线索
lead = Lead.objects.create(
tenant=account.tenant,
first_name=account.name,
company=account.name,
email=account.email,
phone=account.phone,
source='account_conversion',
status='new',
assigned_to=account.owner,
account=account
)
# 记录活动日志
ActivityLog.objects.create(
tenant=account.tenant,
user=request.user,
action_type='account_converted_to_lead',
object_type='account',
object_id=account.id,
description=f'客户转换为销售线索: {account.name}',
metadata={
'lead_id': lead.id,
'account_id': account.id
}
)
serializer = LeadSerializer(lead)
return Response(serializer.data, status=status.HTTP_201_CREATED)
@action(detail=False, methods=['get'])
def dashboard_stats(self, request):
"""获取客户仪表板统计"""
tenant = get_current_tenant()
from django.db.models import Count, Sum, Avg
from django.utils import timezone
from datetime import timedelta
# 计算时间范围
thirty_days_ago = timezone.now() - timedelta(days=30)
stats = {
'total_accounts': self.get_queryset().count(),
'total_contacts': Contact.objects.filter(
tenant=tenant, is_active=True
).count(),
'new_accounts_last_30_days': self.get_queryset().filter(
created_at__gte=thirty_days_ago
).count(),
'accounts_by_type': dict(
self.get_queryset().values_list('account_type')
.annotate(count=Count('id'))
.values_list('account_type', 'count')
),
'accounts_by_industry': dict(
self.get_queryset().exclude(industry__isnull=True)
.values_list('industry')
.annotate(count=Count('id'))
.values_list('industry', 'count')
),
'average_rating': self.get_queryset().aggregate(
avg_rating=Avg('rating')
)['avg_rating'] or 0,
'top_accounts_by_revenue': list(
self.get_queryset().annotate(
revenue=Sum('opportunities__amount')
).exclude(revenue__isnull=True)
.order_by('-revenue')[:10]
.values('id', 'name', 'revenue')
)
}
return Response(stats)<?php
// app/Models/SalesPipeline.php
namespace App\Models;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
use App\Traits\TenantScoped;
/**
 * Sales pipeline: an ordered list of stages plus the opportunities that move
 * through them.
 */
class SalesPipeline extends Model
{
    use SoftDeletes, TenantScoped;

    /** Stage ids after which an opportunity cannot advance any further. */
    private const TERMINAL_STAGES = ['closed_won', 'closed_lost'];

    protected $fillable = [
        'tenant_id',
        'name',
        'description',
        'stages',
        'is_default',
        'is_active',
        'created_by',
        'updated_by'
    ];

    protected $casts = [
        'stages' => 'array',
        'is_default' => 'boolean',
        'is_active' => 'boolean',
        'created_at' => 'datetime',
        'updated_at' => 'datetime'
    ];

    // Note: opportunity_count runs a COUNT query each time the model is serialized.
    protected $appends = ['stage_count', 'opportunity_count'];

    /**
     * Number of stages stored on this pipeline (0 when none are stored).
     */
    public function getStageCountAttribute()
    {
        return count($this->stages ?? []);
    }

    /**
     * Number of opportunities attached to this pipeline.
     */
    public function getOpportunityCountAttribute()
    {
        return $this->opportunities()->count();
    }

    /**
     * Opportunities that belong to this pipeline.
     */
    public function opportunities()
    {
        return $this->hasMany(Opportunity::class, 'pipeline_id');
    }

    /**
     * Stage configuration for this pipeline.
     *
     * Returns the stored stages, or the built-in six-stage default when the
     * pipeline has none.
     */
    public function getPipelineStages(): array
    {
        $defaultStages = [
            [
                'id' => 'lead',
                'name' => '线索',
                'probability' => 10,
                'color' => '#FF6B6B',
                'order' => 1
            ],
            [
                'id' => 'qualification',
                'name' => '资质审查',
                'probability' => 20,
                'color' => '#4ECDC4',
                'order' => 2
            ],
            [
                'id' => 'proposal',
                'name' => '提案',
                'probability' => 50,
                'color' => '#45B7D1',
                'order' => 3
            ],
            [
                'id' => 'negotiation',
                'name' => '谈判',
                'probability' => 80,
                'color' => '#96CEB4',
                'order' => 4
            ],
            [
                'id' => 'closed_won',
                'name' => '已赢单',
                'probability' => 100,
                'color' => '#FECA57',
                'order' => 5
            ],
            [
                'id' => 'closed_lost',
                'name' => '已丢单',
                'probability' => 0,
                'color' => '#FF9FF3',
                'order' => 6
            ]
        ];

        return $this->stages ?: $defaultStages;
    }

    /**
     * Stage that follows the given one, or null.
     *
     * Null is returned for an unknown stage id, for the last stage, and for
     * the terminal stages: a won deal must not "advance" to closed_lost just
     * because that stage comes next in the list.
     */
    public function getNextStage(string $currentStageId): ?array
    {
        if (in_array($currentStageId, self::TERMINAL_STAGES, true)) {
            return null;
        }

        // Re-index so positions match those produced by array_column().
        $stages = array_values($this->getPipelineStages());
        $currentIndex = array_search($currentStageId, array_column($stages, 'id'), true);

        if ($currentIndex !== false && isset($stages[$currentIndex + 1])) {
            return $stages[$currentIndex + 1];
        }

        return null;
    }

    /**
     * Per-stage statistics: opportunity count, total amount and the amount
     * weighted by the stage probability.
     *
     * Counts and sums are fetched with a single grouped query instead of two
     * queries per stage; stages without opportunities report zeros.
     */
    public function getPipelineStats(): array
    {
        $totalsByStage = $this->opportunities()
            ->selectRaw('stage, COUNT(*) as opportunity_count, SUM(amount) as amount_sum')
            ->groupBy('stage')
            ->get()
            ->keyBy('stage');

        $stats = [];
        foreach ($this->getPipelineStages() as $stage) {
            $row = $totalsByStage->get($stage['id']);
            $count = $row ? (int) $row->opportunity_count : 0;
            $totalAmount = ($row && $row->amount_sum !== null) ? $row->amount_sum : 0;

            $stats[] = [
                'stage' => $stage,
                'count' => $count,
                'total_amount' => $totalAmount,
                'weighted_amount' => $totalAmount * ($stage['probability'] / 100)
            ];
        }

        return $stats;
    }
}
// app/Models/Opportunity.php
namespace App\Models;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
use App\Traits\TenantScoped;
/**
 * Sales opportunity: a potential deal that moves through the stages of a
 * sales pipeline.
 */
class Opportunity extends Model
{
    use SoftDeletes, TenantScoped;

    protected $fillable = [
        'tenant_id',
        'name',
        'account_id',
        'contact_id',
        'pipeline_id',
        'stage',
        'amount',
        'currency',
        'probability',
        'close_date',
        'description',
        'source',
        'owner_id',
        'lost_reason',
        'won_reason',
        'is_closed',
        'closed_at',
        'created_by',
        'updated_by'
    ];

    protected $casts = [
        'amount' => 'decimal:2',
        'probability' => 'integer',
        'close_date' => 'date',
        'is_closed' => 'boolean',
        'closed_at' => 'datetime',
        // Needed so getDaysInStageAttribute() can call diffInDays() on it.
        'stage_updated_at' => 'datetime',
        'created_at' => 'datetime',
        'updated_at' => 'datetime'
    ];

    protected $dates = ['close_date', 'closed_at'];

    protected $appends = ['expected_amount', 'days_in_stage'];

    /**
     * Amount weighted by the win probability (percent).
     */
    public function getExpectedAmountAttribute()
    {
        return $this->amount * ($this->probability / 100);
    }

    /**
     * Days spent in the current stage, or null when the stage change time is unknown.
     */
    public function getDaysInStageAttribute()
    {
        if (!$this->stage_updated_at) {
            return null;
        }

        return $this->stage_updated_at->diffInDays(now());
    }

    /**
     * Account this opportunity belongs to.
     */
    public function account()
    {
        return $this->belongsTo(Account::class);
    }

    /**
     * Contact this opportunity is associated with.
     */
    public function contact()
    {
        return $this->belongsTo(Contact::class);
    }

    /**
     * Sales pipeline this opportunity moves through.
     */
    public function pipeline()
    {
        return $this->belongsTo(SalesPipeline::class);
    }

    /**
     * User responsible for this opportunity.
     */
    public function owner()
    {
        return $this->belongsTo(User::class, 'owner_id');
    }

    /**
     * Activity records attached to this opportunity.
     */
    public function activities()
    {
        return $this->morphMany(Activity::class, 'activityable');
    }

    /**
     * Product line items of this opportunity.
     */
    public function products()
    {
        return $this->hasMany(OpportunityProduct::class);
    }

    /**
     * Move the opportunity to a new stage.
     *
     * Updates the probability from the pipeline's stage configuration (when
     * the opportunity has a pipeline and the stage is configured), maintains
     * the closed flags, then records a stage-history row and fires
     * OpportunityStageChanged once the save succeeded.
     *
     * @param string      $newStage Stage id to move to.
     * @param string|null $reason   Won/lost reason; also stored in the history row.
     * @return bool Whether the model was saved.
     */
    public function updateStage(string $newStage, ?string $reason = null): bool
    {
        $oldStage = $this->stage;
        $this->stage = $newStage;
        $this->stage_updated_at = now();

        // Update probability; an opportunity without a pipeline keeps its value.
        $pipeline = $this->pipeline;
        $stageConfig = $pipeline
            ? collect($pipeline->getPipelineStages())->where('id', $newStage)->first()
            : null;
        if ($stageConfig) {
            $this->probability = $stageConfig['probability'];
        }

        // Maintain the closed flags in both directions.
        if (in_array($newStage, ['closed_won', 'closed_lost'], true)) {
            $this->is_closed = true;
            $this->closed_at = now();
            if ($newStage === 'closed_won') {
                $this->won_reason = $reason;
            } else {
                $this->lost_reason = $reason;
            }
        } else {
            // Re-opened deal: without this it would stay flagged as closed
            // and be excluded from getSalesForecast().
            $this->is_closed = false;
            $this->closed_at = null;
        }

        $saved = $this->save();
        if ($saved) {
            // Record stage change history.
            StageHistory::create([
                'tenant_id' => $this->tenant_id,
                'opportunity_id' => $this->id,
                'old_stage' => $oldStage,
                'new_stage' => $newStage,
                'changed_by' => auth()->id(),
                'change_reason' => $reason
            ]);
            // Fire the stage-changed event.
            event(new OpportunityStageChanged($this, $oldStage, $newStage));
        }

        return $saved;
    }

    /**
     * Build a sales forecast from open opportunities with a close date.
     *
     * @param int    $tenantId Tenant whose opportunities are considered.
     * @param string $period   'quarter', 'month' or 'year' restricts close_date
     *                         to the current period; any other value applies no
     *                         date restriction.
     * @return array Totals plus breakdowns by stage and by owner.
     */
    public static function getSalesForecast(int $tenantId, string $period = 'quarter'): array
    {
        $query = self::with('owner') // eager-load: owners are read per group below
            ->where('tenant_id', $tenantId)
            ->where('is_closed', false)
            ->whereNotNull('close_date');

        // Restrict to the requested period.
        if ($period === 'quarter') {
            $query->whereBetween('close_date', [
                now()->startOfQuarter(),
                now()->endOfQuarter()
            ]);
        } elseif ($period === 'month') {
            $query->whereBetween('close_date', [
                now()->startOfMonth(),
                now()->endOfMonth()
            ]);
        } elseif ($period === 'year') {
            $query->whereBetween('close_date', [
                now()->startOfYear(),
                now()->endOfYear()
            ]);
        }

        $opportunities = $query->get();
        $totalAmount = $opportunities->sum('amount');
        $expectedAmount = $opportunities->sum('expected_amount');

        $stages = $opportunities->groupBy('stage')->map(function ($items, $stage) {
            return [
                'count' => $items->count(),
                'total_amount' => $items->sum('amount'),
                'expected_amount' => $items->sum('expected_amount')
            ];
        });

        $owners = $opportunities->groupBy('owner_id')->map(function ($items) {
            $owner = $items->first()->owner;
            return [
                'owner' => $owner ? $owner->name : '未分配',
                'count' => $items->count(),
                'total_amount' => $items->sum('amount'),
                'expected_amount' => $items->sum('expected_amount')
            ];
        });

        return [
            'period' => $period,
            'total_opportunities' => $opportunities->count(),
            'total_amount' => $totalAmount,
            'expected_amount' => $expectedAmount,
            'by_stage' => $stages,
            'by_owner' => $owners,
            // Was `$this->calculatePipelineHealth(...)`: `$this` does not exist
            // in a static method, and the method was not defined in this class.
            'pipeline_health' => static::calculatePipelineHealth($opportunities)
        ];
    }

    /**
     * Minimal pipeline-health summary for a set of open opportunities.
     *
     * NOTE(review): the original code called this method without defining it,
     * so the metrics below are a conservative stand-in - replace them with the
     * intended definition if one exists.
     *
     * @param \Illuminate\Support\Collection $opportunities
     * @return array 'weighted_ratio' (expected / total amount, 0.0 when the
     *               total is zero) and 'overdue_count' (close date before today).
     */
    protected static function calculatePipelineHealth($opportunities): array
    {
        $total = (float) $opportunities->sum('amount');
        $expected = (float) $opportunities->sum('expected_amount');
        $today = now()->startOfDay();

        $overdue = $opportunities->filter(function ($opportunity) use ($today) {
            return $opportunity->close_date && $opportunity->close_date->lt($today);
        })->count();

        return [
            'weighted_ratio' => $total > 0 ? round($expected / $total, 4) : 0.0,
            'overdue_count' => $overdue
        ];
    }
}
# marketing/tasks.py - marketing automation tasks
from celery import shared_task
from datetime import datetime, timedelta
from django.db import transaction
from django.db.models import Q, Count
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.utils import timezone
import logging
import json
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3)
def execute_campaign_workflow(self, campaign_id, contact_ids=None):
    """Run a campaign's workflow for each target contact.

    Args:
        campaign_id: primary key of the Campaign to execute.
        contact_ids: optional list of contact ids to restrict the run to.
            ``None`` means "use the campaign's own targeting". An explicit
            empty list means "no contacts" and no longer falls through to the
            whole audience.

    Returns:
        A dict with ``status`` and either a ``message`` (nothing to do) or the
        ``executed`` / ``failed`` / ``total`` counters.

    Raises:
        celery Retry: any error outside the per-contact loop schedules a retry
        after 60 seconds (at most 3 retries).

    NOTE(review): a retry re-runs the workflow for every contact, including
    those that already succeeded - confirm that duplicate sends are acceptable.
    """
    from .models import Campaign, Contact, CampaignLog

    try:
        campaign = Campaign.objects.get(id=campaign_id)
        tenant = campaign.tenant
        logger.info(f"开始执行营销活动: {campaign.name} (租户: {tenant.id})")

        # Resolve the target contacts.
        if contact_ids is not None:
            contacts = Contact.objects.filter(
                tenant=tenant,
                id__in=contact_ids,
                is_active=True,
                unsubscribed=False,
            )
        else:
            contacts = campaign.get_target_contacts()

        total_contacts = contacts.count()
        logger.info(f"目标联系人数量: {total_contacts}")
        if total_contacts == 0:
            logger.warning("没有找到目标联系人")
            return {"status": "completed", "message": "No target contacts"}

        # Run the workflow contact by contact; one failure does not stop the rest.
        workflow = campaign.workflow
        executed_count = 0
        error_count = 0
        for contact in contacts:
            try:
                with transaction.atomic():
                    # Execution record for this contact.
                    execution = CampaignLog.objects.create(
                        tenant=tenant,
                        campaign=campaign,
                        contact=contact,
                        status='pending',
                        workflow_data=workflow,
                    )
                    execute_workflow_steps(execution, workflow, contact)
                    execution.status = 'completed'
                    execution.completed_at = timezone.now()
                    execution.save()
                executed_count += 1
            except Exception as e:
                # logger.exception keeps the traceback alongside the message.
                logger.exception(f"执行营销活动失败 (联系人: {contact.id}): {str(e)}")
                error_count += 1
                # The atomic block rolled back the pending row, so the failure
                # is recorded in a fresh row.
                CampaignLog.objects.create(
                    tenant=tenant,
                    campaign=campaign,
                    contact=contact,
                    status='failed',
                    error_message=str(e),
                )

        # Update campaign bookkeeping.
        campaign.last_executed = timezone.now()
        campaign.execution_count += 1
        campaign.save()

        logger.info(f"营销活动执行完成: 成功 {executed_count}, 失败 {error_count}")
        return {
            "status": "completed",
            "executed": executed_count,
            "failed": error_count,
            "total": total_contacts,
        }
    except Exception as e:
        logger.exception(f"执行营销活动失败: {str(e)}")
        raise self.retry(exc=e, countdown=60)
def execute_workflow_steps(execution, workflow, contact):
    """Run every step of ``workflow`` for one contact and track progress.

    Progress is written to ``execution.execution_data`` under ``step_<index>``
    keys: the step is saved as ``processing`` before it runs and updated to
    ``completed`` or ``failed`` afterwards.

    Args:
        execution: execution record whose ``execution_data`` is updated and saved.
        workflow: dict whose ``steps`` list holds ``{'type', 'config'}`` entries.
        contact: contact the steps are applied to.

    Raises:
        Exception: whatever a step raises is re-raised after the step has been
        marked ``failed`` in ``execution_data``.

    NOTE(review): a ``wait`` step only calls ``schedule_wait_step``; the loop
    then continues with the following steps immediately - confirm whether the
    loop should stop there and resume from the scheduled task.
    """
    steps = workflow.get('steps', [])
    execution_data = execution.execution_data or {}

    for step_index, step in enumerate(steps):
        step_key = f'step_{step_index}'
        try:
            step_type = step.get('type')
            step_config = step.get('config', {})

            # Record that the step has started.
            execution_data[step_key] = {
                'type': step_type,
                'started_at': timezone.now().isoformat(),
                'status': 'processing',
            }
            execution.execution_data = execution_data
            execution.save()

            # Dispatch on the step type.
            if step_type == 'send_email':
                send_workflow_email(contact, step_config)
            elif step_type == 'wait':
                schedule_wait_step(execution.id, step_index, step_config)
            elif step_type == 'condition':
                handle_condition_step(contact, step_config, workflow, step_index)
            elif step_type == 'update_contact':
                update_contact_fields(contact, step_config)
            elif step_type == 'add_to_list':
                add_contact_to_list(contact, step_config)
            elif step_type == 'webhook':
                trigger_webhook(contact, step_config)
            else:
                # Previously ignored without a trace; the step is still marked
                # completed below to keep the stored data unchanged.
                logger.warning(f"未知的工作流步骤类型: {step_type}")

            # Record that the step has finished.
            execution_data[step_key]['status'] = 'completed'
            execution_data[step_key]['completed_at'] = timezone.now().isoformat()
        except Exception as e:
            logger.error(f"工作流步骤执行失败: {str(e)}")
            # setdefault: the failure may have happened before the step record
            # was created (e.g. a malformed step), which used to raise KeyError
            # here and hide the real error.
            step_record = execution_data.setdefault(step_key, {})
            step_record['status'] = 'failed'
            step_record['error'] = str(e)
            raise

    execution.execution_data = execution_data
    execution.save()
@shared_task
def send_workflow_email(contact, config):
    """Send one personalised workflow email to ``contact`` and log it.

    Args:
        contact: recipient; its name, email, account and unsubscribe token are
            used for personalisation.
        config: step configuration. Reads ``template_id``, ``subject``,
            ``from_email``, ``content`` and ``campaign``.

    Subject and body come from the tenant's EmailTemplate when ``template_id``
    resolves; otherwise the subject and ``content`` from ``config`` are used.

    Raises:
        Whatever ``send_mail`` raises (``fail_silently`` is False).
    """
    from django.conf import settings
    from django.utils.html import escape

    template_id = config.get('template_id')
    subject = config.get('subject', '')
    from_email = config.get('from_email', settings.DEFAULT_FROM_EMAIL)

    # Resolve the email template.
    if template_id:
        from .models import EmailTemplate
        try:
            template = EmailTemplate.objects.get(id=template_id, tenant=contact.tenant)
            subject = template.subject
            html_content = template.content
        except EmailTemplate.DoesNotExist:
            html_content = config.get('content', '')
    else:
        html_content = config.get('content', '')

    # Personalisation placeholders.
    personalization_vars = {
        '{{contact.first_name}}': contact.first_name or '',
        '{{contact.last_name}}': contact.last_name or '',
        '{{contact.email}}': contact.email or '',
        '{{contact.company}}': contact.account.name if contact.account else '',
        '{{unsubscribe_url}}': f"{settings.SITE_URL}/unsubscribe/{contact.unsubscribe_token}"
    }
    for placeholder, raw_value in personalization_vars.items():
        # Contact data is user-supplied: escape it before it goes into the
        # HTML body. The subject is plain text and takes the raw value.
        html_content = html_content.replace(placeholder, escape(raw_value))
        subject = subject.replace(placeholder, raw_value)

    # Send the email.
    send_mail(
        subject=subject,
        message='',  # plain-text alternative (left empty)
        html_message=html_content,
        from_email=from_email,
        recipient_list=[contact.email],
        fail_silently=False
    )

    # Record the send.
    from .models import EmailLog
    EmailLog.objects.create(
        tenant=contact.tenant,
        contact=contact,
        campaign=config.get('campaign'),
        subject=subject,
        status='sent',
        sent_at=timezone.now()
    )
# marketing/models.py - 营销自动化模型
from django.db import models
from django.contrib.postgres.fields import JSONField, ArrayField
from django.utils import timezone
import uuid
class Campaign(models.Model):
"""营销活动模型"""
CAMPAIGN_TYPES = (
('email', '邮件营销'),
('sms', '短信营销'),
('social', '社交媒体'),
('event', '活动营销'),
('other', '其他')
)
CAMPAIGN_STATUS = (
('draft', '草稿'),
('active', '进行中'),
('paused', '已暂停'),
('completed', '已完成'),
('archived', '已归档')
)
tenant = models.ForeignKey('tenants.Tenant', on_delete=models.CASCADE)
name = models.CharField(max_length=255, verbose_name="活动名称")
campaign_type = models.CharField(max_length=20, choices=CAMPAIGN_TYPES, default='email')
status = models.CharField(max_length=20, choices=CAMPAIGN_STATUS, default='draft')
# 目标受众
target_list = models.ForeignKey('ContactList', on_delete=models.SET_NULL,
null=True, blank=True)
target_segment = JSONField(default=dict, blank=True,
verbose_name="目标受众细分条件")
# 工作流配置
workflow = JSONField(default=dict, verbose_name="工作流配置")
trigger_type = models.CharField(max_length=50, default='manual',
choices=[
('manual', '手动触发'),
('scheduled', '计划任务'),
('event', '事件触发'),
('api', 'API触发')
])
trigger_config = JSONField(default=dict, blank=True, verbose_name="触发器配置")
# 统计信息
total_contacts = models.IntegerField(default=0, verbose_name="总联系人")
sent_count = models.IntegerField(default=0, verbose_name="已发送")
opened_count = models.IntegerField(default=0, verbose_name="已打开")
clicked_count = models.IntegerField(default=0, verbose_name="已点击")
converted_count = models.IntegerField(default=0, verbose_name="已转化")
bounce_count = models.IntegerField(default=0, verbose_name="退回数")
# 时间信息
scheduled_for = models.DateTimeField(null=True, blank=True, verbose_name="计划时间")
started_at = models.DateTimeField(null=True, blank=True, verbose_name="开始时间")
completed_at = models.DateTimeField(null=True, blank=True, verbose_name="完成时间")
last_executed = models.DateTimeField(null=True, blank=True, verbose_name="最后执行时间")
execution_count = models.IntegerField(default=0, verbose_name="执行次数")
# 元数据
created_by = models.ForeignKey('users.User', on_delete=models.SET_NULL,
null=True, related_name='created_campaigns')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
is_active = models.BooleanField(default=True)
class Meta:
verbose_name = "营销活动"
verbose_name_plural = "营销活动"
indexes = [
models.Index(fields=['tenant', 'status']),
models.Index(fields=['tenant', 'campaign_type']),
models.Index(fields=['tenant', 'scheduled_for']),
models.Index(fields=['tenant', 'created_at']),
]
ordering = ['-created_at']
def __str__(self):
return f"{self.name} ({self.get_campaign_type_display()})"
def get_target_contacts(self):
"""获取目标联系人"""
from contacts.models import Contact
queryset = Contact.objects.filter(
tenant=self.tenant,
is_active=True,
unsubscribed=False
)
# 如果指定了联系人列表
if self.target_list:
queryset = queryset.filter(lists=self.target_list)
# 应用细分条件
if self.target_segment:
queryset = self.apply_segment_filters(queryset)
return queryset
def apply_segment_filters(self, queryset):
"""应用细分过滤器"""
segment = self.target_segment
if segment.get('conditions'):
conditions = segment['conditions']
for condition in conditions:
field = condition.get('field')
operator = condition.get('operator')
value = condition.get('value')
if not all([field, operator]):
continue
# 构建查询条件
query = self.build_condition_query(field, operator, value)
if query:
queryset = queryset.filter(query)
return queryset
def build_condition_query(self, field, operator, value):
"""构建条件查询"""
from django.db.models import Q
field_mapping = {
'first_name': 'first_name',
'last_name': 'last_name',
'email': 'email',
'company': 'account__name',
'industry': 'account__industry',
'country': 'account__billing_country',
'created_at': 'created_at',
'last_activity': 'last_activity_at'
}
db_field = field_mapping.get(field)
if not db_field:
return None
if operator == 'equals':
return Q(**{f"{db_field}__iexact": value})
elif operator == 'not_equals':
return ~Q(**{f"{db_field}__iexact": value})
elif operator == 'contains':
return Q(**{f"{db_field}__icontains": value})
elif operator == 'not_contains':
return ~Q(**{f"{db_field}__icontains": value})
elif operator == 'starts_with':
return Q(**{f"{db_field}__istartswith": value})
elif operator == 'ends_with':
return Q(**{f"{db_field}__iendswith": value})
elif operator == 'greater_than':
return Q(**{f"{db_field}__gt": value})
elif operator == 'less_than':
return Q(**{f"{db_field}__lt": value})
elif operator == 'is_empty':
return Q(**{f"{db_field}__isnull": True}) | Q(**{f"{db_field}": ''})
elif operator == 'is_not_empty':
return ~Q(**{f"{db_field}__isnull": True}) & ~Q(**{f"{db_field}": ''})
return None
def calculate_metrics(self):
"""计算活动指标"""
if self.total_contacts == 0:
return {}
return {
'open_rate': round((self.opened_count / self.total_contacts) * 100, 2)
if self.total_contacts > 0 else 0,
'click_rate': round((self.clicked_count / self.total_contacts) * 100, 2)
if self.total_contacts > 0 else 0,
'conversion_rate': round((self.converted_count / self.total_contacts) * 100, 2)
if self.total_contacts > 0 else 0,
'bounce_rate': round((self.bounce_count / self.total_contacts) * 100, 2)
if self.total_contacts > 0 else 0,
'delivery_rate': round(((self.total_contacts - self.bounce_count) / self.total_contacts) * 100, 2)
if self.total_contacts > 0 else 0
}
def start_campaign(self):
    """Move a draft campaign to the active state and queue its workflow.

    Returns:
        ``True`` once the campaign has been saved and the workflow task
        has been handed to ``execute_campaign_workflow.delay``.

    Raises:
        ValueError: If the campaign's status is anything other than ``'draft'``.
    """
    is_draft = self.status == 'draft'
    if not is_draft:
        raise ValueError("只有草稿状态的活动可以启动")

    self.status = 'active'
    self.started_at = timezone.now()
    self.save()

    # Run the campaign asynchronously.
    # NOTE(review): if callers wrap this in a database transaction, the task
    # may be picked up before the commit; consider transaction.on_commit.
    campaign_id = self.id
    execute_campaign_workflow.delay(campaign_id)
    return True


# docker-compose.yml
version: '3.8'

services:
  # Main application service (HTTP API)
  crm-api:
    build:
      context: .
      dockerfile: Dockerfile.api
    container_name: crm-api
    restart: unless-stopped
    env_file:
      - .env
    environment:
      - DB_HOST=postgres
      - REDIS_HOST=redis
      # Redis is started with --requirepass below, so the broker URL must carry
      # the password (URL-encode it if it contains special characters).
      - CELERY_BROKER_URL=redis://:${REDIS_PASSWORD}@redis:6379/0
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
    volumes:
      - ./media:/app/media
      - ./logs:/app/logs
    networks:
      - crm-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  # Celery worker
  celery-worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    container_name: crm-celery-worker
    restart: unless-stopped
    env_file:
      - .env
    environment:
      - CELERY_BROKER_URL=redis://:${REDIS_PASSWORD}@redis:6379/0
      - CELERY_RESULT_BACKEND=redis://:${REDIS_PASSWORD}@redis:6379/1
    depends_on:
      - redis
      - postgres
    volumes:
      - ./media:/app/media
      - ./logs:/app/logs
    command: celery -A config.celery worker --loglevel=info --concurrency=4
    networks:
      - crm-network

  # Celery beat (periodic task scheduler)
  celery-beat:
    build:
      context: .
      dockerfile: Dockerfile.beat
    container_name: crm-celery-beat
    restart: unless-stopped
    env_file:
      - .env
    environment:
      - CELERY_BROKER_URL=redis://:${REDIS_PASSWORD}@redis:6379/0
    depends_on:
      - redis
      - postgres
    volumes:
      - ./media:/app/media
      - ./logs:/app/logs
    command: celery -A config.celery beat --loglevel=info
    networks:
      - crm-network

  # PostgreSQL database
  postgres:
    image: postgres:14-alpine
    container_name: crm-postgres
    restart: unless-stopped
    environment:
      - POSTGRES_DB=${DB_NAME}
      - POSTGRES_USER=${DB_USER}
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init-scripts:/docker-entrypoint-initdb.d
    ports:
      - "5432:5432"
    networks:
      - crm-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${DB_USER}"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis cache / Celery broker
  redis:
    image: redis:7-alpine
    container_name: crm-redis
    restart: unless-stopped
    command: redis-server --requirepass ${REDIS_PASSWORD}
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"
    networks:
      - crm-network
    healthcheck:
      # Authenticate (the server requires a password) and use a read-only PING
      # instead of writing a counter key on every probe.
      test: ["CMD-SHELL", "redis-cli -a \"${REDIS_PASSWORD}\" --no-auth-warning ping | grep -q PONG"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Nginx reverse proxy
  nginx:
    image: nginx:1.23-alpine
    container_name: crm-nginx
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/conf.d:/etc/nginx/conf.d
      - ./ssl:/etc/nginx/ssl
      - ./media:/app/media
      - ./logs/nginx:/var/log/nginx
    depends_on:
      - crm-api
    networks:
      - crm-network

  # Monitoring stack
  prometheus:
    image: prom/prometheus:latest
    container_name: crm-prometheus
    restart: unless-stopped
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    ports:
      - "9090:9090"
    networks:
      - crm-network

  grafana:
    image: grafana/grafana:latest
    container_name: crm-grafana
    restart: unless-stopped
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring/dashboards:/etc/grafana/provisioning/dashboards
    ports:
      - "3000:3000"
    networks:
      - crm-network

networks:
  crm-network:
    driver: bridge

volumes:
  postgres_data:
  redis_data:
  prometheus_data:
  grafana_data:

# monitoring/prometheus.yml
# Defaults applied to every scrape job unless the job overrides them.
global:
  scrape_interval: 15s
  evaluation_interval: 15s

# Where fired alerts are sent.
alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

# Alerting / recording rule files to load.
rule_files:
  - "alerts/*.yml"

scrape_configs:
  # Application metrics; scraped more often than the global default.
  - job_name: 'crm-api'
    static_configs:
      - targets:
          - 'crm-api:8000'
    metrics_path: '/metrics'
    scrape_interval: 10s

  - job_name: 'postgres'
    static_configs:
      - targets:
          - 'postgres-exporter:9187'

  - job_name: 'redis'
    static_configs:
      - targets:
          - 'redis-exporter:9121'

  - job_name: 'nginx'
    static_configs:
      - targets:
          - 'nginx-exporter:9113'
# security/middleware.py
from django.utils.deprecation import MiddlewareMixin
from django.conf import settings
import time
from ipware import get_client_ip
class SecurityMiddleware(MiddlewareMixin):
    """Security middleware: per-IP rate limiting, input checks and security headers.

    Request-side checks run in ``process_request``; the security headers are
    attached in ``process_response`` so that the downstream chain is executed
    exactly once and the headers end up on the response actually returned.

    NOTE(review): ``rate_limit_response``, ``sanitize_inputs`` and
    ``xss_protection`` are called below but are not defined in the visible
    part of this class; confirm they exist.
    """

    def process_request(self, request):
        """Run request-side checks.

        Returns:
            ``None`` to continue processing, or the rate-limit response when
            the client exceeded its quota.
        """
        # Per-IP rate limit check
        if not self.check_rate_limit(request):
            return self.rate_limit_response()
        # SQL injection guard
        self.sanitize_inputs(request)
        # XSS guard
        self.xss_protection(request)
        return None

    def process_response(self, request, response):
        """Attach the security headers to the outgoing response."""
        return self.set_security_headers(request, response)

    def set_security_headers(self, request, response=None):
        """Set the security HTTP headers on ``response`` and return it.

        Args:
            request: The current request.
            response: The response to decorate. When omitted, the response is
                obtained from ``self.get_response(request)``; that runs the
                rest of the chain, so do not rely on it from
                ``process_request``.
        """
        if response is None:
            response = self.get_response(request)
        security_headers = {
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': 'DENY',
            'X-XSS-Protection': '1; mode=block',
            'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
            'Content-Security-Policy': "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline';",
            'Referrer-Policy': 'strict-origin-when-cross-origin',
            'Permissions-Policy': 'geolocation=(), microphone=(), camera=()'
        }
        for header, value in security_headers.items():
            response[header] = value
        return response

    def check_rate_limit(self, request):
        """Return ``False`` when the client IP exceeded its per-minute quota.

        Returns ``True`` when the request is allowed, when the client IP
        cannot be determined, or when the counter backend fails (fail-open).
        """
        client_ip, _ = get_client_ip(request)
        if not client_ip:
            return True
        # Fixed one-minute window: the key embeds the current minute number,
        # so every minute starts a fresh counter in Redis.
        redis_key = f"rate_limit:{client_ip}:{int(time.time() // 60)}"
        try:
            from django_redis import get_redis_connection
            redis = get_redis_connection("default")
            # Number of requests seen in the current minute
            current = redis.incr(redis_key)
            if current == 1:
                # First hit in this window: let the key expire after 61 seconds
                redis.expire(redis_key, 61)
            # Over the configured limit?
            if current > settings.RATE_LIMIT_PER_MINUTE:
                return False
        except Exception:
            # Deliberately fail open when the limiter is unavailable, but
            # leave a trace instead of hiding the failure.
            import logging
            logging.getLogger(__name__).warning(
                "Rate limit check skipped for %s", client_ip, exc_info=True
            )
        return True
-- Database index tuning

-- Indexes on accounts
CREATE INDEX idx_accounts_tenant_created
    ON accounts (tenant_id, created_at DESC);

CREATE INDEX idx_accounts_industry
    ON accounts (tenant_id, industry)
    WHERE industry IS NOT NULL;

CREATE INDEX idx_accounts_owner
    ON accounts (tenant_id, owner_id)
    WHERE owner_id IS NOT NULL;

-- Indexes on contacts
CREATE INDEX idx_contacts_email
    ON contacts (tenant_id, email)
    WHERE email IS NOT NULL;

CREATE INDEX idx_contacts_account
    ON contacts (tenant_id, account_id)
    WHERE account_id IS NOT NULL;

CREATE INDEX idx_contacts_last_activity
    ON contacts (tenant_id, last_activity_at DESC)
    WHERE last_activity_at IS NOT NULL;

-- Yearly partitions of opportunities (upper bound is exclusive)
CREATE TABLE opportunities_2023
    PARTITION OF opportunities
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');

CREATE TABLE opportunities_2024
    PARTITION OF opportunities
    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
-- Materialized view used by the reports: per tenant, month and owner,
-- aggregates of closed opportunities whose close_date is on or after the
-- start of the previous calendar year.
CREATE MATERIALIZED VIEW mv_sales_performance AS
SELECT
    o.tenant_id,
    DATE_TRUNC('month', o.close_date) AS month,
    o.owner_id,
    u.name AS owner_name,
    COUNT(CASE WHEN o.stage = 'closed_won' THEN 1 END) AS won_count,
    COUNT(CASE WHEN o.stage = 'closed_lost' THEN 1 END) AS lost_count,
    SUM(CASE WHEN o.stage = 'closed_won' THEN o.amount ELSE 0 END) AS won_amount,
    AVG(CASE WHEN o.stage = 'closed_won' THEN o.amount END) AS avg_deal_size,
    AVG(EXTRACT(DAY FROM (o.closed_at - o.created_at))) AS avg_sales_cycle
FROM opportunities o
JOIN users u ON o.owner_id = u.id
WHERE o.is_closed = true
  AND o.close_date >= DATE_TRUNC('year', CURRENT_DATE - INTERVAL '1 year')
GROUP BY 1, 2, 3, 4;

-- REFRESH ... CONCURRENTLY requires a unique index on the materialized view;
-- without one the procedure below fails at run time.
-- Assumes users.id is unique, so owner_name is determined by owner_id and
-- (tenant_id, month, owner_id) identifies one row.
CREATE UNIQUE INDEX idx_mv_sales_performance_key
    ON mv_sales_performance (tenant_id, month, owner_id);

-- Stored procedure that refreshes the materialized view
CREATE OR REPLACE PROCEDURE refresh_sales_performance_mv()
LANGUAGE plpgsql
AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY mv_sales_performance;
END;
$$;
企业级SaaS CRM系统的开发是一个复杂而系统的工程,需要综合考虑架构设计、数据安全、系统性能、可扩展性等多个方面。本文从多租户架构设计出发,详细介绍了PHP、Java、Python三种技术栈的实现方案,涵盖了客户管理、销售管道、营销自动化等核心模块。

在实际开发中,还需要考虑以下几点:
一个成功的SaaS CRM系统不仅是技术的堆砌,更是对业务流程的深刻理解和优化。通过合理的架构设计和持续的技术创新,才能构建出真正满足企业需求的客户关系管理系统。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。