
在数字化转型的浪潮中,电商数据采集已成为企业竞争力的重要组成部分。特别是亚马逊SP广告数据的精准采集,直接影响着企业的营销决策和ROI优化。本文将从云原生架构的角度,深入探讨如何构建一个高可用、高性能的数据采集系统,实现98%的SP广告数据采集成功率。

在云环境中,我们将数据采集系统拆分为多个独立的微服务,每个服务专注于特定的功能领域:
# docker-compose.yml - microservice architecture example
# (indentation restored: services/volumes/networks must be nested to be valid)
version: '3.8'

services:
  # API gateway
  api-gateway:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - auth-service
      - scraper-service
      - data-processor
    networks:
      - pangolin-network

  # Authentication service
  auth-service:
    build: ./services/auth
    environment:
      - JWT_SECRET=${JWT_SECRET}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - postgres
    networks:
      - pangolin-network

  # Scraper scheduling service
  scraper-service:
    build: ./services/scraper
    environment:
      - PANGOLIN_API_KEY=${PANGOLIN_API_KEY}
      - RABBITMQ_URL=amqp://rabbitmq:5672
      - REDIS_URL=redis://redis:6379
    depends_on:
      - rabbitmq
      - redis
    # NOTE(review): deploy.replicas only takes effect under swarm /
    # `docker compose --compatibility` - confirm the target runtime.
    deploy:
      replicas: 3
    networks:
      - pangolin-network

  # Data processing service
  data-processor:
    build: ./services/processor
    environment:
      - MONGODB_URL=mongodb://mongodb:27017/pangolin
      - ELASTICSEARCH_URL=http://elasticsearch:9200
    depends_on:
      - mongodb
      - elasticsearch
    networks:
      - pangolin-network

  # Task scheduler service
  task-scheduler:
    build: ./services/scheduler
    environment:
      - CRON_SCHEDULE=${CRON_SCHEDULE}
      - RABBITMQ_URL=amqp://rabbitmq:5672
    depends_on:
      - rabbitmq
    networks:
      - pangolin-network

  # Infrastructure services
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    networks:
      - pangolin-network

  rabbitmq:
    image: rabbitmq:3-management
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=${RABBITMQ_PASSWORD}
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    networks:
      - pangolin-network

  mongodb:
    image: mongo:6
    environment:
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=${MONGO_PASSWORD}
    volumes:
      - mongodb_data:/data/db
    networks:
      - pangolin-network

  elasticsearch:
    image: elasticsearch:8.8.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    networks:
      - pangolin-network

  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=pangolin_auth
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - pangolin-network

volumes:
  redis_data:
  rabbitmq_data:
  mongodb_data:
  elasticsearch_data:
  postgres_data:

networks:
  pangolin-network:
    driver: bridge

# Dockerfile - scraper service containerization
FROM python:3.11-slim

WORKDIR /app

# System packages needed to fetch and unpack Chrome/ChromeDriver.
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    unzip \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Google Chrome.
# Fix: use a signed-by keyring instead of the deprecated `apt-key add`, and
# https instead of plain http for the repository URL.
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub \
    | gpg --dearmor -o /usr/share/keyrings/google-chrome.gpg \
    && echo "deb [arch=amd64 signed-by=/usr/share/keyrings/google-chrome.gpg] https://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list \
    && apt-get update \
    && apt-get install -y google-chrome-stable \
    && rm -rf /var/lib/apt/lists/*

# Install a matching ChromeDriver.
# Fix: the original curl URL had no scheme and downloads used plain http.
# NOTE(review): LATEST_RELEASE only covers drivers up to Chrome 114; newer
# Chrome versions publish drivers via the "Chrome for Testing" endpoints -
# confirm the installed Chrome version before relying on this.
RUN CHROME_DRIVER_VERSION=$(curl -sS https://chromedriver.storage.googleapis.com/LATEST_RELEASE) \
    && wget -O /tmp/chromedriver.zip https://chromedriver.storage.googleapis.com/$CHROME_DRIVER_VERSION/chromedriver_linux64.zip \
    && unzip /tmp/chromedriver.zip chromedriver -d /usr/local/bin/ \
    && rm /tmp/chromedriver.zip \
    && chmod +x /usr/local/bin/chromedriver

# Install Python dependencies first so this layer caches independently of
# application-code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code.
COPY . .

# Runtime environment.
ENV PYTHONPATH=/app
ENV CHROME_BIN=/usr/bin/google-chrome
ENV CHROME_DRIVER=/usr/local/bin/chromedriver

# Run as a non-root user.
RUN useradd -m -u 1000 scraper && chown -R scraper:scraper /app
USER scraper

# Health check.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Start command.
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# tencent_cloud_config.py - Tencent Cloud resource configuration
import os
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.cvm.v20170312 import cvm_client, models as cvm_models
from tencentcloud.clb.v20180317 import clb_client, models as clb_models
class TencentCloudManager:
    """Provision Tencent Cloud resources (auto-scaling group + CLB) for the scraper."""

    def __init__(self):
        # Credentials come from the environment; region is fixed to Beijing.
        self.cred = credential.Credential(
            os.getenv("TENCENT_SECRET_ID"),
            os.getenv("TENCENT_SECRET_KEY"),
        )
        self.region = "ap-beijing"

    def setup_auto_scaling_group(self):
        """Return the auto-scaling-group configuration dict (not yet applied)."""
        tags = [
            {"Key": "Project", "Value": "Pangolin"},
            {"Key": "Environment", "Value": "Production"},
        ]
        return {
            "AutoScalingGroupName": "pangolin-scraper-asg",
            "LaunchConfigurationId": "asc-xxx",  # launch configuration id
            "MaxSize": 10,              # maximum instances
            "MinSize": 2,               # minimum instances
            "DesiredCapacity": 3,       # desired instances
            "VpcId": "vpc-xxx",
            "SubnetIds": ["subnet-xxx"],
            "HealthCheckType": "CLB",
            "HealthCheckGracePeriod": 300,
            "DefaultCooldown": 300,
            "Tags": tags,
        }

    def setup_load_balancer(self):
        """Create a public CLB instance, attach listeners, return its id (or None)."""
        try:
            client = clb_client.ClbClient(self.cred, self.region)
            req = clb_models.CreateLoadBalancerRequest()
            req.LoadBalancerType = "OPEN"
            req.LoadBalancerName = "pangolin-api-lb"
            req.VpcId = "vpc-xxx"
            req.SubnetId = "subnet-xxx"
            req.ProjectId = 0
            req.AddressIPVersion = "IPV4"
            req.Tags = [{"TagKey": "Project", "TagValue": "Pangolin"}]
            resp = client.CreateLoadBalancer(req)
            lb_id = resp.LoadBalancerIds[0]
            self._setup_listeners(client, lb_id)
            return lb_id
        except Exception as e:
            print(f"创建负载均衡器失败: {e}")
            return None

    def _setup_listeners(self, client, lb_id):
        """Attach HTTP and HTTPS listeners with a /health health check."""
        req = clb_models.CreateListenerRequest()
        req.LoadBalancerId = lb_id
        req.Ports = [80]
        req.Protocol = "HTTP"
        req.ListenerNames = ["pangolin-http-listener"]
        req.HealthCheck = {
            "HealthSwitch": 1,
            "TimeOut": 5,
            "IntervalTime": 10,
            "HealthNum": 3,
            "UnHealthNum": 3,
            "HttpCode": 200,
            "HttpCheckPath": "/health",
            "HttpCheckMethod": "GET",
        }
        client.CreateListener(req)
        # The same request object is deliberately reused, so the HTTPS
        # listener inherits the health-check settings configured above.
        req.Ports = [443]
        req.Protocol = "HTTPS"
        req.ListenerNames = ["pangolin-https-listener"]
        req.Certificate = {
            "SSLMode": "UNIDIRECTIONAL",
            "CertId": "cert-xxx",  # SSL certificate id
        }
        client.CreateListener(req)
class CloudResourceMonitor:
    """Watch aggregate resource metrics and scale the fleet accordingly."""

    def __init__(self):
        self.cloud_manager = TencentCloudManager()

    async def monitor_resource_usage(self):
        """Collect resource metrics, auto-scale on them, and return them."""
        # NOTE(review): the _get_*_metrics helpers are not defined anywhere
        # in this class - confirm a subclass or mixin provides them.
        metrics = {
            "cpu_utilization": await self._get_cpu_metrics(),
            "memory_utilization": await self._get_memory_metrics(),
            "network_traffic": await self._get_network_metrics(),
            "disk_usage": await self._get_disk_metrics(),
        }
        await self._auto_scale_resources(metrics)
        return metrics

    async def _auto_scale_resources(self, metrics):
        """Scale out on high load (CPU>70 or mem>80), in on low load (CPU<30 and mem<40)."""
        high_load = (
            metrics["cpu_utilization"] > 70
            or metrics["memory_utilization"] > 80
        )
        low_load = (
            metrics["cpu_utilization"] < 30
            and metrics["memory_utilization"] < 40
        )
        if high_load:
            await self._scale_out()
        elif low_load:
            await self._scale_in()

    async def _scale_out(self):
        """Add capacity (implementation pending)."""
        print("🚀 检测到高负载,开始扩容...")

    async def _scale_in(self):
        """Remove capacity (implementation pending)."""
        print("📉 检测到低负载,开始缩容...")

# cloud_storage_manager.py - cloud storage management
# Fix: the classes below use os.getenv and credential.Credential, but
# neither `os` nor `credential` was imported in this module.
import asyncio
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from tencentcloud.common import credential
from tencentcloud.cos.v20180517 import cos_client, models as cos_models
from tencentcloud.cdb.v20170320 import cdb_client, models as cdb_models
class CloudStorageManager:
    """Persist raw and processed scrape results to COS and the cloud database."""

    def __init__(self):
        self.cos_client = self._init_cos_client()
        self.cdb_client = self._init_cdb_client()
        self.bucket_name = "pangolin-data-bucket"

    def _init_cos_client(self):
        """Build the COS (object storage) client from env credentials."""
        cred = credential.Credential(
            os.getenv("TENCENT_SECRET_ID"),
            os.getenv("TENCENT_SECRET_KEY"),
        )
        return cos_client.CosClient(cred, "ap-beijing")

    def _init_cdb_client(self):
        """Build the CDB (cloud database) client from env credentials."""
        cred = credential.Credential(
            os.getenv("TENCENT_SECRET_ID"),
            os.getenv("TENCENT_SECRET_KEY"),
        )
        return cdb_client.CdbClient(cred, "ap-beijing")

    async def store_raw_data(self, data: Dict, data_type: str = "sp_ads"):
        """Upload one raw payload to COS under raw_data/<type>/<date>/<ts>.json.

        Returns the object key on success, None on failure.
        """
        try:
            now = datetime.now()
            object_key = (
                f"raw_data/{data_type}/{now.strftime('%Y/%m/%d')}/"
                f"{int(now.timestamp())}.json"
            )
            req = cos_models.PutObjectRequest()
            req.Bucket = self.bucket_name
            req.Key = object_key
            req.Body = json.dumps(data, ensure_ascii=False).encode('utf-8')
            req.ContentType = "application/json"
            self.cos_client.PutObject(req)
            # Record what was stored in the metadata table.
            await self._record_storage_info(object_key, data_type, len(data))
            return object_key
        except Exception as e:
            print(f"存储原始数据失败: {e}")
            return None

    async def store_processed_data(self, data: List[Dict], data_type: str = "sp_ads"):
        """Insert processed rows into CDB and back them up to COS; return backup key."""
        try:
            await self._batch_insert_to_cdb(data, data_type)
            backup_payload = {
                "processed_data": data,
                "processed_at": datetime.now().isoformat(),
                "count": len(data),
            }
            return await self.store_raw_data(backup_payload, f"processed_{data_type}")
        except Exception as e:
            print(f"存储处理数据失败: {e}")
            return None

    async def _batch_insert_to_cdb(self, data: List[Dict], table_name: str):
        """Bulk-insert rows into the cloud database.

        Placeholder: the concrete batch-insert API depends on the DB engine.
        """
        pass

    async def _record_storage_info(self, object_key: str, data_type: str, data_size: int):
        """Write one metadata record describing a stored object."""
        storage_info = {
            "object_key": object_key,
            "data_type": data_type,
            "data_size": data_size,
            "created_at": datetime.now().isoformat(),
            "status": "stored",
        }
        # NOTE(review): _insert_metadata is not defined in this class -
        # confirm it exists elsewhere or is still to be implemented.
        await self._insert_metadata(storage_info)

    async def setup_data_lifecycle(self):
        """Apply COS lifecycle rules: tier data down over time, expire raw data."""
        lifecycle_config = {
            "Rules": [
                {
                    "ID": "raw-data-lifecycle",
                    "Status": "Enabled",
                    "Filter": {"Prefix": "raw_data/"},
                    "Transitions": [
                        # Infrequent-access after 30 days, archive after 90.
                        {"Days": 30, "StorageClass": "STANDARD_IA"},
                        {"Days": 90, "StorageClass": "ARCHIVE"},
                    ],
                    # Delete after one year.
                    "Expiration": {"Days": 365},
                },
                {
                    "ID": "processed-data-lifecycle",
                    "Status": "Enabled",
                    "Filter": {"Prefix": "processed_data/"},
                    "Transitions": [
                        {"Days": 7, "StorageClass": "STANDARD_IA"},
                        {"Days": 30, "StorageClass": "ARCHIVE"},
                    ],
                },
            ]
        }
        req = cos_models.PutBucketLifecycleRequest()
        req.Bucket = self.bucket_name
        req.LifecycleConfiguration = lifecycle_config
        self.cos_client.PutBucketLifecycle(req)
class DataBackupManager:
    """Replicate raw scrape data into backup buckets in other regions."""

    def __init__(self):
        self.storage_manager = CloudStorageManager()

    async def setup_cross_region_backup(self):
        """Create a backup bucket and replication rule for each backup region."""
        for region in ("ap-shanghai", "ap-guangzhou"):
            await self._create_backup_bucket(region)
            await self._setup_cross_region_replication(region)

    async def _create_backup_bucket(self, region: str):
        """Create the pangolin-backup-<region> bucket in the given region."""
        req = cos_models.PutBucketRequest()
        req.Bucket = f"pangolin-backup-{region}"
        # A region-local client is required to create a bucket there.
        cred = credential.Credential(
            os.getenv("TENCENT_SECRET_ID"),
            os.getenv("TENCENT_SECRET_KEY"),
        )
        regional_client = cos_client.CosClient(cred, region)
        try:
            regional_client.PutBucket(req)
            print(f"✅ 在 {region} 创建备份存储桶成功")
        except Exception as e:
            print(f"❌ 在 {region} 创建备份存储桶失败: {e}")

    async def _setup_cross_region_replication(self, target_region: str):
        """Enable COS cross-region replication of raw_data/ to the target region."""
        replication_config = {
            "Role": f"qcs::cam::uin/{os.getenv('TENCENT_UIN')}:roleName/COS_QcsRole",
            "Rules": [
                {
                    "ID": f"backup-to-{target_region}",
                    "Status": "Enabled",
                    "Priority": 1,
                    "Filter": {"Prefix": "raw_data/"},
                    "Destination": {
                        "Bucket": f"qcs::cos:{target_region}::{self.storage_manager.bucket_name}",
                        "StorageClass": "STANDARD",
                    },
                }
            ],
        }
        req = cos_models.PutBucketReplicationRequest()
        req.Bucket = self.storage_manager.bucket_name
        req.ReplicationConfiguration = replication_config
        self.storage_manager.cos_client.PutBucketReplication(req)

# cloud_monitoring.py - cloud monitoring system
# Fix: the class below uses os.getenv, credential.Credential, and the Dict
# annotation, none of which were imported in this module.
import asyncio
import os
from datetime import datetime, timedelta
from typing import Dict

from tencentcloud.common import credential
from tencentcloud.monitor.v20180724 import monitor_client, models as monitor_models
from tencentcloud.cls.v20201016 import cls_client, models as cls_models
class CloudMonitoringSystem:
    """Report custom metrics, alarm policies, and structured logs to Tencent Cloud."""

    def __init__(self):
        self.monitor_client = self._init_monitor_client()
        self.cls_client = self._init_cls_client()
        self.log_topic_id = "xxx-xxx-xxx"  # CLS log topic id

    def _init_monitor_client(self):
        """Build the Cloud Monitor client from env credentials."""
        cred = credential.Credential(
            os.getenv("TENCENT_SECRET_ID"),
            os.getenv("TENCENT_SECRET_KEY"),
        )
        return monitor_client.MonitorClient(cred, "ap-beijing")

    def _init_cls_client(self):
        """Build the Cloud Log Service client from env credentials."""
        cred = credential.Credential(
            os.getenv("TENCENT_SECRET_ID"),
            os.getenv("TENCENT_SECRET_KEY"),
        )
        return cls_client.ClsClient(cred, "ap-beijing")

    async def setup_custom_metrics(self):
        """Register the scraper's custom metric definitions."""
        metric_definitions = [
            {
                "MetricName": "scraping_success_rate",
                "MetricCName": "采集成功率",
                "Unit": "Percent",
                "Period": [60, 300, 3600],  # 1 min / 5 min / 1 h
            },
            {
                "MetricName": "api_response_time",
                "MetricCName": "API响应时间",
                "Unit": "ms",
                "Period": [60, 300],
            },
            {
                "MetricName": "data_quality_score",
                "MetricCName": "数据质量评分",
                "Unit": "Score",
                "Period": [300, 3600],
            },
            {
                "MetricName": "concurrent_requests",
                "MetricCName": "并发请求数",
                "Unit": "Count",
                "Period": [60],
            },
        ]
        for definition in metric_definitions:
            await self._create_custom_metric(definition)

    async def _create_custom_metric(self, metric_config: Dict):
        """Create one custom metric (placeholder implementation)."""
        req = monitor_models.CreateServiceDiscoveryRequest()
        pass

    async def report_metrics(self, metrics_data: Dict):
        """Push a batch of metric name/value pairs, timestamped now."""
        try:
            req = monitor_models.PutMonitorDataRequest()
            req.Metrics = [
                {
                    "MetricName": metric_name,
                    "Value": value,
                    "Timestamp": int(datetime.now().timestamp()),
                }
                for metric_name, value in metrics_data.items()
            ]
            self.monitor_client.PutMonitorData(req)
            print(f"✅ 监控数据上报成功: {len(req.Metrics)} 个指标")
        except Exception as e:
            print(f"❌ 监控数据上报失败: {e}")

    async def setup_alert_policies(self):
        """Create the standard alarm policies for the scraper."""
        alert_policies = [
            {
                "PolicyName": "采集成功率告警",
                "MetricName": "scraping_success_rate",
                "Condition": "LT",      # less-than
                "Threshold": 95,        # 95%
                "Period": 300,          # 5 minutes
                "ContinuePeriod": 2,    # 2 consecutive periods
            },
            {
                "PolicyName": "API响应时间告警",
                "MetricName": "api_response_time",
                "Condition": "GT",      # greater-than
                "Threshold": 5000,      # 5 seconds
                "Period": 60,
                "ContinuePeriod": 3,
            },
            {
                "PolicyName": "并发请求数告警",
                "MetricName": "concurrent_requests",
                "Condition": "GT",
                "Threshold": 100,
                "Period": 60,
                "ContinuePeriod": 1,
            },
        ]
        for policy in alert_policies:
            await self._create_alert_policy(policy)

    async def _create_alert_policy(self, policy_config: Dict):
        """Create one alarm policy from a threshold-style config dict."""
        req = monitor_models.CreateAlarmPolicyRequest()
        req.PolicyName = policy_config["PolicyName"]
        req.MonitorType = "MT_QCE"
        req.Enable = 1
        req.ProjectId = 0
        req.Conditions = [
            {
                "MetricName": policy_config["MetricName"],
                "Period": policy_config["Period"],
                "Operator": policy_config["Condition"],
                "Value": str(policy_config["Threshold"]),
                "ContinuePeriod": policy_config["ContinuePeriod"],
            }
        ]
        req.EventConditions = []
        req.NoticeIds = ["notice-xxx"]  # notification template id
        try:
            self.monitor_client.CreateAlarmPolicy(req)
            print(f"✅ 创建告警策略成功: {policy_config['PolicyName']}")
        except Exception as e:
            print(f"❌ 创建告警策略失败: {e}")

    async def send_structured_logs(self, log_data: Dict):
        """Upload one structured log record to CLS (best-effort)."""
        try:
            req = cls_models.UploadLogRequest()
            req.TopicId = self.log_topic_id
            req.HashKey = str(hash(log_data.get("request_id", "")))
            log_content = {
                "timestamp": datetime.now().isoformat(),
                "level": log_data.get("level", "INFO"),
                "service": "pangolin-scraper",
                "message": log_data.get("message", ""),
                "request_id": log_data.get("request_id", ""),
                "user_id": log_data.get("user_id", ""),
                "keyword": log_data.get("keyword", ""),
                "success_rate": log_data.get("success_rate", 0),
                "response_time": log_data.get("response_time", 0),
                "error_code": log_data.get("error_code", ""),
                "error_message": log_data.get("error_message", ""),
            }
            req.LogItems = [
                {
                    "Time": int(datetime.now().timestamp()),
                    "Contents": [
                        {"Key": key, "Value": str(value)}
                        for key, value in log_content.items()
                    ],
                }
            ]
            self.cls_client.UploadLog(req)
        except Exception as e:
            print(f"发送结构化日志失败: {e}")
# Usage example
monitoring = CloudMonitoringSystem()

async def monitor_scraping_task(keyword: str, user_id: str):
    """Run one scraping task while reporting metrics and structured logs."""
    request_id = f"req_{int(datetime.now().timestamp())}"
    start_time = datetime.now()
    try:
        # Execute the scraping task.
        result = await execute_scraping_task(keyword)
        # Derive metrics from the run.
        elapsed_ms = (datetime.now() - start_time).total_seconds() * 1000
        success_rate = calculate_success_rate(result)
        # Report monitoring data.
        await monitoring.report_metrics({
            "api_response_time": elapsed_ms,
            "scraping_success_rate": success_rate,
            "concurrent_requests": get_current_concurrent_count(),
        })
        # Emit a structured success log.
        await monitoring.send_structured_logs({
            "level": "INFO",
            "message": "采集任务完成",
            "request_id": request_id,
            "user_id": user_id,
            "keyword": keyword,
            "success_rate": success_rate,
            "response_time": elapsed_ms,
        })
        return result
    except Exception as e:
        # Emit a structured error log, then propagate.
        await monitoring.send_structured_logs({
            "level": "ERROR",
            "message": "采集任务失败",
            "request_id": request_id,
            "user_id": user_id,
            "keyword": keyword,
            "error_code": type(e).__name__,
            "error_message": str(e),
        })
        raise

# intelligent_cache.py - intelligent caching system
# Fix: the classes below use os.getenv, asyncio.sleep, and the List
# annotation, none of which were imported in this module.
import asyncio
import hashlib
import json
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import redis
class IntelligentCacheManager:
    """Redis-backed cache whose freshness window and TTL adapt to the keyword type."""

    # Keyword fragments marking flash-sale, seasonal, or evergreen demand.
    _TRENDING_PATTERNS = (
        "black friday", "cyber monday", "prime day",
        "christmas", "valentine", "mother's day",
    )
    _SEASONAL_PATTERNS = (
        "summer", "winter", "spring", "fall",
        "holiday", "back to school", "graduation",
    )
    _STABLE_PATTERNS = (
        "phone case", "laptop", "book",
        "kitchen", "home", "office",
    )

    def __init__(self):
        self.redis_client = redis.Redis(
            host=os.getenv("REDIS_HOST", "localhost"),
            port=int(os.getenv("REDIS_PORT", 6379)),
            db=0,
            decode_responses=True,
        )
        self.default_ttl = 3600  # one hour

    def _generate_cache_key(self, keyword: str, marketplace: str, filters: Dict = None) -> str:
        """Derive a stable cache key from the normalized query parameters."""
        payload = json.dumps(
            {
                "keyword": keyword.lower().strip(),
                "marketplace": marketplace,
                "filters": filters or {},
            },
            sort_keys=True,
        )
        return f"sp_ads:{hashlib.md5(payload.encode()).hexdigest()}"

    async def get_cached_data(self, keyword: str, marketplace: str, filters: Dict = None) -> Optional[Dict]:
        """Return a still-fresh cached result, or None on miss/stale/error."""
        cache_key = self._generate_cache_key(keyword, marketplace, filters)
        try:
            raw = self.redis_client.get(cache_key)
            if raw:
                entry = json.loads(raw)
                cached_time = datetime.fromisoformat(entry["cached_at"])
                if self._is_data_fresh(cached_time, keyword):
                    print(f"🎯 缓存命中: {keyword}")
                    return entry["result"]
                # Stale entry: evict it eagerly.
                self.redis_client.delete(cache_key)
        except Exception as e:
            print(f"获取缓存数据失败: {e}")
        return None

    async def set_cached_data(self, keyword: str, marketplace: str, data: Dict, filters: Dict = None):
        """Cache a result with a TTL tuned to the keyword's volatility."""
        cache_key = self._generate_cache_key(keyword, marketplace, filters)
        ttl = self._calculate_dynamic_ttl(keyword)
        entry = {
            "result": data,
            "cached_at": datetime.now().isoformat(),
            "keyword": keyword,
            "marketplace": marketplace,
            "ttl": ttl,
        }
        try:
            self.redis_client.setex(cache_key, ttl, json.dumps(entry, ensure_ascii=False))
            print(f"💾 缓存保存: {keyword} (TTL: {ttl}s)")
        except Exception as e:
            print(f"设置缓存数据失败: {e}")

    def _is_data_fresh(self, cached_time: datetime, keyword: str) -> bool:
        """Volatile keywords tolerate less cache age than stable ones."""
        age = (datetime.now() - cached_time).total_seconds()
        if self._is_trending_keyword(keyword):
            max_age = 300    # trending: 5 minutes
        elif self._is_seasonal_keyword(keyword):
            max_age = 1800   # seasonal: 30 minutes
        else:
            max_age = 3600   # default: 1 hour
        return age < max_age

    def _calculate_dynamic_ttl(self, keyword: str) -> int:
        """Pick a TTL: 5 min trending, 30 min seasonal, 4 h stable, else 1 h."""
        base_ttl = self.default_ttl
        if self._is_trending_keyword(keyword):
            return base_ttl // 12
        if self._is_seasonal_keyword(keyword):
            return base_ttl // 2
        if self._is_stable_keyword(keyword):
            return base_ttl * 4
        return base_ttl

    def _is_trending_keyword(self, keyword: str) -> bool:
        """True if the keyword mentions a flash-sale / gifting event."""
        lowered = keyword.lower()
        return any(p in lowered for p in self._TRENDING_PATTERNS)

    def _is_seasonal_keyword(self, keyword: str) -> bool:
        """True if the keyword tracks a season or recurring occasion."""
        lowered = keyword.lower()
        return any(p in lowered for p in self._SEASONAL_PATTERNS)

    def _is_stable_keyword(self, keyword: str) -> bool:
        """True if the keyword belongs to an evergreen product category."""
        lowered = keyword.lower()
        return any(p in lowered for p in self._STABLE_PATTERNS)
class CacheWarmupManager:
    """Pre-populate the cache for frequently requested keywords."""

    def __init__(self):
        self.cache_manager = IntelligentCacheManager()

    async def warmup_popular_keywords(self):
        """Fetch and cache every popular keyword that is not already cached."""
        for entry in await self._get_popular_keywords():
            keyword = entry["keyword"]
            marketplace = entry["marketplace"]
            try:
                if not await self.cache_manager.get_cached_data(keyword, marketplace):
                    print(f"🔥 预热缓存: {keyword}")
                    data = await fetch_sp_ads_data(keyword, marketplace)
                    await self.cache_manager.set_cached_data(keyword, marketplace, data)
                    # Throttle so warmup does not hammer the upstream API.
                    await asyncio.sleep(1)
            except Exception as e:
                print(f"预热关键词 {keyword} 失败: {e}")

    async def _get_popular_keywords(self) -> List[Dict]:
        """Return the warmup list; sourced from a DB or config in production."""
        return [
            {"keyword": "wireless earbuds", "marketplace": "amazon.com"},
            {"keyword": "laptop stand", "marketplace": "amazon.com"},
            {"keyword": "phone case", "marketplace": "amazon.com"},
            # more popular keywords...
        ]

# cost_optimization.py - cost optimization management
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List
import numpy as np
class CostOptimizationManager:
    """Schedule and batch scrape requests to keep API/compute spend within budget."""

    def __init__(self):
        # Running usage counters (filled in by the monitoring pipeline).
        self.cost_metrics = {
            "api_calls": 0,
            "compute_hours": 0,
            "storage_gb": 0,
            "bandwidth_gb": 0,
            "total_cost": 0.0,
        }
        self.cost_limits = {
            "daily_limit": 1000.0,    # daily cost ceiling
            "monthly_limit": 25000.0, # monthly cost ceiling
        }

    async def optimize_request_scheduling(self, requests: List[Dict]) -> List[Dict]:
        """Order, batch, and time-shift requests to reduce cost; return them flattened."""
        prioritized = self._prioritize_requests(requests)
        batched = self._batch_similar_requests(prioritized)
        return self._schedule_by_time_window(batched)

    def _prioritize_requests(self, requests: List[Dict]) -> List[Dict]:
        """Sort requests (descending) by priority x tier x freshness x cost-efficiency."""
        def score(request: Dict) -> float:
            base_priority = request.get("priority", 5)
            tier_weight = {
                "premium": 1.5,
                "standard": 1.0,
                "basic": 0.7,
            }.get(request.get("user_tier", "standard"), 1.0)
            freshness_weight = 1.2 if request.get("require_fresh_data") else 1.0
            estimated_cost = self._estimate_request_cost(request)
            # Guard against division by ~0 cost.
            cost_efficiency = 1.0 / max(estimated_cost, 0.01)
            return base_priority * tier_weight * freshness_weight * cost_efficiency
        return sorted(requests, key=score, reverse=True)

    def _batch_similar_requests(self, requests: List[Dict]) -> List[List[Dict]]:
        """Greedily group similar requests into batches of at most 10."""
        batches = []
        processed = set()
        for i, request in enumerate(requests):
            if i in processed:
                continue
            batch = [request]
            processed.add(i)
            for j, other in enumerate(requests[i + 1:], i + 1):
                if j in processed:
                    continue
                if self._are_requests_similar(request, other):
                    batch.append(other)
                    processed.add(j)
                    if len(batch) >= 10:  # cap batch size
                        break
            batches.append(batch)
        return batches

    def _are_requests_similar(self, req1: Dict, req2: Dict) -> bool:
        """Same marketplace and >60% keyword-token overlap (Jaccard similarity)."""
        if req1.get("marketplace") != req2.get("marketplace"):
            return False
        words1 = set(req1.get("keyword", "").lower().split())
        words2 = set(req2.get("keyword", "").lower().split())
        union = words1 | words2
        similarity = len(words1 & words2) / len(union) if union else 0
        return similarity > 0.6

    def _schedule_by_time_window(self, batches: List[List[Dict]]) -> List[Dict]:
        """Defer expensive batches into the low-cost overnight window (00:00-05:59).

        Bug fix: the original `min(h for h in low_cost_hours if h > hour)`
        raised ValueError whenever the current hour was past every low-cost
        hour (always, since the window is 0-5 and the branch only runs outside
        it), and the follow-up `if not next_low_cost_hour` treated hour 0 as
        "missing". Use `min(..., default=None)` and an explicit None check.
        """
        scheduled = []
        current_time = datetime.now()
        low_cost_hours = [0, 1, 2, 3, 4, 5]
        for batch in batches:
            batch_cost = sum(self._estimate_request_cost(req) for req in batch)
            # Expensive batches outside the cheap window get a deferred slot.
            if batch_cost > 50 and current_time.hour not in low_cost_hours:
                next_low_cost_hour = min(
                    (h for h in low_cost_hours if h > current_time.hour),
                    default=None,
                )
                if next_low_cost_hour is None:
                    # Past today's window: wrap to the earliest low-cost hour.
                    next_low_cost_hour = min(low_cost_hours)
                scheduled_time = current_time.replace(
                    hour=next_low_cost_hour,
                    minute=0,
                    second=0,
                    microsecond=0,
                )
                if scheduled_time <= current_time:
                    scheduled_time += timedelta(days=1)
                for req in batch:
                    req["scheduled_time"] = scheduled_time.isoformat()
            scheduled.extend(batch)
        return scheduled

    def _estimate_request_cost(self, request: Dict) -> float:
        """Estimate one request's cost from marketplace, detail level, freshness."""
        base_cost = 0.05  # base API-call cost
        marketplace_multiplier = {
            "amazon.com": 1.0,
            "amazon.co.uk": 1.1,
            "amazon.de": 1.1,
            "amazon.jp": 1.2,
        }.get(request.get("marketplace", "amazon.com"), 1.0)
        complexity_multiplier = {
            "basic": 1.0,
            "detailed": 1.5,
            "comprehensive": 2.0,
        }.get(request.get("data_level", "basic"), 1.0)
        realtime_multiplier = 1.3 if request.get("require_fresh_data") else 1.0
        return base_cost * marketplace_multiplier * complexity_multiplier * realtime_multiplier

    async def monitor_cost_usage(self):
        """Alert at 80% of either budget; cut costs at 90% of the daily one."""
        current_cost = await self._calculate_current_cost()
        daily_usage_rate = current_cost["daily"] / self.cost_limits["daily_limit"]
        monthly_usage_rate = current_cost["monthly"] / self.cost_limits["monthly_limit"]
        if daily_usage_rate > 0.8:
            await self._trigger_cost_alert("daily", daily_usage_rate)
        if monthly_usage_rate > 0.8:
            await self._trigger_cost_alert("monthly", monthly_usage_rate)
        if daily_usage_rate > 0.9:
            await self._apply_cost_reduction_measures()

    async def _calculate_current_cost(self) -> Dict:
        """Return current spend; placeholder values pending monitoring integration."""
        return {
            "daily": 750.0,
            "monthly": 18500.0,
        }

    async def _trigger_cost_alert(self, period: str, usage_rate: float):
        """Emit a cost alert for the given budget period."""
        print(f"⚠️ 成本告警: {period} 使用率达到 {usage_rate:.1%}")
        alert_data = {
            "type": "cost_alert",
            "period": period,
            "usage_rate": usage_rate,
            "timestamp": datetime.now().isoformat(),
        }
        await self._send_alert_notification(alert_data)

    async def _send_alert_notification(self, alert_data: Dict):
        """Deliver an alert notification (email/SMS/webhook integration point).

        Bug fix: the original called this method without ever defining it, so
        any triggered alert raised AttributeError. Provide a logging stub to
        be replaced with a real notification channel.
        """
        print(f"🔔 告警通知: {alert_data}")

    async def _apply_cost_reduction_measures(self):
        """Run every cost-cutting measure, tolerating individual failures."""
        print("🔧 应用成本削减措施...")
        measures = [
            self._reduce_request_frequency,
            self._increase_cache_usage,
            self._defer_non_critical_requests,
            self._optimize_resource_allocation,
        ]
        for measure in measures:
            try:
                await measure()
            except Exception as e:
                print(f"应用成本削减措施失败: {e}")

    async def _reduce_request_frequency(self):
        """Widen the interval between upstream requests."""
        print("📉 降低请求频率")

    async def _increase_cache_usage(self):
        """Lengthen cache TTLs so more traffic is served from cache."""
        print("💾 增加缓存使用")

    async def _defer_non_critical_requests(self):
        """Push low-priority requests into the low-cost time window."""
        print("⏰ 延迟非关键请求")

    async def _optimize_resource_allocation(self):
        """Right-size compute resource configuration."""
        print("⚙️ 优化资源分配")
# Usage example
cost_optimizer = CostOptimizationManager()

async def cost_aware_batch_processing(requests: List[Dict]):
    """Schedule requests cost-effectively, then execute them in order."""
    optimized_requests = await cost_optimizer.optimize_request_scheduling(requests)
    # Check budget consumption before doing the work.
    await cost_optimizer.monitor_cost_usage()
    results = []
    for request in optimized_requests:
        try:
            results.append(await execute_optimized_request(request))
        except Exception as e:
            print(f"执行请求失败: {e}")
    return results

# .github/workflows/deploy.yml - GitHub Actions configuration
# GitHub Actions deployment pipeline (indentation restored: a flattened
# workflow file is invalid YAML and would be rejected by Actions).
name: Deploy Pangolin Scraper

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  TENCENT_SECRET_ID: ${{ secrets.TENCENT_SECRET_ID }}
  TENCENT_SECRET_KEY: ${{ secrets.TENCENT_SECRET_KEY }}
  DOCKER_REGISTRY: ccr.ccs.tencentyun.com/pangolin

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        run: |
          pytest tests/ --cov=src/ --cov-report=xml
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v3

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
      - name: Login to Tencent Container Registry
        uses: docker/login-action@v2
        with:
          registry: ccr.ccs.tencentyun.com
          username: ${{ secrets.TCR_USERNAME }}
          password: ${{ secrets.TCR_PASSWORD }}
      - name: Build and push Docker image
        uses: docker/build-push-action@v4
        with:
          context: .
          push: true
          tags: |
            ${{ env.DOCKER_REGISTRY }}/scraper:latest
            ${{ env.DOCKER_REGISTRY }}/scraper:${{ github.sha }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to Tencent Cloud
        run: |
          # Install the Tencent Cloud CLI
          pip install tccli
          # Configure the CLI
          tccli configure set secretId $TENCENT_SECRET_ID
          tccli configure set secretKey $TENCENT_SECRET_KEY
          tccli configure set region ap-beijing
          # Roll the container service to the new image
          ./scripts/deploy.sh ${{ github.sha }}

  notify:
    needs: [test, build, deploy]
    runs-on: ubuntu-latest
    if: always()
    steps:
      - name: Notify deployment status
        uses: 8398a7/action-slack@v3
        with:
          status: ${{ job.status }}
          channel: '#deployments'
          webhook_url: ${{ secrets.SLACK_WEBHOOK }}

# health_check.py - health check system
import asyncio
import aiohttp
import psutil
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class HealthCheckManager:
def __init__(self):
self.health_endpoints = [
{"name": "api_gateway", "url": "http://localhost:80/health"},
{"name": "scraper_service", "url": "http://localhost:8001/health"},
{"name": "data_processor", "url": "http://localhost:8002/health"},
{"name": "task_scheduler", "url": "http://localhost:8003/health"}
]
self.system_thresholds = {
"cpu_percent": 80,
"memory_percent": 85,
"disk_percent": 90
}
async def run_comprehensive_health_check(self) -> Dict:
"""运行全面健康检查"""
health_report = {
"timestamp": datetime.now().isoformat(),
"overall_status": "healthy",
"services": {},
"system": {},
"dependencies": {}
}
# 检查服务健康状态
service_results = await self._check_services_health()
health_report["services"] = service_results
# 检查系统资源
system_results = await self._check_system_health()
health_report["system"] = system_results
# 检查外部依赖
dependency_results = await self._check_dependencies_health()
health_report["dependencies"] = dependency_results
# 确定整体状态
health_report["overall_status"] = self._determine_overall_status(
service_results, system_results, dependency_results
)
# 如果状态不健康,触发自动恢复
if health_report["overall_status"] != "healthy":
await self._trigger_auto_recovery(health_report)
return health_report
async def _check_services_health(self) -> Dict:
"""检查服务健康状态"""
results = {}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
for endpoint in self.health_endpoints:
try:
start_time = datetime.now()
async with session.get(endpoint["url"]) as response:
end_time = datetime.now()
response_time = (end_time - start_time).total_seconds() * 1000
if response.status == 200:
data = await response.json()
results[endpoint["name"]] = {
"status": "healthy",
"response_time_ms": response_time,
"details": data
}
else:
results[endpoint["name"]] = {
"status": "unhealthy",
"response_time_ms": response_time,
"error": f"HTTP {response.status}"
}
except asyncio.TimeoutError:
results[endpoint["name"]] = {
"status": "unhealthy",
"error": "timeout"
}
except Exception as e:
results[endpoint["name"]] = {
"status": "unhealthy",
"error": str(e)
}
return results
async def _check_system_health(self) -> Dict:
"""检查系统健康状态"""
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
system_health = {
"cpu": {
"usage_percent": cpu_percent,
"status": "healthy" if cpu_percent < self.system_thresholds["cpu_percent"] else "warning"
},
"memory": {
"usage_percent": memory.percent,
"available_gb": memory.available / (1024**3),
"status": "healthy" if memory.percent < self.system_thresholds["memory_percent"] else "warning"
},
"disk": {
"usage_percent": disk.percent,
"free_gb": disk.free / (1024**3),
"status": "healthy" if disk.percent < self.system_thresholds["disk_percent"] else "warning"
}
}
return system_health
async def _check_dependencies_health(self) -> Dict:
"""检查外部依赖健康状态"""
dependencies = {
"redis": {"host": "localhost", "port": 6379},
"mongodb": {"host": "localhost", "port": 27017},
"rabbitmq": {"host": "localhost", "port": 5672},
"elasticsearch": {"host": "localhost", "port": 9200}
}
results = {}
for name, config in dependencies.items():
try:
# 简单的端口连接测试
reader, writer = await asyncio.wait_for(
asyncio.open_connection(config["host"], config["port"]),
timeout=5
)
writer.close()
await writer.wait_closed()
results[name] = {"status": "healthy"}
except Exception as e:
results[name] = {
"status": "unhealthy",
"error": str(e)
}
return results
def _determine_overall_status(self, services: Dict, system: Dict, dependencies: Dict) -> str:
"""确定整体健康状态"""
# 检查关键服务
critical_services = ["api_gateway", "scraper_service"]
for service in critical_services:
if services.get(service, {}).get("status") != "healthy":
return "critical"
# 检查系统资源
for resource, data in system.items():
if data.get("status") == "warning":
return "warning"
# 检查依赖
critical_dependencies = ["redis", "mongodb"]
for dep in critical_dependencies:
if dependencies.get(dep, {}).get("status") != "healthy":
return "critical"
return "healthy"
async def _trigger_auto_recovery(self, health_report: Dict):
    """Select and run the recovery actions matching the reported severity.

    "critical" triggers restart/scale-up/cache-clear; "warning" triggers
    resource optimization and temp-file cleanup. Failures of individual
    actions are logged and do not abort the remaining actions.
    """
    print("🚨 检测到健康问题,开始自动恢复...")
    severity = health_report["overall_status"]
    # Map severity to an ordered list of recovery coroutines.
    if severity == "critical":
        planned_actions = [
            self._restart_critical_services,
            self._scale_up_resources,
            self._clear_cache_if_needed,
        ]
    elif severity == "warning":
        planned_actions = [
            self._optimize_resource_usage,
            self._clear_temporary_files,
        ]
    else:
        planned_actions = []
    for recovery_action in planned_actions:
        try:
            await recovery_action(health_report)
            print(f"✅ 恢复动作完成: {recovery_action.__name__}")
        except Exception as e:
            print(f"❌ 恢复动作失败: {recovery_action.__name__} - {e}")
async def _restart_critical_services(self, health_report: Dict):
    """Restart every service the health report marks as not healthy."""
    for svc_name, svc_data in health_report["services"].items():
        if svc_data.get("status") == "healthy":
            continue
        print(f"🔄 重启服务: {svc_name}")
        # Delegate the actual restart mechanics to the per-service helper.
        await self._restart_service(svc_name)
async def _restart_service(self, service_name: str):
    """Restart a single service via its mapped ``docker restart`` command.

    Unknown service names are silently ignored.
    """
    # Known service -> container restart command mapping.
    restart_commands = {
        "api_gateway": "docker restart pangolin_api_gateway",
        "scraper_service": "docker restart pangolin_scraper",
        "data_processor": "docker restart pangolin_processor",
        "task_scheduler": "docker restart pangolin_scheduler",
    }
    command = restart_commands.get(service_name)
    if command is None:
        # No command registered for this service: nothing to do.
        return
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode == 0:
        print(f"✅ 服务 {service_name} 重启成功")
    else:
        print(f"❌ 服务 {service_name} 重启失败: {stderr.decode()}")
async def _scale_up_resources(self, health_report: Dict):
"""扩容资源"""
print("📈 开始资源扩容...")
# 实现自动扩容逻辑
async def _clear_cache_if_needed(self, health_report: Dict):
"""清理缓存"""
print("🧹 清理缓存...")
# 实现缓存清理逻辑
async def _optimize_resource_usage(self, health_report: Dict):
    """Relieve load when resources run hot: throttle workers, purge temp files."""
    print("⚙️ 优化资源使用...")
    cpu_load = health_report["system"]["cpu"]["usage_percent"]
    # Back off concurrency once CPU pressure crosses 70%.
    if cpu_load > 70:
        await self._reduce_concurrency()
    # Always reclaim disk space by dropping temporary files.
    await self._clear_temporary_files()
async def _clear_temporary_files(self):
"""清理临时文件"""
print("🗑️ 清理临时文件...")
# 实现临时文件清理逻辑
# 定时健康检查任务
async def scheduled_health_check():
    """Run the comprehensive health check forever, once every five minutes.

    Each cycle runs the check, logs the overall status, and forwards the
    report to the monitoring system; any failure is logged and the loop
    continues.
    """
    health_checker = HealthCheckManager()
    while True:
        try:
            report = await health_checker.run_comprehensive_health_check()
            print(f"🏥 健康检查完成: {report['overall_status']}")
            # Forward the snapshot to the external monitoring pipeline.
            await report_health_metrics(report)
        except Exception as e:
            print(f"健康检查失败: {e}")
        # Sleep outside the try so a failed cycle still waits 5 minutes.
        await asyncio.sleep(300)
async def report_health_metrics(health_report: Dict):
    """Report health metrics to the monitoring system (not yet implemented)."""
    # TODO: implement the metric-export logic (e.g. push to the monitoring backend)
pass

通过本文的深入探讨,我们全面展示了如何基于云原生架构构建一个高效、可靠的亚马逊SP广告数据采集系统。Pangolinfo Scrape API凭借其98%的采集成功率,为企业提供了强有力的数据支撑。
云原生架构设计让系统具备了出色的弹性伸缩能力和高可用性,通过微服务拆分、容器化部署和自动化运维,确保了系统的稳定运行。智能缓存策略和成本优化机制有效降低了运营成本,提升了资源利用效率。多层数据验证和质量保障体系确保了数据的准确性和完整性。
对于电商企业而言,精准的SP广告数据不仅能够优化广告投放策略,更能够深入洞察市场趋势和竞争态势。通过Pangolin的云架构解决方案,企业能够实现数据驱动的决策制定,在激烈的市场竞争中占据先机。
随着人工智能和机器学习技术的不断发展,数据采集系统将朝着更加智能化的方向演进。我们预期未来的系统将具备自适应学习能力,能够根据市场变化自动调整采集策略,为企业提供更加精准和及时的数据服务。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。